Posted to commits@nifi.apache.org by jo...@apache.org on 2020/01/19 16:45:41 UTC

[nifi] 01/02: NIFI-818: This closes #3926. Initial implementation of NiFi-Accumulo ( https://github.com/phrocker/nifi-accumulo ) with connectors to Apache Accumulo 2.x

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit de2a286a7a01725ee11d839ead7811a4135f8b41
Author: Marc Parisi <ph...@apache.org>
AuthorDate: Wed Dec 11 11:58:56 2019 -0500

    NIFI-818: This closes #3926. Initial implementation of NiFi-Accumulo ( https://github.com/phrocker/nifi-accumulo ) with connectors to Apache Accumulo 2.x
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 nifi-assembly/pom.xml                              |  27 +
 nifi-nar-bundles/nifi-accumulo-bundle/README.md    |  23 +
 .../nifi-accumulo-bundle/nifi-accumulo-nar/pom.xml |  52 ++
 .../nifi-accumulo-processors/pom.xml               | 101 ++++
 .../accumulo/data/AccumuloRecordConfiguration.java | 159 +++++
 .../org/apache/nifi/accumulo/data/KeySchema.java   | 115 ++++
 .../accumulo/processors/BaseAccumuloProcessor.java |  76 +++
 .../accumulo/processors/PutAccumuloRecord.java     | 657 +++++++++++++++++++++
 .../nifi/accumulo/processors/ScanAccumulo.java     | 349 +++++++++++
 .../services/org.apache.nifi.processor.Processor   |  16 +
 .../controllerservices/MockAccumuloService.java    |  42 ++
 .../nifi/accumulo/processors/TestPutRecord.java    | 218 +++++++
 .../nifi/accumulo/processors/TestScanAccumulo.java | 236 ++++++++
 .../nifi-accumulo-services-api-nar/pom.xml         |  52 ++
 .../nifi-accumulo-services-api/pom.xml             |  61 ++
 .../controllerservices/BaseAccumuloService.java    |  32 +
 .../nifi-accumulo-services-nar/pom.xml             |  53 ++
 .../nifi-accumulo-services/pom.xml                 |  70 +++
 .../controllerservices/AccumuloService.java        | 210 +++++++
 .../org.apache.nifi.controller.ControllerService   |  15 +
 nifi-nar-bundles/nifi-accumulo-bundle/pom.xml      |  50 ++
 .../reporting/datadog/DataDogReportingTask.java    |   2 +-
 nifi-nar-bundles/pom.xml                           |   3 +-
 23 files changed, 2617 insertions(+), 2 deletions(-)

diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index b69aa64..7c9c7d8 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -897,6 +897,33 @@ language governing permissions and limitations under the License. -->
                     <type>nar</type>
                 </dependency>
             </dependencies>
+        </profile>
+        <profile>
+            <id>include-accumulo</id>
+            <!-- This profile handles the inclusion of nifi-accumulo artifacts. -->
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.nifi</groupId>
+                    <artifactId>nifi-accumulo-nar</artifactId>
+                    <version>1.11.0-SNAPSHOT</version>
+                    <type>nar</type>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.nifi</groupId>
+                    <artifactId>nifi-accumulo-services-api-nar</artifactId>
+                    <version>1.11.0-SNAPSHOT</version>
+                    <type>nar</type>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.nifi</groupId>
+                    <artifactId>nifi-accumulo-services-nar</artifactId>
+                    <version>1.11.0-SNAPSHOT</version>
+                    <type>nar</type>
+                </dependency>
+            </dependencies>
         </profile>
         <profile>
             <id>rpm</id>
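
Assuming standard Maven profile activation, the new include-accumulo profile above is opt-in: building the assembly with `mvn clean install -Pinclude-accumulo` pulls the three Accumulo NARs into the binary, while default builds leave them out.
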
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/README.md b/nifi-nar-bundles/nifi-accumulo-bundle/README.md
new file mode 100644
index 0000000..e431586
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/README.md
@@ -0,0 +1,23 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# nifi-accumulo
+
+This is a basic NiFi -> Accumulo integration. Running `mvn install` builds the NAR, which can then be
+added to Apache NiFi. This bundle is intended to be used with Apache Accumulo 2.x.
+
+The resulting NAR will be named 'nifi-accumulo-nar'.
+
+
+Note that some of this code was modeled after the HBase work.
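
Assuming a standard NiFi layout, the built NAR (under nifi-accumulo-nar/target) is deployed by copying it into NiFi's lib directory and restarting NiFi so the new processors and controller services are picked up.
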
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-nar/pom.xml b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-nar/pom.xml
new file mode 100644
index 0000000..b128110
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-nar/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-accumulo-bundle</artifactId>
+        <version>1.11.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-accumulo-nar</artifactId>
+    <version>1.11.0-SNAPSHOT</version>
+    <packaging>nar</packaging>
+    <properties>
+        <maven.javadoc.skip>false</maven.javadoc.skip>
+        <source.skip>true</source.skip>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-accumulo-processors</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-accumulo-services-api-nar</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/pom.xml b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/pom.xml
new file mode 100644
index 0000000..23e9902
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-accumulo-bundle</artifactId>
+        <version>1.11.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-accumulo-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-accumulo-services-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.accumulo</groupId>
+            <artifactId>accumulo-core</artifactId>
+            <version>${accumulo.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.9</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-path</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.accumulo</groupId>
+            <artifactId>accumulo-minicluster</artifactId>
+            <version>${accumulo.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-accumulo-services</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/AccumuloRecordConfiguration.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/AccumuloRecordConfiguration.java
new file mode 100644
index 0000000..164f9d5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/AccumuloRecordConfiguration.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.data;
+
+/**
+ * Encapsulates configuring the session with some required parameters.
+ *
+ * Justification: Generally not a fan of fluent APIs that configure other objects, but there is a lot encapsulated here,
+ * so it helps minimize what we pass between the current set of classes and the upcoming features.
+ */
+public class AccumuloRecordConfiguration {
+    private String tableName;
+    private String rowFieldName;
+    private String columnFamily;
+    private String columnFamilyField;
+    private String timestampField;
+    private String fieldDelimiter;
+    private boolean encodeFieldDelimiter;
+    private boolean qualifierInKey;
+    private boolean deleteKeys;
+
+
+    protected AccumuloRecordConfiguration(final String tableName, final String rowFieldName, final String columnFamily,
+                                          final String columnFamilyField,
+                                          final String timestampField, final String fieldDelimiter,
+                                          final boolean encodeFieldDelimiter,
+                                          final boolean qualifierInKey, final boolean deleteKeys) {
+        this.tableName = tableName;
+        this.rowFieldName = rowFieldName;
+        this.columnFamily = columnFamily;
+        this.columnFamilyField = columnFamilyField;
+        this.timestampField = timestampField;
+        this.fieldDelimiter = fieldDelimiter;
+        this.encodeFieldDelimiter = encodeFieldDelimiter;
+        this.qualifierInKey = qualifierInKey;
+        this.deleteKeys = deleteKeys;
+    }
+
+    public String getTableName(){
+        return tableName;
+    }
+
+    public String getColumnFamily() {
+        return columnFamily;
+    }
+
+    public String getColumnFamilyField() {
+        return columnFamilyField;
+    }
+
+    public boolean getEncodeDelimiter(){
+        return encodeFieldDelimiter;
+    }
+
+    public String getTimestampField(){
+        return timestampField;
+    }
+
+    public String getFieldDelimiter(){
+        return fieldDelimiter;
+    }
+
+    public boolean getQualifierInKey(){
+        return qualifierInKey;
+    }
+
+    public boolean isDeleteKeys(){
+        return deleteKeys;
+    }
+
+
+    public String getRowField(){
+        return rowFieldName;
+    }
+
+    public static class Builder{
+
+        public static final Builder newBuilder(){
+            return new Builder();
+        }
+
+        public Builder setRowField(final String rowFieldName){
+            this.rowFieldName = rowFieldName;
+            return this;
+        }
+
+        public Builder setTableName(final String tableName){
+            this.tableName = tableName;
+            return this;
+        }
+
+        public Builder setEncodeFieldDelimiter(final boolean encodeFieldDelimiter){
+            this.encodeFieldDelimiter = encodeFieldDelimiter;
+            return this;
+        }
+
+
+        public Builder setColumnFamily(final String columnFamily){
+            this.columnFamily = columnFamily;
+            return this;
+        }
+
+        public Builder setColumnFamilyField(final String columnFamilyField){
+            this.columnFamilyField = columnFamilyField;
+            return this;
+        }
+
+        public Builder setTimestampField(final String timestampField){
+            this.timestampField = timestampField;
+            return this;
+        }
+
+        public Builder setQualifierInKey(final boolean qualifierInKey){
+            this.qualifierInKey = qualifierInKey;
+            return this;
+        }
+
+        public Builder setFieldDelimiter(final String fieldDelimiter){
+            this.fieldDelimiter = fieldDelimiter;
+            return this;
+        }
+
+        public Builder setDelete(final boolean deleteKeys){
+            this.deleteKeys = deleteKeys;
+            return this;
+        }
+
+        public AccumuloRecordConfiguration build(){
+            return new AccumuloRecordConfiguration(tableName, rowFieldName, columnFamily, columnFamilyField,
+                    timestampField, fieldDelimiter, encodeFieldDelimiter, qualifierInKey, deleteKeys);
+        }
+
+
+        private String tableName;
+        private String rowFieldName;
+        private String columnFamily;
+        private String columnFamilyField;
+        private String fieldDelimiter;
+        private boolean qualifierInKey=false;
+        private boolean encodeFieldDelimiter=false;
+        private String timestampField;
+        private boolean deleteKeys=false;
+    }
+}
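
For reference, a minimal sketch of the fluent configuration above, mirroring how PutAccumuloRecord builds it later in this commit; the literal values are illustrative, and every method shown is defined in this class:

    AccumuloRecordConfiguration config = AccumuloRecordConfiguration.Builder.newBuilder()
            .setTableName("records")         // destination table
            .setRowField("id")               // record field supplying the row id
            .setColumnFamily("family1")      // static column family
            .setFieldDelimiter("\u0000")     // delimiter between record name and value
            .setQualifierInKey(true)         // place the record value into the column qualifier
            .setDelete(false)                // insert rather than delete keys
            .build();
    // accessors mirror the builder, e.g. config.getTableName() returns "records"
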
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/KeySchema.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/KeySchema.java
new file mode 100644
index 0000000..7ac74b8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/KeySchema.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.data;
+
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class KeySchema implements RecordSchema {
+    private static final List<RecordField> KEY_FIELDS = new ArrayList<>();
+
+    private static final List<DataType> DATA_TYPES = new ArrayList<>();
+
+    private static final List<String> FIELD_NAMES = new ArrayList<>();
+
+    static {
+        KEY_FIELDS.add(new RecordField("row", RecordFieldType.STRING.getDataType(), false));
+        KEY_FIELDS.add(new RecordField("columnFamily", RecordFieldType.STRING.getDataType(), true));
+        KEY_FIELDS.add(new RecordField("columnQualifier", RecordFieldType.STRING.getDataType(), true));
+        KEY_FIELDS.add(new RecordField("columnVisibility", RecordFieldType.STRING.getDataType(), true));
+        KEY_FIELDS.add(new RecordField("timestamp", RecordFieldType.LONG.getDataType(), true));
+        DATA_TYPES.add(RecordFieldType.STRING.getDataType());
+        DATA_TYPES.add(RecordFieldType.LONG.getDataType());
+        FIELD_NAMES.addAll(KEY_FIELDS.stream().map(RecordField::getFieldName).collect(Collectors.toList()));
+    }
+    @Override
+    public List<RecordField> getFields() {
+        return KEY_FIELDS;
+    }
+
+    @Override
+    public int getFieldCount() {
+        return KEY_FIELDS.size();
+    }
+
+    @Override
+    public RecordField getField(int i) {
+        return KEY_FIELDS.get(i);
+    }
+
+    @Override
+    public List<DataType> getDataTypes() {
+         return DATA_TYPES;
+    }
+
+    @Override
+    public List<String> getFieldNames() {
+        return FIELD_NAMES;
+    }
+
+    @Override
+    public Optional<DataType> getDataType(String s) {
+        if (s.equalsIgnoreCase("timestamp")) {
+            return Optional.of(RecordFieldType.LONG.getDataType());
+        } else if (FIELD_NAMES.stream().anyMatch(x -> x.equalsIgnoreCase(s))) {
+            // all non-timestamp key fields are strings
+            return Optional.of(RecordFieldType.STRING.getDataType());
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<String> getSchemaText() {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<String> getSchemaFormat() {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<RecordField> getField(final String s) {
+        return KEY_FIELDS.stream().filter(x -> x.getFieldName().equalsIgnoreCase(s)).findFirst();
+    }
+
+    @Override
+    public SchemaIdentifier getIdentifier() {
+        return SchemaIdentifier.builder().name("AccumuloKeySchema").version(1).branch("nifi-accumulo").build();
+    }
+
+    @Override
+    public Optional<String> getSchemaName() {
+        return Optional.of("AccumuloKeySchema");
+    }
+
+    @Override
+    public Optional<String> getSchemaNamespace() {
+        return Optional.of("nifi-accumulo");
+    }
+}
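
A short sketch of how the schema above behaves; the field names and types come directly from the static initializer, and lookups are case-insensitive:

    RecordSchema schema = new KeySchema();
    schema.getFieldNames();           // [row, columnFamily, columnQualifier, columnVisibility, timestamp]
    schema.getDataType("timestamp");  // Optional[LONG]
    schema.getDataType("ROW");        // Optional[STRING]
    schema.getField("columnfamily");  // Optional containing the columnFamily RecordField
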
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/BaseAccumuloProcessor.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/BaseAccumuloProcessor.java
new file mode 100644
index 0000000..d0888ac
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/BaseAccumuloProcessor.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.processors;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.nifi.accumulo.controllerservices.BaseAccumuloService;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.util.StandardValidators;
+
+/**
+ * Base Accumulo class that provides connector services, table name, and thread
+ * properties
+ */
+public abstract class BaseAccumuloProcessor extends AbstractProcessor {
+
+    protected static final PropertyDescriptor ACCUMULO_CONNECTOR_SERVICE = new PropertyDescriptor.Builder()
+            .name("accumulo-connector-service")
+            .displayName("Accumulo Connector Service")
+            .description("Specifies the Controller Service to use for accessing Accumulo.")
+            .required(true)
+            .identifiesControllerService(BaseAccumuloService.class)
+            .build();
+
+
+    protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("Table Name")
+            .description("The name of the Accumulo Table into which data will be placed")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    protected static final PropertyDescriptor CREATE_TABLE = new PropertyDescriptor.Builder()
+            .name("Create Table")
+            .description("Creates a table if it does not exist. When 'Table Name' contains no expression language the table is created at schedule time; otherwise creation is attempted as FlowFiles are processed.")
+            .required(true)
+            .defaultValue("False")
+            .allowableValues("True", "False")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+
+    protected static final PropertyDescriptor THREADS = new PropertyDescriptor.Builder()
+            .name("Threads")
+            .description("Number of threads used for reading and writing")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("10")
+            .build();
+
+    /**
+     * Implementations can decide to include all base properties or to include them individually. The list is
+     * immutable so that implementations must knowingly construct their own lists.
+     */
+
+    protected static final ImmutableList<PropertyDescriptor> baseProperties = ImmutableList.of(ACCUMULO_CONNECTOR_SERVICE,TABLE_NAME,CREATE_TABLE,THREADS);
+
+
+}
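
Because baseProperties is immutable, a subclass combines it with its own descriptors explicitly, the same pattern PutAccumuloRecord follows below. A minimal sketch (ExampleAccumuloProcessor and the commented-out descriptor are hypothetical):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.exception.ProcessException;

    public class ExampleAccumuloProcessor extends BaseAccumuloProcessor {

        @Override
        public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
            // copy the immutable base list, then append processor-specific properties
            final List<PropertyDescriptor> properties = new ArrayList<>(baseProperties);
            // properties.add(EXAMPLE_PROPERTY); // hypothetical processor-specific descriptor
            return properties;
        }

        @Override
        public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
            // processor-specific work goes here
        }
    }
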
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/PutAccumuloRecord.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/PutAccumuloRecord.java
new file mode 100644
index 0000000..dbcc3e7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/PutAccumuloRecord.java
@@ -0,0 +1,657 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.processors;
+
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.io.Text;
+import org.apache.nifi.annotation.behavior.DynamicProperties;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.accumulo.controllerservices.BaseAccumuloService;
+import org.apache.nifi.accumulo.data.AccumuloRecordConfiguration;
+
+import javax.xml.bind.DatatypeConverter;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hadoop", "accumulo", "put", "record"})
+@DynamicProperties({
+        @DynamicProperty(name = "visibility.<COLUMN FAMILY>", description = "Visibility label for everything under that column family " +
+                "when a specific label for a particular column qualifier is not available.", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+                value = "visibility label for <COLUMN FAMILY>"
+        ),
+        @DynamicProperty(name = "visibility.<COLUMN FAMILY>.<COLUMN QUALIFIER>", description = "Visibility label for the specified column qualifier " +
+                "qualified by a configured column family.", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+                value = "visibility label for <COLUMN FAMILY>:<COLUMN QUALIFIER>."
+        )
+})
+/**
+ * Purpose and Design: Requires that a connector be defined by way of an AccumuloService object. This class
+ * simply extends BaseAccumuloProcessor to extract records from a flow file. A record field's value can be
+ * placed into the Accumulo value or into part of the column qualifier (this can/may change).
+ *
+ * Supports deletes. If the delete flag is used, we'll delete the keys found within that flow file.
+ */
+public class PutAccumuloRecord extends BaseAccumuloProcessor {
+
+    protected static final PropertyDescriptor MEMORY_SIZE = new PropertyDescriptor.Builder()
+            .name("Memory Size")
+            .description("The maximum amount of memory to buffer from the record set at any one time before writing to Accumulo.")
+            .required(true)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("10 MB")
+            .build();
+
+    protected static final PropertyDescriptor COLUMN_FAMILY = new PropertyDescriptor.Builder()
+            .name("Column Family")
+            .description("The Column Family to use when inserting data into Accumulo")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(Validator.VALID)
+            .build();
+
+    protected static final PropertyDescriptor COLUMN_FAMILY_FIELD = new PropertyDescriptor.Builder()
+            .name("Column Family Field")
+            .description("Field name used as the column family if one is not specified above.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(Validator.VALID)
+            .build();
+
+    protected static final PropertyDescriptor DELETE_KEY = new PropertyDescriptor.Builder()
+            .name("delete-key")
+            .displayName("Delete Key")
+            .description("Deletes the keys produced from each record rather than inserting them.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    protected static final PropertyDescriptor RECORD_IN_QUALIFIER = new PropertyDescriptor.Builder()
+            .name("record-value-in-qualifier")
+            .displayName("Record Value In Qualifier")
+            .description("Places the record value into the column qualifier instead of the value.")
+            .required(false)
+            .defaultValue("False")
+            .allowableValues("True", "False")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    protected static final PropertyDescriptor FLUSH_ON_FLOWFILE = new PropertyDescriptor.Builder()
+            .name("flush-on-flow-file")
+            .displayName("Flush Every FlowFile")
+            .description("Flushes the table writer on every flow file.")
+            .required(true)
+            .defaultValue("True")
+            .allowableValues("True", "False")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    protected static final PropertyDescriptor FIELD_DELIMITER_AS_HEX = new PropertyDescriptor.Builder()
+            .name("field-delimiter-as-hex")
+            .displayName("Hex Encode Field Delimiter")
+            .description("When true, the field delimiter is interpreted as a hex-encoded character; for example, 0x00 places a null character between the record name and value.")
+            .required(false)
+            .defaultValue("False")
+            .allowableValues("True", "False")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    protected static final PropertyDescriptor FIELD_DELIMITER = new PropertyDescriptor.Builder()
+            .name("field-delimiter")
+            .displayName("Field Delimiter")
+            .description("Delimiter placed between the record name and value.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
+            .name("Row Identifier Field Name")
+            .description("Specifies the name of a record field whose value should be used as the row id for the given record." +
+                    " If the expression language result is not a field name, that literal value will be used as the row identifier.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+
+    protected static final PropertyDescriptor TIMESTAMP_FIELD = new PropertyDescriptor.Builder()
+            .name("timestamp-field")
+            .displayName("Timestamp Field")
+            .description("Specifies the name of a record field whose value should be used as the timestamp. If empty, a timestamp will be recorded as the time of insertion.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+
+    protected static final PropertyDescriptor VISIBILITY_PATH = new PropertyDescriptor.Builder()
+            .name("visibility-path")
+            .displayName("Visibility String Record Path Root")
+            .description("A record path that points to part of the record which contains a path to a mapping of visibility strings to record paths")
+            .required(false)
+            .addValidator(Validator.VALID)
+            .build();
+
+    protected static final PropertyDescriptor DEFAULT_VISIBILITY = new PropertyDescriptor.Builder()
+            .name("default-visibility")
+            .displayName("Default Visibility")
+            .description("Default visibility label applied when a visibility record path is not defined.")
+            .required(false)
+            .addValidator(Validator.VALID)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after it has been successfully stored in Accumulo")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if it cannot be sent to Accumulo")
+            .build();
+
+
+    /**
+     * Connector service which provides us a connector if the configuration is correct.
+     */
+    protected BaseAccumuloService accumuloConnectorService;
+
+    /**
+     * Connector that we need to persist while we are operational.
+     */
+    protected AccumuloClient client;
+
+    /**
+     * Table writer that will close when we shutdown or upon error.
+     */
+    private MultiTableBatchWriter tableWriter = null;
+
+    /**
+     * Record path cache
+     */
+    protected RecordPathCache recordPathCache;
+
+
+    /**
+     * Flushes the tableWriter on every flow file if true.
+     */
+    protected boolean flushOnEveryFlow;
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_FAILURE);
+        return rels;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        // Collections.emptySet() is immutable and would throw on add; use a mutable set.
+        final Collection<ValidationResult> set = new HashSet<>();
+        if (!validationContext.getProperty(COLUMN_FAMILY).isSet() && !validationContext.getProperty(COLUMN_FAMILY_FIELD).isSet()) {
+            set.add(new ValidationResult.Builder().explanation("Column Family OR Column Family Field must be defined").build());
+        } else if (validationContext.getProperty(COLUMN_FAMILY).isSet() && validationContext.getProperty(COLUMN_FAMILY_FIELD).isSet()) {
+            set.add(new ValidationResult.Builder().explanation("Column Family OR Column Family Field must be defined, but not both").build());
+        }
+        return set;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        accumuloConnectorService = context.getProperty(ACCUMULO_CONNECTOR_SERVICE).asControllerService(BaseAccumuloService.class);
+        final Double maxBytes = context.getProperty(MEMORY_SIZE).asDataSize(DataUnit.B);
+        this.client = accumuloConnectorService.getClient();
+        BatchWriterConfig writerConfig = new BatchWriterConfig();
+        writerConfig.setMaxWriteThreads(context.getProperty(THREADS).asInteger());
+        writerConfig.setMaxMemory(maxBytes.longValue());
+        flushOnEveryFlow = context.getProperty(FLUSH_ON_FLOWFILE).asBoolean();
+        if (!flushOnEveryFlow) {
+            // the config must be fully populated before the writer is created; later changes have no effect
+            writerConfig.setMaxLatency(60, TimeUnit.SECONDS);
+        }
+        tableWriter = client.createMultiTableBatchWriter(writerConfig);
+        recordPathCache = new RecordPathCache(25); // required for visibility record paths; otherwise never initialized
+
+        if (context.getProperty(CREATE_TABLE).asBoolean() && !context.getProperty(TABLE_NAME).isExpressionLanguagePresent()) {
+            final Map<String, String> flowAttributes = new HashMap<>();
+            final String table = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowAttributes).getValue();
+            final TableOperations tableOps = this.client.tableOperations();
+            if (!tableOps.exists(table)) {
+                getLogger().info("Creating " + table + " table.");
+                try {
+                    tableOps.create(table);
+                } catch (TableExistsException te) {
+                    // can safely ignore
+                } catch (AccumuloSecurityException | AccumuloException e) {
+                    getLogger().error("Accumulo or security error creating table " + table + ". Continuing...", e);
+                }
+            }
+        }
+    }
+
+
+    @OnUnscheduled
+    @OnDisabled
+    public synchronized void shutdown(){
+        // Close the writer when we are shut down.
+        if (null != tableWriter){
+            try {
+                tableWriter.close();
+            } catch (MutationsRejectedException e) {
+                getLogger().error("Mutations were rejected",e);
+            }
+            tableWriter = null;
+        }
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>(baseProperties);
+        properties.add(RECORD_READER_FACTORY);
+        properties.add(ROW_FIELD_NAME);
+        properties.add(COLUMN_FAMILY);
+        properties.add(COLUMN_FAMILY_FIELD);
+        properties.add(DELETE_KEY);
+        properties.add(FLUSH_ON_FLOWFILE);
+        properties.add(FIELD_DELIMITER);
+        properties.add(FIELD_DELIMITER_AS_HEX);
+        properties.add(MEMORY_SIZE);
+        properties.add(RECORD_IN_QUALIFIER);
+        properties.add(TIMESTAMP_FIELD);
+        properties.add(VISIBILITY_PATH);
+        properties.add(DEFAULT_VISIBILITY);
+        return properties;
+    }
+
+
+    @Override
+    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
+        final FlowFile flowFile = processSession.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordReaderFactory recordParserFactory = processContext.getProperty(RECORD_READER_FACTORY)
+                .asControllerService(RecordReaderFactory.class);
+
+        final String recordPathText = processContext.getProperty(VISIBILITY_PATH).getValue();
+        final String defaultVisibility = processContext.getProperty(DEFAULT_VISIBILITY).isSet() ? processContext.getProperty(DEFAULT_VISIBILITY).getValue() : null;
+
+        final String tableName = processContext.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+        // create the table if EL is present, create table is true and the table does not exist.
+        if (processContext.getProperty(TABLE_NAME).isExpressionLanguagePresent() && processContext.getProperty(CREATE_TABLE).asBoolean()) {
+            final TableOperations tableOps = this.client.tableOperations();
+            if (!tableOps.exists(tableName)) {
+                getLogger().info("Creating " + tableName + " table.");
+                try {
+                    tableOps.create(tableName);
+                } catch (TableExistsException te) {
+                    // Can safely ignore. We shouldn't arrive here since tableOps.exists() was checked above, but with
+                    // multiple threads two could attempt table creation concurrently, and that should not be a failure.
+                } catch (AccumuloSecurityException | AccumuloException e) {
+                    throw new ProcessException("Accumulo or security error creating table " + tableName, e);
+                }
+            }
+        }
+
+        AccumuloRecordConfiguration builder = AccumuloRecordConfiguration.Builder.newBuilder()
+                .setTableName(tableName)
+                .setColumnFamily(processContext.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue())
+                .setColumnFamilyField(processContext.getProperty(COLUMN_FAMILY_FIELD).evaluateAttributeExpressions(flowFile).getValue())
+                .setRowField(processContext.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue())
+                .setEncodeFieldDelimiter(processContext.getProperty(FIELD_DELIMITER_AS_HEX).asBoolean())
+                .setFieldDelimiter(processContext.getProperty(FIELD_DELIMITER).isSet() ? processContext.getProperty(FIELD_DELIMITER).evaluateAttributeExpressions(flowFile).getValue() : "")
+                .setQualifierInKey(processContext.getProperty(RECORD_IN_QUALIFIER).isSet() ? processContext.getProperty(RECORD_IN_QUALIFIER).asBoolean() : false)
+                .setDelete(processContext.getProperty(DELETE_KEY).isSet() ? processContext.getProperty(DELETE_KEY).evaluateAttributeExpressions(flowFile).asBoolean() : false)
+                .setTimestampField(processContext.getProperty(TIMESTAMP_FIELD).evaluateAttributeExpressions(flowFile).getValue()).build();
+
+
+        RecordPath recordPath = null;
+        if (recordPathCache != null && !StringUtils.isEmpty(recordPathText)) {
+            recordPath = recordPathCache.getCompiled(recordPathText);
+        }
+
+        boolean failed = false;
+        Mutation prevMutation=null;
+        try (final InputStream in = processSession.read(flowFile);
+             final RecordReader reader = recordParserFactory.createRecordReader(flowFile, in, getLogger())) {
+            Record record;
+            // HBase supports a restart point. This may be something that we can/should add if needed.
+            while ((record = reader.nextRecord()) != null) {
+                prevMutation = createMutation(prevMutation, processContext, record, reader.getSchema(), recordPath, flowFile, defaultVisibility, builder);
+            }
+            if (prevMutation != null) {
+                // flush the final mutation; prevMutation is null when the flow file contained no records
+                addMutation(builder.getTableName(), prevMutation);
+            }
+        } catch (Exception ex) {
+            getLogger().error("Failed to put records to Accumulo.", ex);
+            failed = true;
+        }
+
+        if (flushOnEveryFlow){
+            try {
+                tableWriter.flush();
+            } catch (MutationsRejectedException e) {
+                throw new ProcessException(e);
+            }
+        }
+
+
+        if (!failed) {
+            processSession.transfer(flowFile, REL_SUCCESS);
+        } else {
+            processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
+        }
+
+        processSession.commit();
+    }
+
+    /**
+     * Adapted from the HBase utilities. Their approach seemed ideal for our intent here.
+     * @param columnFamily column family from which to extract the visibility or to execute an expression against
+     * @param columnQualifier column qualifier from which to extract the visibility or to execute an expression against
+     * @param flowFile flow file being written
+     * @param context process context
+     * @return visibility label, or null if none can be resolved
+     */
+    public static String produceVisibility(String columnFamily, String columnQualifier, FlowFile flowFile, ProcessContext context) {
+        // without a column family there is nothing to look up
+        if (org.apache.commons.lang3.StringUtils.isEmpty(columnFamily)) {
+            return null;
+        }
+        String lookupKey = String.format("visibility.%s%s%s", columnFamily, org.apache.commons.lang3.StringUtils.isNotEmpty(columnQualifier) ? "." : "", columnQualifier);
+        String fromAttribute = flowFile.getAttribute(lookupKey);
+
+        if (fromAttribute == null && !org.apache.commons.lang3.StringUtils.isBlank(columnQualifier)) {
+            String lookupKeyFam = String.format("visibility.%s", columnFamily);
+            fromAttribute = flowFile.getAttribute(lookupKeyFam);
+        }
+
+        if (fromAttribute != null) {
+            return fromAttribute;
+        } else {
+            PropertyValue descriptor = context.getProperty(lookupKey);
+            if (descriptor == null || !descriptor.isSet()) {
+                descriptor = context.getProperty(String.format("visibility.%s", columnFamily));
+            }
+
+            String retVal = descriptor != null ? descriptor.evaluateAttributeExpressions(flowFile).getValue() : null;
+
+            return retVal;
+        }
+    }
+
+    private void addMutation(final String tableName, final Mutation m) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
+        tableWriter.getBatchWriter(tableName).addMutation(m);
+
+    }
+
+    /**
+     * Returns the row provided the record schema
+     * @param record record against which we are evaluating
+     * @param schema Record schema
+     * @param rowOrFieldName Row identifier or field name
+     * @return Text object containing the resulting row.
+     */
+    private Text getRow(final Record record,
+                        final RecordSchema schema,
+                        final String rowOrFieldName){
+        if ( !schema.getFieldNames().contains(rowOrFieldName) ){
+            return new Text(rowOrFieldName);
+        } else{
+            return new Text(record.getAsString(rowOrFieldName));
+        }
+    }
+
+    /**
+     * Creates a mutation with the provided arguments
+     * @param prevMutation previous mutation, to append to if in the same row.
+     * @param context process context.
+     * @param record record object extracted from the flow file
+     * @param schema schema for this record
+     * @param recordPath record path for visibility extraction
+     * @param flowFile flow file
+     * @param defaultVisibility default visibility
+     * @param config configuration of this instance.
+     * @return Returns the Mutation to insert
+     * @throws AccumuloSecurityException Error accessing Accumulo
+     * @throws AccumuloException Non security ( or table ) related Accumulo exceptions writing to the store.
+     * @throws TableNotFoundException Table not found on the cluster
+     */
+    protected Mutation createMutation(final Mutation prevMutation,
+                                      final ProcessContext context,
+                                      final Record record,
+                                      final RecordSchema schema,
+                                      final RecordPath recordPath,
+                                      final FlowFile flowFile,
+                                      final String defaultVisibility,
+                                      AccumuloRecordConfiguration config) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
+        Mutation m=null;
+        if (record != null) {
+
+            final Long timestamp;
+            Set<String> fieldsToSkip = new HashSet<>();
+            if (!StringUtils.isBlank(config.getTimestampField())) {
+                try {
+                    timestamp = record.getAsLong(config.getTimestampField());
+                    fieldsToSkip.add(config.getTimestampField());
+                } catch (Exception e) {
+                    throw new AccumuloException("Could not convert " + config.getTimestampField() + " to a long", e);
+                }
+
+                if (timestamp == null) {
+                    getLogger().warn("The value of timestamp field " + config.getTimestampField() + " was null, record will be inserted with latest timestamp");
+                }
+            } else {
+                timestamp = null;
+            }
+
+
+
+            RecordField visField = null;
+            Map visSettings = null;
+            if (recordPath != null) {
+                final RecordPathResult result = recordPath.evaluate(record);
+                FieldValue fv = result.getSelectedFields().findFirst().get();
+                visField = fv.getField();
+                if (null != visField) {
+                    fieldsToSkip.add(visField.getFieldName());
+                }
+                visSettings = (Map) fv.getValue();
+            }
+
+
+            if (null != prevMutation){
+                Text row = new Text(prevMutation.getRow());
+                Text curRow = getRow(record,schema,config.getRowField());
+                if (row.equals(curRow)){
+                    m = prevMutation;
+                } else{
+                    m = new Mutation(curRow);
+                    addMutation(config.getTableName(),prevMutation);
+                }
+            } else{
+                Text row = getRow(record,schema,config.getRowField());
+                m = new Mutation(row);
+            }
+
+            fieldsToSkip.add(config.getRowField());
+
+            String columnFamily = config.getColumnFamily();
+            if (StringUtils.isBlank(columnFamily) && !StringUtils.isBlank(config.getColumnFamilyField())) {
+                final String cfField = config.getColumnFamilyField();
+                columnFamily = record.getAsString(cfField);
+                fieldsToSkip.add(cfField);
+            } else if (StringUtils.isBlank(columnFamily) && StringUtils.isBlank(config.getColumnFamilyField())){
+                throw new IllegalArgumentException("Invalid configuration for column family " + columnFamily + " and " + config.getColumnFamilyField());
+            }
+            final Text cf = new Text(columnFamily);
+
+            for (String name : schema.getFieldNames().stream().filter(p->!fieldsToSkip.contains(p)).collect(Collectors.toList())) {
+                String visString = (visField != null && visSettings != null && visSettings.containsKey(name))
+                        ? (String)visSettings.get(name) : defaultVisibility;
+
+                Text cq = new Text(name);
+                final Value value;
+                String recordValue  = record.getAsString(name);
+                if (config.getQualifierInKey()) {
+                    final String delim = config.getFieldDelimiter();
+                    if (!StringUtils.isEmpty(delim)) {
+                        if (config.getEncodeDelimiter()) {
+                            byte[] asHex = DatatypeConverter.parseHexBinary(delim);
+                            cq.append(asHex, 0, asHex.length);
+                        } else {
+                            // use the byte length, not the character length, in case of multi-byte characters
+                            final byte[] delimBytes = delim.getBytes();
+                            cq.append(delimBytes, 0, delimBytes.length);
+                        }
+                    }
+                    final byte[] valueBytes = recordValue.getBytes();
+                    cq.append(valueBytes, 0, valueBytes.length);
+                    value = new Value();
+                } else {
+                    value = new Value(recordValue.getBytes());
+                }
+
+                if (StringUtils.isBlank(visString)) {
+                    visString = produceVisibility(cf.toString(), cq.toString(), flowFile, context);
+                }
+
+                ColumnVisibility cv = new ColumnVisibility();
+                if (StringUtils.isBlank(visString)) {
+                    if (!StringUtils.isBlank(defaultVisibility)) {
+                        cv = new ColumnVisibility(defaultVisibility);
+                    }
+                } else {
+                    cv = new ColumnVisibility(visString);
+                }
+
+                if (null != timestamp) {
+                    if (config.isDeleteKeys()) {
+                        m.putDelete(cf, cq, cv, timestamp);
+                    } else {
+                        m.put(cf, cq, cv, timestamp, value);
+                    }
+                } else{
+                    if (config.isDeleteKeys())
+                        m.putDelete(cf, cq, cv);
+                    else
+                        m.put(cf, cq, cv, value);
+                }
+            }
+
+        }
+
+        return m;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        /**
+         * Adapted from the HBase put processors, which use the same dynamic visibility property scheme.
+         */
+        if (propertyDescriptorName.startsWith("visibility.")) {
+            String[] parts = propertyDescriptorName.split("\\.");
+            String displayName;
+            String description;
+
+            if (parts.length == 2) {
+                displayName = String.format("Column Family %s Default Visibility", parts[1]);
+                description = String.format("Default visibility setting for %s", parts[1]);
+            } else if (parts.length == 3) {
+                displayName = String.format("Column Qualifier %s.%s Default Visibility", parts[1], parts[2]);
+                description = String.format("Default visibility setting for %s.%s", parts[1], parts[2]);
+            } else {
+                return null;
+            }
+
+            return new PropertyDescriptor.Builder()
+                    .name(propertyDescriptorName)
+                    .displayName(displayName)
+                    .description(description)
+                    .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .dynamic(true)
+                    .build();
+        }
+
+        return null;
+    }
+}
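
A note on the dynamic properties above: names follow "visibility.<family>" or
"visibility.<family>.<qualifier>", and values are Accumulo column visibility
expressions. A minimal usage sketch follows; the property values and wrapper
class are illustrative, not part of this commit.

    import org.apache.nifi.accumulo.processors.PutAccumuloRecord;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    // Sketch: supplying per-family and per-qualifier default visibilities
    // through the dynamic "visibility.*" properties defined above.
    public class VisibilitySketch {
        public static void main(String[] args) {
            final TestRunner runner = TestRunners.newTestRunner(PutAccumuloRecord.class);
            runner.setProperty("visibility.family1", "A&B");       // family-level default
            runner.setProperty("visibility.family1.code", "A|C");  // qualifier-level default
        }
    }
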
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java
new file mode 100644
index 0000000..f31c15b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.processors;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.apache.nifi.accumulo.controllerservices.BaseAccumuloService;
+import org.apache.nifi.accumulo.data.KeySchema;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.LongAdder;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({"hadoop", "accumulo", "scan", "record"})
+/**
+ * Purpose and Design: Requires that a connector be defined by way of an AccumuloService object. This class
+ * extends BaseAccumuloProcessor to scan Accumulo over a configurable key range, with properties evaluated
+ * against the incoming flow file when one is present.
+ */
+public class ScanAccumulo extends BaseAccumuloProcessor {
+    static final PropertyDescriptor START_KEY = new PropertyDescriptor.Builder()
+            .displayName("Start key")
+            .name("start-key")
+            .description("Start row key")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(Validator.VALID)
+            .build();
+
+    static final PropertyDescriptor START_KEY_INCLUSIVE = new PropertyDescriptor.Builder()
+            .displayName("Start key Inclusive")
+            .name("start-key-inclusive")
+            .description("Determines if the start key is inclusive ")
+            .required(false)
+            .defaultValue("True")
+            .allowableValues("True", "False")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor END_KEY = new PropertyDescriptor.Builder()
+            .displayName("End key")
+            .name("end-key")
+            .description("End row key for this. If not specified or empty this will be infinite")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(Validator.VALID)
+            .build();
+
+    static final PropertyDescriptor END_KEY_INCLUSIVE = new PropertyDescriptor.Builder()
+            .displayName("End key Inclusive")
+            .name("end-key-inclusive")
+            .description("Determines if the end key is inclusive")
+            .required(false)
+            .defaultValue("False")
+            .allowableValues("True", "False")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor AUTHORIZATIONS = new PropertyDescriptor.Builder()
+            .name("accumulo-authorizations")
+            .displayName("Authorizations")
+            .description("The comma separated list of authorizations to pass to the scanner.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(Validator.VALID)
+            .build();
+
+    static final PropertyDescriptor COLUMNFAMILY = new PropertyDescriptor.Builder()
+            .name("column-family")
+            .displayName("Start Column Family")
+            .description("The column family that is part of the start key. If no column key is defined only this column family will be selected")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(Validator.VALID)
+            .build();
+
+    static final PropertyDescriptor COLUMNFAMILY_END = new PropertyDescriptor.Builder()
+            .name("column-family-end")
+            .displayName("End Column Family")
+            .description("The column family to select is part of end key")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(Validator.VALID)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after it has been successfully retrieved from Accumulo")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if it cannot be retrieved fromAccumulo")
+            .build();
+
+    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the Controller Service to use for writing out the records")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(true)
+            .build();
+
+    /**
+     * Connector service which provides us a connector if the configuration is correct.
+     */
+    protected BaseAccumuloService accumuloConnectorService;
+
+    /**
+     * Connector that we need to persist while we are operational.
+     */
+    protected AccumuloClient client;
+
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_FAILURE);
+        return rels;
+    }
+
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        Collection<ValidationResult> set = new ArrayList<>();
+        if ((validationContext.getProperty(COLUMNFAMILY).isSet() && !validationContext.getProperty(COLUMNFAMILY_END).isSet())
+                || (!validationContext.getProperty(COLUMNFAMILY).isSet() && validationContext.getProperty(COLUMNFAMILY_END).isSet())) {
+            set.add(new ValidationResult.Builder().explanation("Start Column Family and End Column Family must both be defined").build());
+        }
+        return set;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        accumuloConnectorService = context.getProperty(ACCUMULO_CONNECTOR_SERVICE).asControllerService(BaseAccumuloService.class);
+        this.client = accumuloConnectorService.getClient();
+    }
+
+    private Authorizations stringToAuth(final String authorizations) {
+        if (!StringUtils.isBlank(authorizations)) {
+            return new Authorizations(authorizations.split(","));
+        }
+        return new Authorizations();
+    }
+
+
+    protected long scanAccumulo(final RecordSetWriterFactory writerFactory, final ProcessContext processContext, final ProcessSession processSession, final Optional<FlowFile> incomingFlowFile){
+
+        final Map<String, String> flowAttributes = incomingFlowFile.isPresent() ?  incomingFlowFile.get().getAttributes() : new HashMap<>();
+        final String table = processContext.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowAttributes).getValue();
+        final String startKey = processContext.getProperty(START_KEY).evaluateAttributeExpressions(flowAttributes).getValue();
+        final boolean startKeyInclusive = processContext.getProperty(START_KEY_INCLUSIVE).asBoolean();
+        final boolean endKeyInclusive = processContext.getProperty(END_KEY_INCLUSIVE).asBoolean();
+        final String endKey = processContext.getProperty(END_KEY).evaluateAttributeExpressions(flowAttributes).getValue();
+        final String authorizations = processContext.getProperty(AUTHORIZATIONS).isSet()
+                ? processContext.getProperty(AUTHORIZATIONS).evaluateAttributeExpressions(flowAttributes).getValue() : "";
+        final int threads = processContext.getProperty(THREADS).asInteger();
+        final String startKeyCf = processContext.getProperty(COLUMNFAMILY).evaluateAttributeExpressions(flowAttributes).getValue();
+        final String endKeyCf = processContext.getProperty(COLUMNFAMILY_END).evaluateAttributeExpressions(flowAttributes).getValue();
+
+        final Authorizations auths = stringToAuth(authorizations);
+
+        final LongAdder recordCounter = new LongAdder();
+
+        final Range lookupRange = buildRange(startKey,startKeyCf,startKeyInclusive,endKey,endKeyCf,endKeyInclusive);
+
+        boolean cloneFlowFile = incomingFlowFile.isPresent();
+
+        try (BatchScanner scanner = client.createBatchScanner(table,auths,threads)) {
+            if (!StringUtils.isBlank(startKeyCf) && StringUtils.isBlank(endKeyCf)) {
+                scanner.fetchColumnFamily(new Text(startKeyCf));
+            }
+            scanner.setRanges(Collections.singleton(lookupRange));
+
+            final Iterator<Map.Entry<Key,Value>> kvIter = scanner.iterator();
+            if (!kvIter.hasNext()){
+                /**
+                 * Create a flow file with a record count of zero.
+                 */
+                final Map<String, String> attributes = new HashMap<>();
+                attributes.put("record.count", String.valueOf(0));
+                FlowFile newFlow = processSession.create();
+                newFlow = processSession.putAllAttributes(newFlow, attributes);
+                processSession.transfer(newFlow, REL_SUCCESS);
+                return 0;
+            } else{
+
+                while (kvIter.hasNext()) {
+                    FlowFile iterationFlowFile = cloneFlowFile ? processSession.clone(incomingFlowFile.get()) : processSession.create();
+
+                    final int keysPerFlowFile = 1000;
+                    final Map<String, String> attributes = new HashMap<>();
+                    iterationFlowFile = processSession.write(iterationFlowFile, new StreamCallback() {
+                        @Override
+                        public void process(final InputStream in, final OutputStream out) throws IOException {
+
+                            try{
+                                final RecordSchema writeSchema = writerFactory.getSchema(flowAttributes, new KeySchema());
+                                try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out)) {
+
+                                    int i = 0;
+                                    writer.beginRecordSet();
+                                    for (; i < keysPerFlowFile && kvIter.hasNext(); i++) {
+
+                                        Map.Entry<Key, Value> kv = kvIter.next();
+
+                                        final Key key = kv.getKey();
+
+                                        Map<String, Object> data = new HashMap<>();
+                                        data.put("row", key.getRow().toString());
+                                        data.put("columnFamily", key.getColumnFamily().toString());
+                                        data.put("columnQualifier", key.getColumnQualifier().toString());
+                                        data.put("columnVisibility", key.getColumnVisibility().toString());
+                                        data.put("timestamp", key.getTimestamp());
+
+                                        MapRecord record = new MapRecord(new KeySchema(), data);
+                                        writer.write(record);
+
+
+                                    }
+                                    recordCounter.add(i);
+
+                                    final WriteResult writeResult = writer.finishRecordSet();
+                                    attributes.put("record.count", String.valueOf(i));
+                                    attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                                    attributes.putAll(writeResult.getAttributes());
+                                }
+                            } catch (SchemaNotFoundException e) {
+                                getLogger().error("Failed to process {}; will route to failure", new Object[] {
+                                        incomingFlowFile.isPresent() ? incomingFlowFile.get() : "No incoming flow file", e});
+
+                                throw new IOException(e);
+                            }
+                        }
+
+                    });
+                    iterationFlowFile = processSession.putAllAttributes(iterationFlowFile, attributes);
+                    processSession.transfer(iterationFlowFile, REL_SUCCESS);
+                }
+            }
+        } catch (final Exception e) {
+            getLogger().error("Failed to process {}; will route to failure", new Object[] {incomingFlowFile.isPresent() ? incomingFlowFile.get() : "No incoming flow file", e});
+            if (cloneFlowFile) {
+                processSession.transfer(incomingFlowFile.get(), REL_FAILURE);
+            }
+            return 0;
+        }
+
+        if (cloneFlowFile) {
+            processSession.remove(incomingFlowFile.get());
+        }
+
+        getLogger().info("Successfully converted {} records for {}", new Object[] {recordCounter.longValue(), incomingFlowFile.toString()});
+
+        return recordCounter.longValue();
+    }
+
+
+    Range buildRange(final String startRow, final String startKeyCf, boolean startKeyInclusive,
+                     final String endRow, final String endKeyCf, boolean endKeyInclusive) {
+        // A null key leaves that side of the range unbounded.
+        final Key start = StringUtils.isBlank(startRow) ? null : StringUtils.isBlank(startKeyCf) ? new Key(startRow) : new Key(startRow, startKeyCf);
+        final Key end = StringUtils.isBlank(endRow) ? null : StringUtils.isBlank(endKeyCf) ? new Key(endRow) : new Key(endRow, endKeyCf);
+        return new Range(start, startKeyInclusive, end, endKeyInclusive);
+    }
+
+    @Override
+    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
+        FlowFile flowFile = processSession.get();
+
+        final RecordSetWriterFactory writerFactory = processContext.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        long recordCount = scanAccumulo(writerFactory,processContext,processSession,Optional.ofNullable(flowFile));
+
+        processSession.adjustCounter("Records Processed", recordCount, false);
+    }
+
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>(baseProperties);
+        properties.add(START_KEY);
+        properties.add(START_KEY_INCLUSIVE);
+        properties.add(END_KEY);
+        properties.add(COLUMNFAMILY);
+        properties.add(COLUMNFAMILY_END);
+        properties.add(END_KEY_INCLUSIVE);
+        properties.add(RECORD_WRITER);
+        properties.add(AUTHORIZATIONS);
+        return properties;
+    }
+
+}
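
To make the range construction concrete, here is a sketch of the Range that
buildRange yields for a bounded scan under the inclusivity defaults above. The
row and column family values are illustrative.

    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Range;

    // Sketch: the Range produced when both endpoints are supplied. With the
    // property defaults above, the start key is inclusive and the end key is
    // exclusive.
    public class RangeSketch {
        public static void main(String[] args) {
            final Key start = new Key("2019", "family1"); // Start key + Start Column Family
            final Key end = new Key("2019", "family2");   // End key + End Column Family
            final Range lookupRange = new Range(start, true, end, false);
            System.out.println(lookupRange);
        }
    }
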
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..a1ce072
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.accumulo.processors.PutAccumuloRecord
+org.apache.nifi.accumulo.processors.ScanAccumulo
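
These entries register the two processors for discovery through the standard
java.util.ServiceLoader mechanism; NiFi performs the equivalent lookup within
each NAR's class loader. A sketch of that mechanism, assuming this jar is on
the classpath:

    import java.util.ServiceLoader;
    import org.apache.nifi.processor.Processor;

    // Sketch: enumerating the processors the manifest above registers.
    public class DiscoverySketch {
        public static void main(String[] args) {
            for (Processor processor : ServiceLoader.load(Processor.class)) {
                System.out.println(processor.getClass().getName());
            }
        }
    }
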
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/controllerservices/MockAccumuloService.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/controllerservices/MockAccumuloService.java
new file mode 100644
index 0000000..4ad489a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/controllerservices/MockAccumuloService.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.controllerservices;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MockAccumuloService {
+
+
+    public static AccumuloService getService(final TestRunner runner, final String zk, final String instanceName, final String user, final String password) throws InitializationException {
+        final AccumuloService accclient = new AccumuloService();
+        Map<String,String> properties = new HashMap<>();
+        properties.put(AccumuloService.ACCUMULO_PASSWORD.getName(), password);
+        properties.put(AccumuloService.AUTHENTICATION_TYPE.getName(), "PASSWORD");
+        properties.put(AccumuloService.ACCUMULO_USER.getName(), user);
+        properties.put(AccumuloService.ZOOKEEPER_QUORUM.getName(), zk);
+        properties.put(AccumuloService.INSTANCE_NAME.getName(), instanceName);
+        runner.addControllerService("accclient", accclient, properties);
+        runner.enableControllerService(accclient);
+        runner.setProperty("accumulo-connector-service","accclient");
+        return accclient;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/TestPutRecord.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/TestPutRecord.java
new file mode 100644
index 0000000..2d45a48
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/TestPutRecord.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.processors;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.hadoop.io.Text;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.apache.nifi.accumulo.controllerservices.AccumuloService;
+import org.apache.nifi.accumulo.controllerservices.MockAccumuloService;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+public class TestPutRecord {
+
+    public static final String DEFAULT_COLUMN_FAMILY = "family1";
+
+    /**
+     * Though deprecated in Accumulo 2.0, MiniAccumuloCluster still works well for these tests.
+     */
+    private static MiniAccumuloCluster accumulo;
+
+    private TestRunner getTestRunner(String table, String columnFamily) {
+        final TestRunner runner = TestRunners.newTestRunner(PutAccumuloRecord.class);
+        runner.enforceReadStreamsClosed(false);
+        runner.setProperty(PutAccumuloRecord.TABLE_NAME, table);
+        runner.setProperty(PutAccumuloRecord.COLUMN_FAMILY, columnFamily);
+        return runner;
+    }
+
+
+
+
+    @BeforeClass
+    public static void setupInstance() throws IOException, InterruptedException, AccumuloSecurityException, AccumuloException, TableExistsException {
+        Path tempDirectory = Files.createTempDirectory("acc"); // JUnit and Guava supply mechanisms for creating temp directories
+        accumulo = new MiniAccumuloCluster(tempDirectory.toFile(), "password");
+        accumulo.start();
+    }
+
+    private Set<Key> generateTestData(TestRunner runner, boolean valueincq, String delim, String cv) throws IOException {
+
+        final MockRecordParser parser = new MockRecordParser();
+        try {
+            runner.addControllerService("parser", parser);
+        } catch (InitializationException e) {
+            throw new IOException(e);
+        }
+        runner.enableControllerService(parser);
+        runner.setProperty(PutAccumuloRecord.RECORD_READER_FACTORY, "parser");
+
+        long ts = System.currentTimeMillis();
+
+        parser.addSchemaField("id", RecordFieldType.STRING);
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("code", RecordFieldType.STRING);
+        parser.addSchemaField("timestamp", RecordFieldType.LONG);
+
+        Set<Key> expectedKeys = new HashSet<>();
+        ColumnVisibility colViz = new ColumnVisibility();
+        if (null != cv)
+            colViz = new ColumnVisibility(cv);
+        for (int x = 0; x < 5; x++) {
+            final String row = UUID.randomUUID().toString();
+            final String cf = UUID.randomUUID().toString();
+            final String cq = UUID.randomUUID().toString();
+            Text keyCq = new Text("name");
+            if (valueincq){
+                if (null != delim && !delim.isEmpty())
+                    keyCq.append(delim.getBytes(),0,delim.length());
+                keyCq.append(cf.getBytes(),0,cf.length());
+            }
+            expectedKeys.add(new Key(new Text(row), new Text("family1"), keyCq, colViz,ts));
+            keyCq = new Text("code");
+            if (valueincq){
+                if (null != delim && !delim.isEmpty())
+                    keyCq.append(delim.getBytes(),0,delim.length());
+                keyCq.append(cq.getBytes(),0,cq.length());
+            }
+            expectedKeys.add(new Key(new Text(row), new Text("family1"), keyCq, colViz, ts));
+            parser.addRecord(row, cf, cq, ts);
+        }
+
+        return expectedKeys;
+    }
+
+    void verifyKey(String tableName, Set<Key> expectedKeys, Authorizations auths) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
+        if (null == auths)
+            auths = new Authorizations();
+        try(BatchScanner scanner = accumulo.getConnector("root","password").createBatchScanner(tableName,auths,1)) {
+            List<Range> ranges = new ArrayList<>();
+            ranges.add(new Range());
+            scanner.setRanges(ranges);
+            for (Map.Entry<Key, Value> kv : scanner) {
+                Assert.assertTrue(kv.getKey() + " not in expected keys",expectedKeys.remove(kv.getKey()));
+            }
+        }
+        Assert.assertEquals(0, expectedKeys.size());
+
+    }
+
+    private void basicPutSetup(boolean valueincq) throws Exception {
+        basicPutSetup(valueincq,null,null,null,false);
+    }
+
+    private void basicPutSetup(boolean valueincq, final String delim) throws Exception {
+        basicPutSetup(valueincq,delim,null,null,false);
+    }
+
+    private void basicPutSetup(boolean valueincq,String delim, String auths, Authorizations defaultVis, boolean deletes) throws Exception {
+        String tableName = UUID.randomUUID().toString().replace("-", "a");
+        if (null != defaultVis) {
+            accumulo.getConnector("root", "password").securityOperations().changeUserAuthorizations("root", defaultVis);
+        }
+        TestRunner runner = getTestRunner(tableName, DEFAULT_COLUMN_FAMILY);
+        runner.setProperty(PutAccumuloRecord.CREATE_TABLE, "True");
+        runner.setProperty(PutAccumuloRecord.ROW_FIELD_NAME, "id");
+        runner.setProperty(PutAccumuloRecord.COLUMN_FAMILY, DEFAULT_COLUMN_FAMILY);
+        runner.setProperty(PutAccumuloRecord.TIMESTAMP_FIELD, "timestamp");
+        if (valueincq) {
+            if (null != delim){
+                runner.setProperty(PutAccumuloRecord.FIELD_DELIMITER, delim);
+            }
+            runner.setProperty(PutAccumuloRecord.RECORD_IN_QUALIFIER, "True");
+        }
+        if (null != defaultVis){
+            runner.setProperty(PutAccumuloRecord.DEFAULT_VISIBILITY, auths);
+        }
+        AccumuloService client = MockAccumuloService.getService(runner,accumulo.getZooKeepers(),accumulo.getInstanceName(),"root","password");
+        Set<Key> expectedKeys = generateTestData(runner,valueincq,delim, auths);
+        runner.enqueue("Test".getBytes("UTF-8")); // This is to coax the processor into reading the data in the reader.l
+        runner.run();
+
+        List<MockFlowFile> results = runner.getFlowFilesForRelationship(PutAccumuloRecord.REL_SUCCESS);
+        Assert.assertTrue("Wrong count", results.size() == 1);
+        verifyKey(tableName, expectedKeys, defaultVis);
+        if (deletes){
+            runner.setProperty(PutAccumuloRecord.DELETE_KEY, "true");
+            runner.enqueue("Test".getBytes("UTF-8")); // This is to coax the processor into reading the data in the reader.l
+            runner.run();
+            runner.getFlowFilesForRelationship(PutAccumuloRecord.REL_SUCCESS);
+            verifyKey(tableName, new HashSet<>(), defaultVis);
+        }
+
+    }
+
+
+
+
+    @Test
+    public void testByteEncodedPut() throws Exception {
+        basicPutSetup(false);
+    }
+
+    @Test
+    public void testByteEncodedPutThenDelete() throws Exception {
+        basicPutSetup(true,null,"A&B",new Authorizations("A","B"),true);
+    }
+
+
+    @Test
+    public void testByteEncodedPutCq() throws Exception {
+        basicPutSetup(true);
+    }
+
+    @Test
+    public void testByteEncodedPutCqDelim() throws Exception {
+        basicPutSetup(true,"\u0000");
+    }
+
+    @Test
+    public void testByteEncodedPutCqWithVis() throws Exception {
+        basicPutSetup(true,null,"A&B",new Authorizations("A","B"),false);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/TestScanAccumulo.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/TestScanAccumulo.java
new file mode 100644
index 0000000..3be8c72
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/TestScanAccumulo.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.processors;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.hadoop.io.Text;
+import org.apache.nifi.accumulo.controllerservices.AccumuloService;
+import org.apache.nifi.accumulo.controllerservices.MockAccumuloService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+public class TestScanAccumulo {
+
+    public static final String DEFAULT_COLUMN_FAMILY = "family1";
+
+    /**
+     * Though deprecated in Accumulo 2.0, MiniAccumuloCluster still works well for these tests.
+     */
+    private static MiniAccumuloCluster accumulo;
+
+    private TestRunner getTestRunner(String table, String columnFamily) {
+        final TestRunner runner = TestRunners.newTestRunner(ScanAccumulo.class);
+        runner.enforceReadStreamsClosed(false);
+        runner.setProperty(ScanAccumulo.TABLE_NAME, table);
+        return runner;
+    }
+
+
+
+
+    @BeforeClass
+    public static void setupInstance() throws IOException, InterruptedException, AccumuloSecurityException, AccumuloException, TableExistsException {
+        Path tempDirectory = Files.createTempDirectory("acc"); // JUnit and Guava supply mechanisms for creating temp directories
+        accumulo = new MiniAccumuloCluster(tempDirectory.toFile(), "password");
+        accumulo.start();
+    }
+
+    private Set<Key> generateTestData(TestRunner runner, String definedRow, String table, boolean valueincq, String delim, String cv)
+            throws IOException, AccumuloSecurityException, AccumuloException, TableNotFoundException {
+
+        BatchWriterConfig writerConfig = new BatchWriterConfig();
+        writerConfig.setMaxWriteThreads(2);
+        writerConfig.setMaxMemory(1024*1024);
+        MultiTableBatchWriter writer = accumulo.getConnector("root", "password").createMultiTableBatchWriter(writerConfig);
+
+        long ts = System.currentTimeMillis();
+
+
+        final MockRecordWriter parser = new MockRecordWriter();
+        try {
+            runner.addControllerService("parser", parser);
+        } catch (InitializationException e) {
+            throw new IOException(e);
+        }
+        runner.enableControllerService(parser);
+        runner.setProperty(ScanAccumulo.RECORD_WRITER,"parser");
+
+
+        Set<Key> expectedKeys = new HashSet<>();
+        ColumnVisibility colViz = new ColumnVisibility();
+        if (null != cv)
+            colViz = new ColumnVisibility(cv);
+        for (int x = 0; x < 5; x++) {
+            final String row = definedRow.isEmpty() ? UUID.randomUUID().toString() : definedRow;
+            final String cf = UUID.randomUUID().toString();
+            final String cq = UUID.randomUUID().toString();
+            Text keyCq = new Text("name");
+            if (valueincq){
+                if (null != delim && !delim.isEmpty())
+                    keyCq.append(delim.getBytes(),0,delim.length());
+                keyCq.append(cf.getBytes(),0,cf.length());
+            }
+            expectedKeys.add(new Key(new Text(row), new Text(DEFAULT_COLUMN_FAMILY), keyCq, colViz,ts));
+            keyCq = new Text("code");
+            if (valueincq){
+                if (null != delim && !delim.isEmpty())
+                    keyCq.append(delim.getBytes(),0,delim.length());
+                keyCq.append(cq.getBytes(),0,cq.length());
+            }
+            expectedKeys.add(new Key(new Text(row), new Text(DEFAULT_COLUMN_FAMILY), keyCq, colViz, ts));
+            Mutation m = new Mutation(row);
+            m.put(new Text(DEFAULT_COLUMN_FAMILY),new Text(keyCq),colViz,ts, new Value());
+            writer.getBatchWriter(table).addMutation(m);
+        }
+        writer.flush();
+        return expectedKeys;
+    }
+
+    void verifyKey(String tableName, Set<Key> expectedKeys, Authorizations auths) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
+        if (null == auths)
+            auths = new Authorizations();
+        try(BatchScanner scanner = accumulo.getConnector("root","password").createBatchScanner(tableName,auths,1)) {
+            List<Range> ranges = new ArrayList<>();
+            ranges.add(new Range());
+            scanner.setRanges(ranges);
+            for (Map.Entry<Key, Value> kv : scanner) {
+                Assert.assertTrue(kv.getKey() + " not in expected keys",expectedKeys.remove(kv.getKey()));
+            }
+        }
+        Assert.assertEquals(0, expectedKeys.size());
+
+    }
+
+    private void basicPutSetup(boolean sendFlowFile, boolean valueincq) throws Exception {
+        basicPutSetup(sendFlowFile,"","","","",valueincq,null,"",null,false,5);
+    }
+
+    private void basicPutSetup(boolean sendFlowFile, boolean valueincq, final String delim) throws Exception {
+        basicPutSetup(sendFlowFile,"","","","",valueincq,delim,"",null,false,5);
+    }
+
+    private void basicPutSetup(boolean sendFlowFile,String row,String endrow, String cf,String endcf, boolean valueincq,String delim,
+                               String auths, Authorizations defaultVis, boolean deletes, int expected) throws Exception {
+        String tableName = UUID.randomUUID().toString().replace("-", "a");
+        accumulo.getConnector("root", "password").tableOperations().create(tableName);
+        if (null != defaultVis) {
+            accumulo.getConnector("root", "password").securityOperations().changeUserAuthorizations("root", defaultVis);
+        }
+        TestRunner runner = getTestRunner(tableName, DEFAULT_COLUMN_FAMILY);
+        runner.setProperty(ScanAccumulo.START_KEY, row);
+        if (!cf.isEmpty()) {
+            runner.setProperty(ScanAccumulo.COLUMNFAMILY, cf);
+        }
+        if (!endcf.isEmpty()) {
+            runner.setProperty(ScanAccumulo.COLUMNFAMILY_END, endcf);
+        }
+        runner.setProperty(ScanAccumulo.AUTHORIZATIONS, auths);
+        runner.setProperty(ScanAccumulo.END_KEY, endrow);
+
+        AccumuloService client = MockAccumuloService.getService(runner,accumulo.getZooKeepers(),accumulo.getInstanceName(),"root","password");
+        Set<Key> expectedKeys = generateTestData(runner,row,tableName,valueincq,delim, auths);
+        if (sendFlowFile) {
+            runner.enqueue("Test".getBytes("UTF-8")); // This is to coax the processor into reading the data in the reader.l
+        }
+        runner.run();
+
+
+        List<MockFlowFile> results = runner.getFlowFilesForRelationship(ScanAccumulo.REL_SUCCESS);
+        for(MockFlowFile ff : results){
+            String attr = ff.getAttribute("record.count");
+            Assert.assertEquals(expected,Integer.valueOf(attr).intValue());
+        }
+        Assert.assertTrue("Wrong count, received " + results.size(), results.size() == 1);
+    }
+
+
+
+
+    @Test
+    public void testPullDataWithFlowFile() throws Exception {
+        basicPutSetup(true,false);
+    }
+
+    @Test
+    public void testPullDataWithoutFlowFile() throws Exception {
+        basicPutSetup(false,false);
+    }
+
+    @Test
+    public void testSameRowCf() throws Exception {
+        basicPutSetup(false,"2019","2019","family1","family2",false,null,"",null,false,1);
+    }
+
+    @Test
+    public void testSameRowCfValueInCq() throws Exception {
+        basicPutSetup(false,"2019","2019","family1","family2",true,null,"",null,false,5);
+    }
+
+    @Test
+    public void testSameRowCfValueInCqWithAuths() throws Exception {
+        basicPutSetup(false,"2019","2019","family1","family2",true,null,"abcd",new Authorizations("abcd"),false,5);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void testSameRowCfValueInCqErrorCfEnd() throws Exception {
+        basicPutSetup(false,"2019","2019","family1","",true,null,"",null,false,5);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void testSameRowCfValueInCqErrorCf() throws Exception {
+        basicPutSetup(false,"2019","2019","","family2",true,null,"",null,false,5);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void testSameRowCfValueInCqErrorNotLess() throws Exception {
+        basicPutSetup(false,"2019","2019","family1","family1",true,null,"",null,false,5);
+    }
+
+
+
+}
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api-nar/pom.xml b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api-nar/pom.xml
new file mode 100644
index 0000000..c1a86ef
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api-nar/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-accumulo-bundle</artifactId>
+        <version>1.11.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-accumulo-services-api-nar</artifactId>
+    <version>1.11.0-SNAPSHOT</version>
+    <packaging>nar</packaging>
+    <properties>
+        <maven.javadoc.skip>false</maven.javadoc.skip>
+        <source.skip>true</source.skip>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-accumulo-services-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/pom.xml b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/pom.xml
new file mode 100644
index 0000000..6fe8641
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-accumulo-bundle</artifactId>
+        <version>1.11.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-accumulo-services-api</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.accumulo</groupId>
+            <artifactId>accumulo-core</artifactId>
+            <version>${accumulo.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.9</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-path</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+
+    </dependencies>
+</project>
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/src/main/java/org/apache/nifi/accumulo/controllerservices/BaseAccumuloService.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/src/main/java/org/apache/nifi/accumulo/controllerservices/BaseAccumuloService.java
new file mode 100644
index 0000000..d92b152
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/src/main/java/org/apache/nifi/accumulo/controllerservices/BaseAccumuloService.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.controllerservices;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.controller.ControllerService;
+
+@Tags({"accumulo", "client", "service"})
+@CapabilityDescription("Provides a basic connector to Accumulo services")
+public interface BaseAccumuloService extends ControllerService {
+
+
+    AccumuloClient getClient();
+
+}
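
For context, a sketch of how an implementation of this interface typically
constructs the AccumuloClient it hands out, using the Accumulo 2.x fluent
builder. The instance name, ZooKeeper quorum, and credentials are illustrative.

    import org.apache.accumulo.core.client.Accumulo;
    import org.apache.accumulo.core.client.AccumuloClient;
    import org.apache.accumulo.core.client.security.tokens.PasswordToken;

    // Sketch: building an Accumulo 2.x client from an instance name, a
    // ZooKeeper quorum, and a password token. Connection values are
    // illustrative only.
    public class ClientSketch {
        public static void main(String[] args) {
            final AccumuloClient client = Accumulo.newClient()
                    .to("myInstance", "zkhost1:2181,zkhost2:2181")
                    .as("root", new PasswordToken("password"))
                    .build();
            client.close();
        }
    }
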
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-nar/pom.xml b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-nar/pom.xml
new file mode 100644
index 0000000..c19e8ce
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-nar/pom.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-accumulo-bundle</artifactId>
+        <version>1.11.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-accumulo-services-nar</artifactId>
+    <version>1.11.0-SNAPSHOT</version>
+    <packaging>nar</packaging>
+    <properties>
+        <maven.javadoc.skip>false</maven.javadoc.skip>
+        <source.skip>true</source.skip>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-accumulo-services</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-accumulo-services-api-nar</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml
new file mode 100644
index 0000000..f6985c7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-accumulo-bundle</artifactId>
+        <version>1.11.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-accumulo-services</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.accumulo</groupId>
+            <artifactId>accumulo-core</artifactId>
+            <version>${accumulo.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.9</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-lookup-service-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-path</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-accumulo-services-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
new file mode 100644
index 0000000..91da7fe
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.controllerservices;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Purpose: Controller service that provides a configured Accumulo client. Callers do not need
+ * to close the client; the service closes it when it is disabled.
+ *
+ * Justification: Centralizes the configuration of the Accumulo connection code. This will also
+ * be used for any future Kerberos integration.
+ */
+@RequiresInstanceClassLoading
+@Tags({"accumulo", "client", "service"})
+@CapabilityDescription("A controller service for accessing an Apache Accumulo client.")
+public class AccumuloService extends AbstractControllerService implements BaseAccumuloService {
+
+    private enum AuthenticationType{
+        PASSWORD,
+        NONE
+    }
+
+    protected static final PropertyDescriptor ZOOKEEPER_QUORUM = new PropertyDescriptor.Builder()
+            .name("ZooKeeper Quorum")
+            .description("Comma-separated list of ZooKeeper hosts for Accumulo.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    protected static final PropertyDescriptor INSTANCE_NAME = new PropertyDescriptor.Builder()
+            .name("Instance Name")
+            .description("Instance name of the Accumulo cluster")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+
+    protected static final PropertyDescriptor ACCUMULO_USER = new PropertyDescriptor.Builder()
+            .name("Accumulo User")
+            .description("Connecting user for Accumulo")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    protected static final PropertyDescriptor ACCUMULO_PASSWORD = new PropertyDescriptor.Builder()
+            .name("Accumulo Password")
+            .description("Connecting user's password when using the PASSWORD Authentication type")
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    protected static final PropertyDescriptor AUTHENTICATION_TYPE = new PropertyDescriptor.Builder()
+            .name("Authentication Type")
+            .description("The type of authentication to use when connecting to Accumulo")
+            .allowableValues(AuthenticationType.values())
+            .defaultValue(AuthenticationType.PASSWORD.toString())
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+
+    /**
+     * Reference to the accumulo client.
+     */
+    AccumuloClient client;
+
+    /**
+     * Supported property descriptors, built once in init().
+     */
+    private List<PropertyDescriptor> properties;
+
+    @Override
+    protected void init(ControllerServiceInitializationContext config) throws InitializationException {
+        List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(ZOOKEEPER_QUORUM);
+        props.add(INSTANCE_NAME);
+        props.add(ACCUMULO_USER);
+        props.add(AUTHENTICATION_TYPE);
+        props.add(ACCUMULO_PASSWORD);
+        properties = Collections.unmodifiableList(props);
+    }
+
+    private AuthenticationToken getToken(final AuthenticationType type, final ConfigurationContext context){
+        switch(type){
+            case PASSWORD:
+                return new PasswordToken(context.getProperty(ACCUMULO_PASSWORD).getValue());
+            default:
+                return null;
+        }
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> problems = new ArrayList<>();
+
+        if (!validationContext.getProperty(INSTANCE_NAME).isSet()){
+            problems.add(new ValidationResult.Builder().valid(false).subject(INSTANCE_NAME.getName()).explanation("Instance name must be supplied").build());
+        }
+
+        if (!validationContext.getProperty(ZOOKEEPER_QUORUM).isSet()){
+            problems.add(new ValidationResult.Builder().valid(false).subject(ZOOKEEPER_QUORUM.getName()).explanation("Zookeepers must be supplied").build());
+        }
+
+        if (!validationContext.getProperty(ACCUMULO_USER).isSet()){
+            problems.add(new ValidationResult.Builder().valid(false).subject(ACCUMULO_USER.getName()).explanation("Accumulo user must be supplied").build());
+        }
+
+        final AuthenticationType type = validationContext.getProperty(AUTHENTICATION_TYPE).isSet()
+                ? AuthenticationType.valueOf(validationContext.getProperty(AUTHENTICATION_TYPE).getValue())
+                : AuthenticationType.PASSWORD;
+
+        switch(type){
+            case PASSWORD:
+                if (!validationContext.getProperty(ACCUMULO_PASSWORD).isSet()){
+                    problems.add(
+                            new ValidationResult.Builder().valid(false).subject(ACCUMULO_PASSWORD.getName()).explanation("Password must be supplied for the PASSWORD Authentication type").build());
+                }
+                break;
+            default:
+                problems.add(new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName()).explanation("Unsupported Authentication Type").build());
+        }
+
+        return problems;
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException {
+        if (!context.getProperty(INSTANCE_NAME).isSet() || !context.getProperty(ZOOKEEPER_QUORUM).isSet() || !context.getProperty(ACCUMULO_USER).isSet()){
+            throw new InitializationException("Instance Name, ZooKeeper Quorum, and Accumulo User must be specified");
+        }
+
+        final String instanceName = context.getProperty(INSTANCE_NAME).evaluateAttributeExpressions().getValue();
+        final String zookeepers = context.getProperty(ZOOKEEPER_QUORUM).evaluateAttributeExpressions().getValue();
+        final String accumuloUser = context.getProperty(ACCUMULO_USER).evaluateAttributeExpressions().getValue();
+
+        final AuthenticationType type = AuthenticationType.valueOf(context.getProperty(AUTHENTICATION_TYPE).getValue());
+
+        final AuthenticationToken token = getToken(type, context);
+        // Fail before creating the client if the selected Authentication Type yielded no token.
+        if (null == token){
+            throw new InitializationException("Feature not implemented");
+        }
+
+        this.client = Accumulo.newClient().to(instanceName, zookeepers).as(accumuloUser, token).build();
+    }
+
+    @Override
+    public AccumuloClient getClient(){
+        return client;
+    }
+
+    @OnDisabled
+    public void shutdown() {
+        // The client may be null if enabling failed before it was created.
+        if (client != null) {
+            client.close();
+        }
+    }
+
+}
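
For orientation, a processor that uses this service would resolve it through the
BaseAccumuloService interface and reuse the shared client rather than building its own
connection. A minimal sketch (hypothetical processor code, not part of this commit;
ACCUMULO_CONNECTOR_SERVICE and "example_table" are illustrative names):

    import org.apache.accumulo.core.client.AccumuloClient;
    import org.apache.accumulo.core.client.BatchWriter;
    import org.apache.accumulo.core.data.Mutation;
    import org.apache.nifi.accumulo.controllerservices.BaseAccumuloService;

    // Resolve the controller service configured on the processor.
    final BaseAccumuloService accumuloService = context.getProperty(ACCUMULO_CONNECTOR_SERVICE)
            .asControllerService(BaseAccumuloService.class);
    final AccumuloClient client = accumuloService.getClient();
    // The service owns the client (it is closed on @OnDisabled), so do not close it here.
    try (BatchWriter writer = client.createBatchWriter("example_table")) {
        final Mutation mutation = new Mutation("row1");
        mutation.put("family", "qualifier", "value");
        writer.addMutation(mutation);
    }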
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000..0e27be4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.accumulo.controllerservices.AccumuloService
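
Because the entry above makes AccumuloService discoverable via ServiceLoader, it can be
wired up with the nifi-mock test framework. A minimal sketch, assuming the property
display names declared in AccumuloService; the connection values are placeholders, since
table operations would require a reachable cluster:

    import org.apache.nifi.accumulo.controllerservices.AccumuloService;
    import org.apache.nifi.accumulo.processors.ScanAccumulo;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    final TestRunner runner = TestRunners.newTestRunner(ScanAccumulo.class);
    final AccumuloService service = new AccumuloService();
    runner.addControllerService("accumulo-service", service);
    // Display names match the PropertyDescriptors declared in AccumuloService.
    runner.setProperty(service, "Instance Name", "test-instance");
    runner.setProperty(service, "ZooKeeper Quorum", "localhost:2181");
    runner.setProperty(service, "Accumulo User", "root");
    runner.setProperty(service, "Authentication Type", "PASSWORD");
    runner.setProperty(service, "Accumulo Password", "password");
    runner.enableControllerService(service);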
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/pom.xml b/nifi-nar-bundles/nifi-accumulo-bundle/pom.xml
new file mode 100644
index 0000000..a43c010
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+    license agreements. See the NOTICE file distributed with this work for additional
+    information regarding copyright ownership. The ASF licenses this file to
+    You under the Apache License, Version 2.0 (the "License"); you may not use
+    this file except in compliance with the License. You may obtain a copy of
+    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+    by applicable law or agreed to in writing, software distributed under the
+    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+    OF ANY KIND, either express or implied. See the License for the specific
+    language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>1.11.0-SNAPSHOT</version>
+    </parent>
+
+    <properties>
+        <accumulo.version>2.0.0</accumulo.version>
+    </properties>
+
+    <artifactId>nifi-accumulo-bundle</artifactId>
+    <version>1.11.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>nifi-accumulo-services-api</module>
+        <module>nifi-accumulo-services-api-nar</module>
+        <module>nifi-accumulo-services</module>
+        <module>nifi-accumulo-services-nar</module>
+        <module>nifi-accumulo-processors</module>
+        <module>nifi-accumulo-nar</module>
+    </modules>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-accumulo-processors</artifactId>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-services</artifactId>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+</project>
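
Since the bundle is a standard Maven multi-module project, it can be built in isolation
with the usual reactor flags. A hedged example (run from the NiFi source root; the module
path is the one introduced by this diff):

    mvn clean install -pl nifi-nar-bundles/nifi-accumulo-bundle -am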
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DataDogReportingTask.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DataDogReportingTask.java
index 1c0704e..3747571 100644
--- a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DataDogReportingTask.java
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DataDogReportingTask.java
@@ -139,7 +139,7 @@ public class DataDogReportingTask extends AbstractReportingTask {
         try {
             updateDataDogTransport(context);
         } catch (IOException e) {
-            e.printStackTrace();
+            logger.warn("Unable to update DataDog transport", e);
         }
         updateAllMetricGroups(status);
         ddMetricRegistryBuilder.getDatadogReporter().report();
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index d23aa21..522cdb4 100755
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -99,7 +99,8 @@
         <module>nifi-prometheus-bundle</module>
         <module>nifi-easyrules-bundle</module>
         <module>nifi-sql-reporting-bundle</module>
-        <module>nifi-rules-action-handler-bundle</module>
+        <module>nifi-rules-action-handler-bundle</module>
+        <module>nifi-accumulo-bundle</module>
     </modules>
 
     <build>