You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2020/01/19 16:45:41 UTC
[nifi] 01/02: NIFI-818: This closes #3926. Initial implementation
of NiFi-Accumulo ( https://github.com/phrocker/nifi-accumulo ) with
connectors to Apache Accumulo 2.x
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit de2a286a7a01725ee11d839ead7811a4135f8b41
Author: Marc Parisi <ph...@apache.org>
AuthorDate: Wed Dec 11 11:58:56 2019 -0500
NIFI-818: This closes #3926. Initial implementation of NiFi-Accumulo ( https://github.com/phrocker/nifi-accumulo ) with connectors to Apache Accumulo 2.x
Signed-off-by: Joe Witt <jo...@apache.org>
---
nifi-assembly/pom.xml | 27 +
nifi-nar-bundles/nifi-accumulo-bundle/README.md | 23 +
.../nifi-accumulo-bundle/nifi-accumulo-nar/pom.xml | 52 ++
.../nifi-accumulo-processors/pom.xml | 101 ++++
.../accumulo/data/AccumuloRecordConfiguration.java | 159 +++++
.../org/apache/nifi/accumulo/data/KeySchema.java | 115 ++++
.../accumulo/processors/BaseAccumuloProcessor.java | 76 +++
.../accumulo/processors/PutAccumuloRecord.java | 657 +++++++++++++++++++++
.../nifi/accumulo/processors/ScanAccumulo.java | 349 +++++++++++
.../services/org.apache.nifi.processor.Processor | 16 +
.../controllerservices/MockAccumuloService.java | 42 ++
.../nifi/accumulo/processors/TestPutRecord.java | 218 +++++++
.../nifi/accumulo/processors/TestScanAccumulo.java | 236 ++++++++
.../nifi-accumulo-services-api-nar/pom.xml | 52 ++
.../nifi-accumulo-services-api/pom.xml | 61 ++
.../controllerservices/BaseAccumuloService.java | 32 +
.../nifi-accumulo-services-nar/pom.xml | 53 ++
.../nifi-accumulo-services/pom.xml | 70 +++
.../controllerservices/AccumuloService.java | 210 +++++++
.../org.apache.nifi.controller.ControllerService | 15 +
nifi-nar-bundles/nifi-accumulo-bundle/pom.xml | 50 ++
.../reporting/datadog/DataDogReportingTask.java | 2 +-
nifi-nar-bundles/pom.xml | 3 +-
23 files changed, 2617 insertions(+), 2 deletions(-)
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index b69aa64..7c9c7d8 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -897,6 +897,33 @@ language governing permissions and limitations under the License. -->
<type>nar</type>
</dependency>
</dependencies>
+ </profile>
+ <profile>
+ <id>include-accumulo</id>
+ <!-- This profile handles the inclusion of nifi-accumulo artifacts. -->
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-accumulo-nar</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-accumulo-services-api-nar</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-accumulo-services-nar</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
+ </dependencies>
</profile>
<profile>
<id>rpm</id>
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/README.md b/nifi-nar-bundles/nifi-accumulo-bundle/README.md
new file mode 100644
index 0000000..e431586
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/README.md
@@ -0,0 +1,23 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+# nifi-accumulo
+
+This is a basic NiFi->Accumulo integration. Running `mvn install` will create your NAR, which can be added
+to Apache NiFi. This bundle is intended to be built for and used with Apache Accumulo 2.x.
+
+The resulting NAR will be named 'nifi-accumulo-nar'
+
+
+Note that some of this code was modeled after the HBase work.
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-nar/pom.xml b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-nar/pom.xml
new file mode 100644
index 0000000..b128110
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-nar/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-accumulo-bundle</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-accumulo-nar</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ <packaging>nar</packaging>
+ <properties>
+ <maven.javadoc.skip>false</maven.javadoc.skip>
+ <source.skip>true</source.skip>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-accumulo-processors</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-accumulo-services-api-nar</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/pom.xml b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/pom.xml
new file mode 100644
index 0000000..23e9902
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-accumulo-bundle</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-accumulo-processors</artifactId>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-accumulo-services-api</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-core</artifactId>
+ <version>${accumulo.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.9</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock-record-utils</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-utils</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-service-api</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-path</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-minicluster</artifactId>
+ <version>${accumulo.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-accumulo-services</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/AccumuloRecordConfiguration.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/AccumuloRecordConfiguration.java
new file mode 100644
index 0000000..164f9d5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/AccumuloRecordConfiguration.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.data;
+
+/**
+ * Read-only holder for the settings of one session: table name, row field, column family,
+ * column family field, timestamp field, field delimiter and three boolean flags.
+ * Instances are created through {@link Builder}; once built, no value can change.
+ *
+ * Justification: a fluent builder is not the preferred way to configure other objects, but a
+ * lot is encapsulated here, so it minimizes what is passed between the current set of classes
+ * and upcoming features.
+ */
+public class AccumuloRecordConfiguration {
+    // All state is assigned exactly once in the constructor, hence final.
+    private final String tableName;
+    private final String rowFieldName;
+    private final String columnFamily;
+    private final String columnFamilyField;
+    private final String timestampField;
+    private final String fieldDelimiter;
+    private final boolean encodeFieldDelimiter;
+    private final boolean qualifierInKey;
+    private final boolean deleteKeys;
+
+    /**
+     * Stores the supplied values as-is; no validation is performed and any reference may be null.
+     *
+     * @param tableName            value returned by {@link #getTableName()}
+     * @param rowFieldName         value returned by {@link #getRowField()}
+     * @param columnFamily         value returned by {@link #getColumnFamily()}
+     * @param columnFamilyField    value returned by {@link #getColumnFamilyField()}
+     * @param timestampField       value returned by {@link #getTimestampField()}
+     * @param fieldDelimiter       value returned by {@link #getFieldDelimiter()}
+     * @param encodeFieldDelimiter value returned by {@link #getEncodeDelimiter()}
+     * @param qualifierInKey       value returned by {@link #getQualifierInKey()}
+     * @param deleteKeys           value returned by {@link #isDeleteKeys()}
+     */
+    protected AccumuloRecordConfiguration(final String tableName, final String rowFieldName, final String columnFamily,
+                                          final String columnFamilyField,
+                                          final String timestampField, final String fieldDelimiter,
+                                          final boolean encodeFieldDelimiter,
+                                          final boolean qualifierInKey, final boolean deleteKeys) {
+        this.tableName = tableName;
+        this.rowFieldName = rowFieldName;
+        this.columnFamily = columnFamily;
+        this.columnFamilyField = columnFamilyField;
+        this.timestampField = timestampField;
+        this.fieldDelimiter = fieldDelimiter;
+        this.encodeFieldDelimiter = encodeFieldDelimiter;
+        this.qualifierInKey = qualifierInKey;
+        this.deleteKeys = deleteKeys;
+    }
+
+    /** @return the configured table name, or null if none was set */
+    public String getTableName() {
+        return tableName;
+    }
+
+    /** @return the configured column family, or null if none was set */
+    public String getColumnFamily() {
+        return columnFamily;
+    }
+
+    /** @return the configured column family field, or null if none was set */
+    public String getColumnFamilyField() {
+        return columnFamilyField;
+    }
+
+    /** @return the flag given to {@link Builder#setEncodeFieldDelimiter(boolean)}; false by default */
+    public boolean getEncodeDelimiter() {
+        return encodeFieldDelimiter;
+    }
+
+    /** @return the configured timestamp field, or null if none was set */
+    public String getTimestampField() {
+        return timestampField;
+    }
+
+    /** @return the configured field delimiter, or null if none was set */
+    public String getFieldDelimiter() {
+        return fieldDelimiter;
+    }
+
+    /** @return the flag given to {@link Builder#setQualifierInKey(boolean)}; false by default */
+    public boolean getQualifierInKey() {
+        return qualifierInKey;
+    }
+
+    /** @return the flag given to {@link Builder#setDelete(boolean)}; false by default */
+    public boolean isDeleteKeys() {
+        return deleteKeys;
+    }
+
+    /** @return the configured row field name, or null if none was set */
+    public String getRowField() {
+        return rowFieldName;
+    }
+
+    /**
+     * Fluent, mutable builder for {@link AccumuloRecordConfiguration}. Every setter returns
+     * this builder; unset strings stay null and unset flags stay false.
+     */
+    public static class Builder {
+        private String tableName;
+        private String rowFieldName;
+        private String columnFamily;
+        private String columnFamilyField;
+        private String fieldDelimiter;
+        private String timestampField;
+        private boolean qualifierInKey = false;
+        private boolean encodeFieldDelimiter = false;
+        private boolean deleteKeys = false;
+
+        /** @return a fresh builder with nothing set */
+        public static Builder newBuilder() {
+            return new Builder();
+        }
+
+        /** @param rowFieldName value later exposed by {@link AccumuloRecordConfiguration#getRowField()} */
+        public Builder setRowField(final String rowFieldName) {
+            this.rowFieldName = rowFieldName;
+            return this;
+        }
+
+        /** @param tableName value later exposed by {@link AccumuloRecordConfiguration#getTableName()} */
+        public Builder setTableName(final String tableName) {
+            this.tableName = tableName;
+            return this;
+        }
+
+        /** @param encodeFieldDelimiter value later exposed by {@link AccumuloRecordConfiguration#getEncodeDelimiter()} */
+        public Builder setEncodeFieldDelimiter(final boolean encodeFieldDelimiter) {
+            this.encodeFieldDelimiter = encodeFieldDelimiter;
+            return this;
+        }
+
+        /** @param columnFamily value later exposed by {@link AccumuloRecordConfiguration#getColumnFamily()} */
+        public Builder setColumnFamily(final String columnFamily) {
+            this.columnFamily = columnFamily;
+            return this;
+        }
+
+        /** @param columnFamilyField value later exposed by {@link AccumuloRecordConfiguration#getColumnFamilyField()} */
+        public Builder setColumnFamilyField(final String columnFamilyField) {
+            this.columnFamilyField = columnFamilyField;
+            return this;
+        }
+
+        /** @param timestampField value later exposed by {@link AccumuloRecordConfiguration#getTimestampField()} */
+        public Builder setTimestampField(final String timestampField) {
+            this.timestampField = timestampField;
+            return this;
+        }
+
+        /** @param qualifierInKey value later exposed by {@link AccumuloRecordConfiguration#getQualifierInKey()} */
+        public Builder setQualifierInKey(final boolean qualifierInKey) {
+            this.qualifierInKey = qualifierInKey;
+            return this;
+        }
+
+        /** @param fieldDelimiter value later exposed by {@link AccumuloRecordConfiguration#getFieldDelimiter()} */
+        public Builder setFieldDelimiter(final String fieldDelimiter) {
+            this.fieldDelimiter = fieldDelimiter;
+            return this;
+        }
+
+        /** @param deleteKeys value later exposed by {@link AccumuloRecordConfiguration#isDeleteKeys()} */
+        public Builder setDelete(final boolean deleteKeys) {
+            this.deleteKeys = deleteKeys;
+            return this;
+        }
+
+        /** @return a new configuration carrying a snapshot of the values currently held by this builder */
+        public AccumuloRecordConfiguration build() {
+            return new AccumuloRecordConfiguration(tableName, rowFieldName, columnFamily, columnFamilyField,
+                    timestampField, fieldDelimiter, encodeFieldDelimiter, qualifierInKey, deleteKeys);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/KeySchema.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/KeySchema.java
new file mode 100644
index 0000000..7ac74b8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/KeySchema.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.data;
+
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Fixed {@link RecordSchema} with five fields: row, columnFamily, columnQualifier and
+ * columnVisibility (all strings) plus timestamp (long). Only "row" is declared non-nullable.
+ * The schema carries no text or format and identifies itself as "AccumuloKeySchema".
+ */
+public class KeySchema implements RecordSchema {
+    private static final List<RecordField> KEY_FIELDS = new ArrayList<>();
+
+    // NOTE(review): holds only the two distinct types used by the fields (STRING, LONG),
+    // not one entry per field - confirm this is what callers of getDataTypes() expect.
+    private static final List<DataType> DATA_TYPES = new ArrayList<>();
+
+    private static final List<String> FIELD_NAMES = new ArrayList<>();
+
+    static {
+        KEY_FIELDS.add(new RecordField("row", RecordFieldType.STRING.getDataType(), false));
+        KEY_FIELDS.add(new RecordField("columnFamily", RecordFieldType.STRING.getDataType(), true));
+        KEY_FIELDS.add(new RecordField("columnQualifier", RecordFieldType.STRING.getDataType(), true));
+        KEY_FIELDS.add(new RecordField("columnVisibility", RecordFieldType.STRING.getDataType(), true));
+        KEY_FIELDS.add(new RecordField("timestamp", RecordFieldType.LONG.getDataType(), true));
+        DATA_TYPES.add(RecordFieldType.STRING.getDataType());
+        DATA_TYPES.add(RecordFieldType.LONG.getDataType());
+        FIELD_NAMES.addAll(KEY_FIELDS.stream().map(RecordField::getFieldName).collect(Collectors.toList()));
+    }
+
+    /** @return the five key fields in declaration order */
+    @Override
+    public List<RecordField> getFields() {
+        return KEY_FIELDS;
+    }
+
+    /** @return the number of key fields */
+    @Override
+    public int getFieldCount() {
+        return KEY_FIELDS.size();
+    }
+
+    /**
+     * @param i zero-based position of the field
+     * @return the field at that position
+     */
+    @Override
+    public RecordField getField(int i) {
+        return KEY_FIELDS.get(i);
+    }
+
+    /** @return the STRING and LONG data types used by this schema */
+    @Override
+    public List<DataType> getDataTypes() {
+        return DATA_TYPES;
+    }
+
+    /** @return the names of the key fields in declaration order */
+    @Override
+    public List<String> getFieldNames() {
+        return FIELD_NAMES;
+    }
+
+    /**
+     * Looks up the data type of a field by name, ignoring case.
+     *
+     * @param s field name to look up
+     * @return the field's data type, or an empty Optional when no key field has that name
+     */
+    @Override
+    public Optional<DataType> getDataType(String s) {
+        // The previous filter compared s with itself, so every name (even an unknown one)
+        // was reported as STRING. Resolving through the field list returns a type only for
+        // names that exist and always agrees with the type declared on the field.
+        return getField(s).map(RecordField::getDataType);
+    }
+
+    /** @return always empty; this schema has no textual form */
+    @Override
+    public Optional<String> getSchemaText() {
+        return Optional.empty();
+    }
+
+    /** @return always empty; this schema has no format */
+    @Override
+    public Optional<String> getSchemaFormat() {
+        return Optional.empty();
+    }
+
+    /**
+     * @param s field name to look up, compared ignoring case
+     * @return the first key field with that name, or an empty Optional
+     */
+    @Override
+    public Optional<RecordField> getField(final String s) {
+        return KEY_FIELDS.stream().filter(x -> x.getFieldName().equalsIgnoreCase(s)).findFirst();
+    }
+
+    /** @return an identifier named "AccumuloKeySchema", version 1, branch "nifi-accumulo" */
+    @Override
+    public SchemaIdentifier getIdentifier() {
+        return SchemaIdentifier.builder().name("AccumuloKeySchema").version(1).branch("nifi-accumulo").build();
+    }
+
+    /** @return the schema name "AccumuloKeySchema" */
+    @Override
+    public Optional<String> getSchemaName() {
+        return Optional.of("AccumuloKeySchema");
+    }
+
+    /** @return the schema namespace "nifi-accumulo" */
+    @Override
+    public Optional<String> getSchemaNamespace() {
+        return Optional.of("nifi-accumulo");
+    }
+}
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/BaseAccumuloProcessor.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/BaseAccumuloProcessor.java
new file mode 100644
index 0000000..d0888ac
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/BaseAccumuloProcessor.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.processors;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.nifi.accumulo.controllerservices.BaseAccumuloService;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.util.StandardValidators;
+
+/**
+ * Base Accumulo processor class. It declares the property descriptors shared by the
+ * processors of this bundle: connector service, table name, table creation and thread
+ * count.
+ */
+public abstract class BaseAccumuloProcessor extends AbstractProcessor {
+
+ /** Required reference to a controller service implementing {@link BaseAccumuloService}. */
+ protected static final PropertyDescriptor ACCUMULO_CONNECTOR_SERVICE = new PropertyDescriptor.Builder()
+ .name("accumulo-connector-service")
+ .displayName("Accumulo Connector Service")
+ .description("Specifies the Controller Service to use for accessing Accumulo.")
+ .required(true)
+ .identifiesControllerService(BaseAccumuloService.class)
+ .build();
+
+
+ /** Required, non-empty table name; expression language is evaluated against FlowFile attributes. */
+ protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+ .name("Table Name")
+ .description("The name of the Accumulo Table into which data will be placed")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ /** Required boolean switch ("True"/"False", default "False") for creating a missing table. */
+ protected static final PropertyDescriptor CREATE_TABLE = new PropertyDescriptor.Builder()
+ .name("Create Table")
+ .description("Creates a table if it does not exist. This property will only be used when EL is not present in 'Table Name'")
+ .required(true)
+ .defaultValue("False")
+ .allowableValues("True", "False")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+
+ /** Optional positive integer (default "10") giving the number of reader/writer threads. */
+ protected static final PropertyDescriptor THREADS = new PropertyDescriptor.Builder()
+ .name("Threads")
+ .description("Number of threads used for reading and writing")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("10")
+ .build();
+
+ /**
+ * Implementations can decide to include all base properties or include them individually. The list is
+ * immutable so that implementations must knowingly construct their own lists.
+ */
+
+ protected static final ImmutableList<PropertyDescriptor> baseProperties = ImmutableList.of(ACCUMULO_CONNECTOR_SERVICE,TABLE_NAME,CREATE_TABLE,THREADS);
+
+
+}
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/PutAccumuloRecord.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/PutAccumuloRecord.java
new file mode 100644
index 0000000..dbcc3e7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/PutAccumuloRecord.java
@@ -0,0 +1,657 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.processors;
+
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.io.Text;
+import org.apache.nifi.annotation.behavior.DynamicProperties;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.accumulo.controllerservices.BaseAccumuloService;
+import org.apache.nifi.accumulo.data.AccumuloRecordConfiguration;
+
+import javax.xml.bind.DatatypeConverter;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hadoop", "accumulo", "put", "record"})
+@DynamicProperties({
+ @DynamicProperty(name = "visibility.<COLUMN FAMILY>", description = "Visibility label for everything under that column family " +
+ "when a specific label for a particular column qualifier is not available.", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+ value = "visibility label for <COLUMN FAMILY>"
+ ),
+ @DynamicProperty(name = "visibility.<COLUMN FAMILY>.<COLUMN QUALIFIER>", description = "Visibility label for the specified column qualifier " +
+ "qualified by a configured column family.", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+ value = "visibility label for <COLUMN FAMILY>:<COLUMN QUALIFIER>."
+ )
+})
+/**
+ * Purpose and Design: Requires a connector be defined by way of an AccumuloService object. This class
+ * simply extends BaseAccumuloProcessor to extract records from a flow file. The location of a record field value can be
+ * placed into the value or part of the column qualifier ( this can/may change )
+ *
+ * Supports deletes. If the delete flag is used we'll delete keys found within that flow file.
+ */
+public class PutAccumuloRecord extends BaseAccumuloProcessor {
+
+ protected static final PropertyDescriptor MEMORY_SIZE = new PropertyDescriptor.Builder()
+ .name("Memory Size")
+ .description("The maximum memory size Accumulo at any one time from the record set.")
+ .required(true)
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("10 MB")
+ .build();
+
+ protected static final PropertyDescriptor COLUMN_FAMILY = new PropertyDescriptor.Builder()
+ .name("Column Family")
+ .description("The Column Family to use when inserting data into Accumulo")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(Validator.VALID)
+ .build();
+
+ protected static final PropertyDescriptor COLUMN_FAMILY_FIELD = new PropertyDescriptor.Builder()
+ .name("Column Family Field")
+ .description("Field name used as the column family if one is not specified above.")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(Validator.VALID)
+ .build();
+
+ /**
+ * Optional, expression-language enabled flag. onTrigger evaluates it against the flow file,
+ * reads it with asBoolean() and hands the result to the record configuration's setDelete(...).
+ * NOTE(review): only validated as non-empty although it is consumed as a boolean - confirm
+ * whether a boolean validator was intended.
+ */
+ protected static final PropertyDescriptor DELETE_KEY = new PropertyDescriptor.Builder()
+ .name("delete-key")
+ .displayName("Delete Key")
+ .description("Deletes the key")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ /**
+ * When true, createMutation appends the field value to the column qualifier (after the
+ * optional field delimiter) and writes an empty Value.
+ */
+ protected static final PropertyDescriptor RECORD_IN_QUALIFIER = new PropertyDescriptor.Builder()
+ .name("record-value-in-qualifier")
+ .displayName("Record Value In Qualifier")
+ .description("Places the record value into the column qualifier instead of the value.")
+ .required(false)
+ .defaultValue("False")
+ .allowableValues("True", "False")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ /**
+ * Read once in onScheduled into flushOnEveryFlow; when true, onTrigger flushes the table
+ * writer after each flow file.
+ */
+ protected static final PropertyDescriptor FLUSH_ON_FLOWFILE = new PropertyDescriptor.Builder()
+ .name("flush-on-flow-file")
+ .displayName("Flush Every FlowFile")
+ .description("Flushes the table writer on every flow file.")
+ .required(true)
+ .defaultValue("True")
+ .allowableValues("True", "False")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ /**
+ * When true, createMutation decodes the field delimiter with DatatypeConverter.parseHexBinary
+ * instead of using its characters literally.
+ */
+ protected static final PropertyDescriptor FIELD_DELIMITER_AS_HEX = new PropertyDescriptor.Builder()
+ .name("field-delimiter-as-hex")
+ .displayName("Hex Encode Field Delimiter")
+ .description("Allows you to hex encode the delimiter as a character. So 0x00 places a null character between the record name and value.")
+ .required(false)
+ .defaultValue("False")
+ .allowableValues("True", "False")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ /**
+ * Optional delimiter; onTrigger substitutes an empty string when it is not set. It is only
+ * consulted by createMutation when the record value is placed in the column qualifier.
+ */
+ protected static final PropertyDescriptor FIELD_DELIMITER = new PropertyDescriptor.Builder()
+ .name("field-delimiter")
+ .displayName("Field Delimiter")
+ .description("Delimiter between the record value and name. ")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+
+ /**
+ * Required controller service; onTrigger uses it to create the RecordReader for each flow file.
+ */
+ protected static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
+ .name("record-reader")
+ .displayName("Record Reader")
+ .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .build();
+
+ /**
+ * Required row identifier. getRow treats the evaluated value as a field name when the record
+ * schema contains it, and as the literal row id otherwise.
+ */
+ protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
+ .name("Row Identifier Field Name")
+ .description("Specifies the name of a record field whose value should be used as the row id for the given record." +
+ " If EL defines a value that is not a field name that will be used as the row identifier.")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+
+ /**
+ * Name of the record field that createMutation reads with record.getAsLong(...) to obtain the
+ * mutation timestamp; the field itself is then excluded from the written columns.
+ */
+ protected static final PropertyDescriptor TIMESTAMP_FIELD = new PropertyDescriptor.Builder()
+ .name("timestamp-field")
+ .displayName("Timestamp Field")
+ .description("Specifies the name of a record field whose value should be used as the timestamp. If empty a timestamp will be recorded as the time of insertion")
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+
+ /**
+ * Record path text; onTrigger compiles it through recordPathCache (only when that cache is
+ * non-null) and createMutation casts the first selected value to a Map of field name to
+ * visibility string.
+ */
+ protected static final PropertyDescriptor VISIBILITY_PATH = new PropertyDescriptor.Builder()
+ .name("visibility-path")
+ .displayName("Visibility String Record Path Root")
+ .description("A record path that points to part of the record which contains a path to a mapping of visibility strings to record paths")
+ .required(false)
+ .addValidator(Validator.VALID)
+ .build();
+
+ /**
+ * Visibility expression passed to createMutation as the fallback for fields that have no
+ * entry in the visibility map.
+ */
+ protected static final PropertyDescriptor DEFAULT_VISIBILITY = new PropertyDescriptor.Builder()
+ .name("default-visibility")
+ .displayName("Default Visibility")
+ .description("Default visibility when VISIBILITY_PATH is not defined. ")
+ .required(false)
+ .addValidator(Validator.VALID)
+ .build();
+
+ /**
+ * Target of onTrigger when every record of the flow file was turned into mutations without error.
+ */
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("A FlowFile is routed to this relationship after it has been successfully stored in Accumulo")
+ .build();
+ /**
+ * Target of onTrigger (after penalizing the flow file) when reading records or building
+ * mutations threw an exception.
+ */
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("A FlowFile is routed to this relationship if it cannot be sent to Accumulo")
+ .build();
+
+
+ /**
+ * Connector service which provides us a connector if the configuration is correct.
+ * Resolved from ACCUMULO_CONNECTOR_SERVICE in onScheduled.
+ */
+ protected BaseAccumuloService accumuloConnectorService;
+
+ /**
+ * Connector that we need to persist while we are operational.
+ * Obtained from accumuloConnectorService in onScheduled; also used for table operations.
+ */
+ protected AccumuloClient client;
+
+ /**
+ * Table writer that will close when we shutdown or upon error.
+ * Created in onScheduled and closed and reset to null by shutdown().
+ */
+ private MultiTableBatchWriter tableWriter = null;
+
+ /**
+ * Record path cache used by onTrigger to compile the visibility record path.
+ * NOTE(review): no assignment to this field is visible in this part of the file; while it is
+ * null the visibility path is ignored - confirm where it is initialised.
+ */
+ protected RecordPathCache recordPathCache;
+
+
+ /**
+ * Flushes the tableWriter on every flow file if true.
+ * Populated from FLUSH_ON_FLOWFILE in onScheduled.
+ */
+ protected boolean flushOnEveryFlow;
+
+ /**
+  * Returns the relationships this processor can route to: success and failure.
+  *
+  * @return a new mutable set holding REL_SUCCESS and REL_FAILURE
+  */
+ @Override
+ public Set<Relationship> getRelationships() {
+     final Set<Relationship> relationships = new HashSet<>();
+     Collections.addAll(relationships, REL_SUCCESS, REL_FAILURE);
+     return relationships;
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+ Collection<ValidationResult> set = Collections.emptySet();
+ if (!validationContext.getProperty(COLUMN_FAMILY).isSet() && !validationContext.getProperty(COLUMN_FAMILY_FIELD).isSet())
+ set.add(new ValidationResult.Builder().explanation("Column Family OR Column family field name must be defined").build());
+ else if (validationContext.getProperty(COLUMN_FAMILY).isSet() && validationContext.getProperty(COLUMN_FAMILY_FIELD).isSet())
+ set.add(new ValidationResult.Builder().explanation("Column Family OR Column family field name must be defined, but not both").build());
+ return set;
+ }
+
+ /**
+  * Prepares the processor for execution: resolves the Accumulo client, builds the
+  * multi-table batch writer and, when the table name is a literal and table creation is
+  * enabled, creates the table if it does not exist yet.
+  *
+  * @param context process context providing the configured property values
+  */
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+     accumuloConnectorService = context.getProperty(ACCUMULO_CONNECTOR_SERVICE).asControllerService(BaseAccumuloService.class);
+     final Double maxBytes = context.getProperty(MEMORY_SIZE).asDataSize(DataUnit.B);
+     this.client = accumuloConnectorService.getClient();
+     flushOnEveryFlow = context.getProperty(FLUSH_ON_FLOWFILE).asBoolean();
+
+     final BatchWriterConfig writerConfig = new BatchWriterConfig();
+     writerConfig.setMaxWriteThreads(context.getProperty(THREADS).asInteger());
+     writerConfig.setMaxMemory(maxBytes.longValue());
+     if (!flushOnEveryFlow) {
+         // Has to be applied before the writer is created; changing the config object
+         // afterwards does not reach a writer that already exists.
+         writerConfig.setMaxLatency(60, TimeUnit.SECONDS);
+     }
+     tableWriter = client.createMultiTableBatchWriter(writerConfig);
+
+     // A table name containing expression language can only be resolved per flow file,
+     // so that case is handled in onTrigger instead.
+     if (context.getProperty(CREATE_TABLE).asBoolean() && !context.getProperty(TABLE_NAME).isExpressionLanguagePresent()) {
+         final Map<String, String> flowAttributes = new HashMap<>();
+         final String table = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowAttributes).getValue();
+         final TableOperations tableOps = this.client.tableOperations();
+         if (!tableOps.exists(table)) {
+             getLogger().info("Creating " + table + " table.");
+             try {
+                 tableOps.create(table);
+             } catch (TableExistsException te) {
+                 // can safely ignore: somebody else created the table between exists() and create()
+             } catch (AccumuloSecurityException | AccumuloException e) {
+                 getLogger().info("Accumulo or Security error creating. Continuing... " + table + ". ", e);
+             }
+         }
+     }
+ }
+
+
+ /**
+  * Closes the table writer, if one is open, when the processor is unscheduled or disabled.
+  * A rejected-mutations error during close is logged; the writer reference is cleared either way.
+  */
+ @OnUnscheduled
+ @OnDisabled
+ public synchronized void shutdown(){
+     if (tableWriter == null) {
+         // nothing was opened, or it has been closed already
+         return;
+     }
+
+     try {
+         tableWriter.close();
+     } catch (MutationsRejectedException e) {
+         getLogger().error("Mutations were rejected",e);
+     }
+     tableWriter = null;
+ }
+
+ /**
+  * Lists the properties supported by this processor: the shared base properties followed
+  * by the record-writing specific ones.
+  *
+  * @return a new list of property descriptors, each descriptor appearing exactly once
+  */
+ @Override
+ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+     final List<PropertyDescriptor> properties = new ArrayList<>(baseProperties);
+     properties.add(RECORD_READER_FACTORY);
+     // ROW_FIELD_NAME used to be registered twice; one registration is enough
+     properties.add(ROW_FIELD_NAME);
+     properties.add(COLUMN_FAMILY);
+     properties.add(COLUMN_FAMILY_FIELD);
+     properties.add(DELETE_KEY);
+     properties.add(FLUSH_ON_FLOWFILE);
+     properties.add(FIELD_DELIMITER);
+     properties.add(FIELD_DELIMITER_AS_HEX);
+     properties.add(MEMORY_SIZE);
+     properties.add(RECORD_IN_QUALIFIER);
+     properties.add(TIMESTAMP_FIELD);
+     properties.add(VISIBILITY_PATH);
+     properties.add(DEFAULT_VISIBILITY);
+     return properties;
+ }
+
+
+ /**
+  * Reads every record of the next flow file, converts the records into Accumulo mutations
+  * (records sharing a row are merged into one mutation) and hands them to the table writer.
+  * The flow file is routed to success, or penalized and routed to failure when reading or
+  * mutation building throws.
+  *
+  * @param processContext process context providing the configured property values
+  * @param processSession session used to read and route the flow file
+  * @throws ProcessException if a required table cannot be created or the flush is rejected
+  */
+ @Override
+ public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
+     final FlowFile flowFile = processSession.get();
+     if (flowFile == null) {
+         return;
+     }
+
+     final RecordReaderFactory recordParserFactory = processContext.getProperty(RECORD_READER_FACTORY)
+             .asControllerService(RecordReaderFactory.class);
+
+     final String recordPathText = processContext.getProperty(VISIBILITY_PATH).getValue();
+     final String defaultVisibility = processContext.getProperty(DEFAULT_VISIBILITY).isSet() ? processContext.getProperty(DEFAULT_VISIBILITY).getValue() : null;
+
+     final String tableName = processContext.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+     // create the table if EL is present, create table is true and the table does not exist.
+     if (processContext.getProperty(TABLE_NAME).isExpressionLanguagePresent() && processContext.getProperty(CREATE_TABLE).asBoolean()) {
+         final TableOperations tableOps = this.client.tableOperations();
+         if (!tableOps.exists(tableName)) {
+             getLogger().info("Creating " + tableName + " table.");
+             try {
+                 tableOps.create(tableName);
+             } catch (TableExistsException te) {
+                 // can safely ignore, though we shouldn't arrive here due to table.exists called, but it's possible
+                 // that with multiple threads two could attempt table creation concurrently. We don't want that
+                 // to be a failure.
+             } catch (AccumuloSecurityException | AccumuloException e) {
+                 // processing is aborted here, so the message must not claim that we continue
+                 throw new ProcessException("Accumulo or Security error creating table " + tableName + ". ", e);
+             }
+         }
+     }
+
+     final AccumuloRecordConfiguration recordConfig = AccumuloRecordConfiguration.Builder.newBuilder()
+             .setTableName(tableName)
+             .setColumnFamily(processContext.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue())
+             .setColumnFamilyField(processContext.getProperty(COLUMN_FAMILY_FIELD).evaluateAttributeExpressions(flowFile).getValue())
+             .setRowField(processContext.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue())
+             .setEncodeFieldDelimiter(processContext.getProperty(FIELD_DELIMITER_AS_HEX).asBoolean())
+             .setFieldDelimiter(processContext.getProperty(FIELD_DELIMITER).isSet() ? processContext.getProperty(FIELD_DELIMITER).evaluateAttributeExpressions(flowFile).getValue() : "")
+             .setQualifierInKey(processContext.getProperty(RECORD_IN_QUALIFIER).isSet() ? processContext.getProperty(RECORD_IN_QUALIFIER).asBoolean() : false)
+             .setDelete(processContext.getProperty(DELETE_KEY).isSet() ? processContext.getProperty(DELETE_KEY).evaluateAttributeExpressions(flowFile).asBoolean() : false)
+             .setTimestampField(processContext.getProperty(TIMESTAMP_FIELD).evaluateAttributeExpressions(flowFile).getValue()).build();
+
+     RecordPath recordPath = null;
+     if (recordPathCache != null && !StringUtils.isEmpty(recordPathText)) {
+         recordPath = recordPathCache.getCompiled(recordPathText);
+     }
+
+     boolean failed = false;
+     Mutation prevMutation = null;
+     try (final InputStream in = processSession.read(flowFile);
+          final RecordReader reader = recordParserFactory.createRecordReader(flowFile, in, getLogger())) {
+         Record record;
+         /**
+          * HBase supports a restart point. This may be something that we can/should add if needed.
+          */
+         while ((record = reader.nextRecord()) != null) {
+             prevMutation = createMutation(prevMutation, processContext, record, reader.getSchema(), recordPath, flowFile, defaultVisibility, recordConfig);
+         }
+         // createMutation only submits a mutation once the row changes, so the last one is
+         // still pending here. There is none at all when the flow file held no records.
+         if (prevMutation != null) {
+             addMutation(recordConfig.getTableName(), prevMutation);
+         }
+     } catch (Exception ex) {
+         getLogger().error("Failed to put records to Accumulo.", ex);
+         failed = true;
+     }
+
+     if (flushOnEveryFlow) {
+         try {
+             tableWriter.flush();
+         } catch (MutationsRejectedException e) {
+             throw new ProcessException(e);
+         }
+     }
+
+     if (!failed) {
+         processSession.transfer(flowFile, REL_SUCCESS);
+     } else {
+         processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
+     }
+
+     processSession.commit();
+ }
+
+ /**
+  * Adapted from HBASEUtils. Their approach seemed ideal for what our intent is here.
+  * Looks up a visibility expression for a column, first in the flow file attributes and
+  * then in the processor's properties. The qualifier specific key
+  * {@code visibility.<family>.<qualifier>} is tried before the family wide key
+  * {@code visibility.<family>}.
+  *
+  * @param columnFamily column family from which to extract the visibility or to execute an expression against
+  * @param columnQualifier column qualifier from which to extract the visibility or to execute an expression against
+  * @param flowFile flow file being written
+  * @param context process context
+  * @return the visibility expression found, or null when the column family is empty or
+  *         neither an attribute nor a property yields a value
+  */
+ public static String produceVisibility(String columnFamily, String columnQualifier, FlowFile flowFile, ProcessContext context) {
+     // Without a column family there is no key to look up. (The check used to be inverted and
+     // returned null for every non-empty family, which disabled the lookup entirely.)
+     if (org.apache.commons.lang3.StringUtils.isEmpty(columnFamily)) {
+         return null;
+     }
+
+     final String familyKey = String.format("visibility.%s", columnFamily);
+     final boolean hasQualifier = !org.apache.commons.lang3.StringUtils.isBlank(columnQualifier);
+     // The separator belongs between family and qualifier, so it is only added when a qualifier exists.
+     final String lookupKey = hasQualifier ? familyKey + "." + columnQualifier : familyKey;
+
+     String fromAttribute = flowFile.getAttribute(lookupKey);
+     if (fromAttribute == null && hasQualifier) {
+         fromAttribute = flowFile.getAttribute(familyKey);
+     }
+     if (fromAttribute != null) {
+         return fromAttribute;
+     }
+
+     PropertyValue descriptor = context.getProperty(lookupKey);
+     if (descriptor == null || !descriptor.isSet()) {
+         descriptor = context.getProperty(familyKey);
+     }
+     return descriptor != null ? descriptor.evaluateAttributeExpressions(flowFile).getValue() : null;
+ }
+
+ /**
+ * Hands a mutation to the batch writer that the shared multi-table writer provides for the table.
+ * @param tableName name of the table to write to
+ * @param m mutation to queue
+ * @throws AccumuloSecurityException declared by the underlying writer calls
+ * @throws AccumuloException declared by the underlying writer calls
+ * @throws TableNotFoundException declared by the underlying writer calls
+ */
+ private void addMutation(final String tableName, final Mutation m) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
+ tableWriter.getBatchWriter(tableName).addMutation(m);
+
+ }
+
+ /**
+ * Returns the row provided the record schema
+ * @param record record against which we are evaluating
+ * @param schema Record schema
+ * @param rowOrFieldName Row identifier or field name
+ * @return Text object containing the resulting row.
+ */
+ private Text getRow(final Record record,
+ final RecordSchema schema,
+ final String rowOrFieldName){
+ if ( !schema.getFieldNames().contains(rowOrFieldName) ){
+ return new Text(rowOrFieldName);
+ } else{
+ return new Text(record.getAsString(rowOrFieldName));
+ }
+ }
+
+ /**
+ * Creates a mutation with the provided arguments
+ * @param prevMutation previous mutation, to append to if in the same row.
+ * @param context process context.
+ * @param record record object extracted from the flow file
+ * @param schema schema for this record
+ * @param recordPath record path for visibility extraction
+ * @param flowFile flow file
+ * @param defaultVisibility default visibility
+ * @param config configuration of this instance.
+ * @return Returns the Mutation to insert
+ * @throws AccumuloSecurityException Error accessing Accumulo
+ * @throws AccumuloException Non security ( or table ) related Accumulo exceptions writing to the store.
+ * @throws TableNotFoundException Table not found on the cluster
+ */
+ protected Mutation createMutation(final Mutation prevMutation,
+ final ProcessContext context,
+ final Record record,
+ final RecordSchema schema,
+ final RecordPath recordPath,
+ final FlowFile flowFile,
+ final String defaultVisibility,
+ AccumuloRecordConfiguration config) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
+ Mutation m=null;
+ if (record != null) {
+
+ final Long timestamp;
+ Set<String> fieldsToSkip = new HashSet<>();
+ if (!StringUtils.isBlank(config.getTimestampField())) {
+ try {
+ timestamp = record.getAsLong(config.getTimestampField());
+ fieldsToSkip.add(config.getTimestampField());
+ } catch (Exception e) {
+ throw new AccumuloException("Could not convert " + config.getTimestampField() + " to a long", e);
+ }
+
+ if (timestamp == null) {
+ getLogger().warn("The value of timestamp field " + config.getTimestampField() + " was null, record will be inserted with latest timestamp");
+ }
+ } else {
+ timestamp = null;
+ }
+
+
+
+ RecordField visField = null;
+ Map visSettings = null;
+ if (recordPath != null) {
+ final RecordPathResult result = recordPath.evaluate(record);
+ FieldValue fv = result.getSelectedFields().findFirst().get();
+ visField = fv.getField();
+ if (null != visField)
+ fieldsToSkip.add(visField.getFieldName());
+ visSettings = (Map)fv.getValue();
+ }
+
+
+ if (null != prevMutation){
+ Text row = new Text(prevMutation.getRow());
+ Text curRow = getRow(record,schema,config.getRowField());
+ if (row.equals(curRow)){
+ m = prevMutation;
+ } else{
+ m = new Mutation(curRow);
+ addMutation(config.getTableName(),prevMutation);
+ }
+ } else{
+ Text row = getRow(record,schema,config.getRowField());
+ m = new Mutation(row);
+ }
+
+ fieldsToSkip.add(config.getRowField());
+
+ String columnFamily = config.getColumnFamily();
+ if (StringUtils.isBlank(columnFamily) && !StringUtils.isBlank(config.getColumnFamilyField())) {
+ final String cfField = config.getColumnFamilyField();
+ columnFamily = record.getAsString(cfField);
+ fieldsToSkip.add(cfField);
+ } else if (StringUtils.isBlank(columnFamily) && StringUtils.isBlank(config.getColumnFamilyField())){
+ throw new IllegalArgumentException("Invalid configuration for column family " + columnFamily + " and " + config.getColumnFamilyField());
+ }
+ final Text cf = new Text(columnFamily);
+
+ for (String name : schema.getFieldNames().stream().filter(p->!fieldsToSkip.contains(p)).collect(Collectors.toList())) {
+ String visString = (visField != null && visSettings != null && visSettings.containsKey(name))
+ ? (String)visSettings.get(name) : defaultVisibility;
+
+ Text cq = new Text(name);
+ final Value value;
+ String recordValue = record.getAsString(name);
+ if (config.getQualifierInKey()){
+ final String delim = config.getFieldDelimiter();
+ if (!StringUtils.isEmpty(delim)) {
+ if (config.getEncodeDelimiter()) {
+ byte [] asHex = DatatypeConverter.parseHexBinary(delim);
+ cq.append(asHex, 0, asHex.length);
+ }else{
+ cq.append(delim.getBytes(), 0, delim.length());
+ }
+ }
+ cq.append(recordValue.getBytes(),0,recordValue.length());
+ value = new Value();
+ } else{
+ value = new Value(recordValue.getBytes());
+ }
+
+ if (StringUtils.isBlank(visString)) {
+ visString = produceVisibility(cf.toString(), cq.toString(), flowFile, context);
+ }
+
+ ColumnVisibility cv = new ColumnVisibility();
+ if (StringUtils.isBlank(visString)) {
+ if (!StringUtils.isBlank(defaultVisibility)) {
+ cv = new ColumnVisibility(defaultVisibility);
+ }
+ } else {
+ cv = new ColumnVisibility(visString);
+ }
+
+ if (null != timestamp) {
+ if (config.isDeleteKeys()) {
+ m.putDelete(cf, cq, cv, timestamp);
+ } else {
+ m.put(cf, cq, cv, timestamp, value);
+ }
+ } else{
+ if (config.isDeleteKeys())
+ m.putDelete(cf, cq, cv);
+ else
+ m.put(cf, cq, cv, value);
+ }
+ }
+
+
+
+ }
+
+ return m;
+ }
+
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ /**
+ * Adapted from HBase puts. This is a good approach and one that we should adopt here, too.
+ */
+ if (propertyDescriptorName.startsWith("visibility.")) {
+ String[] parts = propertyDescriptorName.split("\\.");
+ String displayName;
+ String description;
+
+ if (parts.length == 2) {
+ displayName = String.format("Column Family %s Default Visibility", parts[1]);
+ description = String.format("Default visibility setting for %s", parts[1]);
+ } else if (parts.length == 3) {
+ displayName = String.format("Column Qualifier %s.%s Default Visibility", parts[1], parts[2]);
+ description = String.format("Default visibility setting for %s.%s", parts[1], parts[2]);
+ } else {
+ return null;
+ }
+
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .displayName(displayName)
+ .description(description)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .dynamic(true)
+ .build();
+ }
+
+ return null;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java
new file mode 100644
index 0000000..f31c15b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.processors;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.apache.nifi.accumulo.controllerservices.BaseAccumuloService;
+import org.apache.nifi.accumulo.data.KeySchema;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.LongAdder;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({"hadoop", "accumulo", "scan", "record"})
+/**
+ * Purpose and Design: Requires a connector be defined by way of an AccumuloService object. This class
+ * simply extends BaseAccumuloProcessor to scan accumulo based on aspects and expression executed against
+ * a flow file
+ *
+ */
+public class ScanAccumulo extends BaseAccumuloProcessor {
+ static final PropertyDescriptor START_KEY = new PropertyDescriptor.Builder()
+ .displayName("Start key")
+ .name("start-key")
+ .description("Start row key")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(Validator.VALID)
+ .build();
+
+ static final PropertyDescriptor START_KEY_INCLUSIVE = new PropertyDescriptor.Builder()
+ .displayName("Start key Inclusive")
+ .name("start-key-inclusive")
+ .description("Determines if the start key is inclusive ")
+ .required(false)
+ .defaultValue("True")
+ .allowableValues("True", "False")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor END_KEY = new PropertyDescriptor.Builder()
+ .displayName("End key")
+ .name("end-key")
+ .description("End row key for this. If not specified or empty this will be infinite")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(Validator.VALID)
+ .build();
+
+ static final PropertyDescriptor END_KEY_INCLUSIVE = new PropertyDescriptor.Builder()
+ .displayName("End key Inclusive")
+ .name("end-key-inclusive")
+ .description("Determines if the end key is inclusive")
+ .required(false)
+ .defaultValue("False")
+ .allowableValues("True", "False")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor AUTHORIZATIONS = new PropertyDescriptor.Builder()
+ .name("accumulo-authorizations")
+ .displayName("Authorizations")
+ .description("The comma separated list of authorizations to pass to the scanner.")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(Validator.VALID)
+ .build();
+
+ static final PropertyDescriptor COLUMNFAMILY = new PropertyDescriptor.Builder()
+ .name("column-family")
+ .displayName("Start Column Family")
+ .description("The column family that is part of the start key. If no column key is defined only this column family will be selected")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(Validator.VALID)
+ .build();
+
+ static final PropertyDescriptor COLUMNFAMILY_END = new PropertyDescriptor.Builder()
+ .name("column-family-end")
+ .displayName("End Column Family")
+ .description("The column family to select is part of end key")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(Validator.VALID)
+ .build();
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("A FlowFile is routed to this relationship after it has been successfully retrieved from Accumulo")
+ .build();
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("A FlowFile is routed to this relationship if it cannot be retrieved fromAccumulo")
+ .build();
+
+ static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+ .name("record-writer")
+ .displayName("Record Writer")
+ .description("Specifies the Controller Service to use for writing out the records")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .required(true)
+ .build();
+
+ /**
+ * Connector service which provides us a connector if the configuration is correct.
+ */
+ protected BaseAccumuloService accumuloConnectorService;
+
+ /**
+ * Connector that we need to persist while we are operational.
+ */
+ protected AccumuloClient client;
+
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ final Set<Relationship> rels = new HashSet<>();
+ rels.add(REL_SUCCESS);
+ rels.add(REL_FAILURE);
+ return rels;
+ }
+
+
+ @Override
+ protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+ Collection<ValidationResult> set = new ArrayList<>();
+ if ((validationContext.getProperty(COLUMNFAMILY).isSet() && !validationContext.getProperty(COLUMNFAMILY_END).isSet())
+ || !validationContext.getProperty(COLUMNFAMILY).isSet() && validationContext.getProperty(COLUMNFAMILY_END).isSet() )
+ set.add(new ValidationResult.Builder().explanation("Column Family and Column family end must be defined").build());
+ return set;
+ }
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ accumuloConnectorService = context.getProperty(ACCUMULO_CONNECTOR_SERVICE).asControllerService(BaseAccumuloService.class);
+ this.client = accumuloConnectorService.getClient();
+ }
+
+ private Authorizations stringToAuth(final String authorizations){
+ if (!StringUtils.isBlank(authorizations))
+ return new Authorizations(authorizations.split(","));
+ else
+ return new Authorizations();
+ }
+
+
+ protected long scanAccumulo(final RecordSetWriterFactory writerFactory, final ProcessContext processContext, final ProcessSession processSession, final Optional<FlowFile> incomingFlowFile){
+
+ final Map<String, String> flowAttributes = incomingFlowFile.isPresent() ? incomingFlowFile.get().getAttributes() : new HashMap<>();
+ final String table = processContext.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowAttributes).getValue();
+ final String startKey = processContext.getProperty(START_KEY).evaluateAttributeExpressions(flowAttributes).getValue();
+ final boolean startKeyInclusive = processContext.getProperty(START_KEY_INCLUSIVE).asBoolean();
+ final boolean endKeyInclusive = processContext.getProperty(END_KEY_INCLUSIVE).asBoolean();
+ final String endKey = processContext.getProperty(END_KEY).evaluateAttributeExpressions(flowAttributes).getValue();
+ final String authorizations = processContext.getProperty(AUTHORIZATIONS).isSet()
+ ? processContext.getProperty(AUTHORIZATIONS).evaluateAttributeExpressions(flowAttributes).getValue() : "";
+ final int threads = processContext.getProperty(THREADS).asInteger();
+ final String startKeyCf = processContext.getProperty(COLUMNFAMILY).evaluateAttributeExpressions(flowAttributes).getValue();
+ final String endKeyCf = processContext.getProperty(COLUMNFAMILY_END).evaluateAttributeExpressions(flowAttributes).getValue();
+
+ final Authorizations auths = stringToAuth(authorizations);
+
+ final LongAdder recordCounter = new LongAdder();
+
+ final Range lookupRange = buildRange(startKey,startKeyCf,startKeyInclusive,endKey,endKeyCf,endKeyInclusive);
+
+ boolean cloneFlowFile = incomingFlowFile.isPresent();
+
+ try (BatchScanner scanner = client.createBatchScanner(table,auths,threads)) {
+ if (!StringUtils.isBlank(startKeyCf) && StringUtils.isBlank(endKeyCf))
+ scanner.fetchColumnFamily(new Text(startKeyCf));
+ scanner.setRanges(Collections.singleton(lookupRange));
+
+ final Iterator<Map.Entry<Key,Value>> kvIter = scanner.iterator();
+ if (!kvIter.hasNext()){
+ /**
+ * Create a flow file with a record count of zero.
+ */
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("record.count", String.valueOf(0));
+ final FlowFile newFlow = processSession.create();
+ processSession.putAllAttributes(newFlow,attributes);
+ processSession.transfer(newFlow, REL_SUCCESS);
+ return 0;
+ } else{
+
+ while (kvIter.hasNext()) {
+ FlowFile iterationFlowFile = cloneFlowFile ? processSession.clone(incomingFlowFile.get()) : processSession.create();
+
+ final int keysPerFlowFile = 1000;
+ final Map<String, String> attributes = new HashMap<>();
+ iterationFlowFile = processSession.write(iterationFlowFile, new StreamCallback() {
+ @Override
+ public void process(final InputStream in, final OutputStream out) throws IOException {
+
+ try{
+ final RecordSchema writeSchema = writerFactory.getSchema(flowAttributes, new KeySchema());
+ try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out)) {
+
+ int i = 0;
+ writer.beginRecordSet();
+ for (; i < keysPerFlowFile && kvIter.hasNext(); i++) {
+
+ Map.Entry<Key, Value> kv = kvIter.next();
+
+ final Key key = kv.getKey();
+
+ Map<String, Object> data = new HashMap<>();
+ data.put("row", key.getRow().toString());
+ data.put("columnFamily", key.getColumnFamily().toString());
+ data.put("columnQualifier", key.getColumnQualifier().toString());
+ data.put("columnVisibility", key.getColumnVisibility().toString());
+ data.put("timestamp", key.getTimestamp());
+
+ MapRecord record = new MapRecord(new KeySchema(), data);
+ writer.write(record);
+
+
+ }
+ recordCounter.add(i);
+
+ final WriteResult writeResult = writer.finishRecordSet();
+ attributes.put("record.count", String.valueOf(i));
+ attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+ attributes.putAll(writeResult.getAttributes());
+ }
+ } catch (SchemaNotFoundException e) {
+ getLogger().error("Failed to process {}; will route to failure", new Object[] {
+ incomingFlowFile.isPresent() ? incomingFlowFile.get() : "No incoming flow file", e});
+
+ throw new IOException(e);
+ }
+ }
+
+ });
+ processSession.putAllAttributes(iterationFlowFile,attributes);
+ processSession.transfer(iterationFlowFile, REL_SUCCESS);
+ }
+ }
+ } catch (final Exception e) {
+ getLogger().error("Failed to process {}; will route to failure", new Object[] {incomingFlowFile.isPresent() ? incomingFlowFile.get() : "No incoming flow file", e});
+ if (cloneFlowFile) {
+ processSession.transfer(incomingFlowFile.get(), REL_FAILURE);
+ }
+ return 0;
+ }
+
+ if (cloneFlowFile) {
+ processSession.remove(incomingFlowFile.get());
+ }
+
+ getLogger().info("Successfully converted {} records for {}", new Object[] {recordCounter.longValue(), incomingFlowFile.toString()});
+
+ return recordCounter.longValue();
+ }
+
+
+ Range buildRange(final String startRow, final String startKeyCf,boolean startKeyInclusive, final String endRow, final String endKeyCf,boolean endKeyInclusive){
+ Key start = StringUtils.isBlank(startRow) ? null : StringUtils.isBlank(startKeyCf) ? new Key(startRow) : new Key(startRow,startKeyCf);
+ Key end = StringUtils.isBlank(endRow) ? null : StringUtils.isBlank(endKeyCf) ? new Key(endRow) : new Key(endRow,endKeyCf);
+ return new Range(start,startKeyInclusive,end,endKeyInclusive);
+ }
+
+ @Override
+ public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
+ FlowFile flowFile = processSession.get();
+
+ final RecordSetWriterFactory writerFactory = processContext.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+ long recordCount = scanAccumulo(writerFactory,processContext,processSession,Optional.ofNullable(flowFile));
+
+ processSession.adjustCounter("Records Processed", recordCount, false);
+ }
+
+
+ @Override
+ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> properties = new ArrayList<>(baseProperties);
+ properties.add(START_KEY);
+ properties.add(START_KEY_INCLUSIVE);
+ properties.add(END_KEY);
+ properties.add(COLUMNFAMILY);
+ properties.add(COLUMNFAMILY_END);
+ properties.add(END_KEY_INCLUSIVE);
+ properties.add(RECORD_WRITER);
+ properties.add(AUTHORIZATIONS);
+ return properties;
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..a1ce072
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.accumulo.processors.PutAccumuloRecord
+org.apache.nifi.accumulo.processors.ScanAccumulo
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/controllerservices/MockAccumuloService.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/controllerservices/MockAccumuloService.java
new file mode 100644
index 0000000..4ad489a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/controllerservices/MockAccumuloService.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.controllerservices;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test helper that registers a password-authenticated {@link AccumuloService} on a
+ * {@link TestRunner} and wires it to the processor under test.
+ */
+public class MockAccumuloService {
+
+    /**
+     * Creates an {@link AccumuloService}, adds it to the runner under the id "accclient",
+     * enables it and sets the runner's "accumulo-connector-service" property to that id.
+     *
+     * @param runner runner hosting the processor under test
+     * @param zk value for the ZooKeeper quorum property
+     * @param instanceName value for the instance name property
+     * @param user value for the user property
+     * @param password value for the password property
+     * @return the enabled service
+     * @throws InitializationException if the runner cannot add the service
+     */
+    public static AccumuloService getService(final TestRunner runner, final String zk, final String instanceName, final String user, final String password) throws InitializationException {
+        final Map<String, String> serviceConfig = new HashMap<>();
+        serviceConfig.put(AccumuloService.ACCUMULO_PASSWORD.getName(), password);
+        serviceConfig.put(AccumuloService.AUTHENTICATION_TYPE.getName(), "PASSWORD");
+        serviceConfig.put(AccumuloService.ACCUMULO_USER.getName(), user);
+        serviceConfig.put(AccumuloService.ZOOKEEPER_QUORUM.getName(), zk);
+        serviceConfig.put(AccumuloService.INSTANCE_NAME.getName(), instanceName);
+
+        final AccumuloService service = new AccumuloService();
+        runner.addControllerService("accclient", service, serviceConfig);
+        runner.enableControllerService(service);
+
+        // point the processor at the service registered above
+        runner.setProperty("accumulo-connector-service", "accclient");
+        return service;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/TestPutRecord.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/TestPutRecord.java
new file mode 100644
index 0000000..2d45a48
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/TestPutRecord.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.processors;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.hadoop.io.Text;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.apache.nifi.accumulo.controllerservices.AccumuloService;
+import org.apache.nifi.accumulo.controllerservices.MockAccumuloService;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+
+public class TestPutRecord {
+
+ public static final String DEFAULT_COLUMN_FAMILY = "family1";
+
+ /**
+ * Though deprecated in 2.0 it still functions very well
+ */
+ private static MiniAccumuloCluster accumulo;
+
+ private TestRunner getTestRunner(String table, String columnFamily) {
+ final TestRunner runner = TestRunners.newTestRunner(PutAccumuloRecord.class);
+ runner.enforceReadStreamsClosed(false);
+ runner.setProperty(PutAccumuloRecord.TABLE_NAME, table);
+ runner.setProperty(PutAccumuloRecord.COLUMN_FAMILY, columnFamily);
+ return runner;
+ }
+
+
+
+
+ @BeforeClass
+ public static void setupInstance() throws IOException, InterruptedException, AccumuloSecurityException, AccumuloException, TableExistsException {
+ Path tempDirectory = Files.createTempDirectory("acc"); // JUnit and Guava supply mechanisms for creating temp directories
+ accumulo = new MiniAccumuloCluster(tempDirectory.toFile(), "password");
+ accumulo.start();
+ }
+
+ private Set<Key> generateTestData(TestRunner runner, boolean valueincq, String delim, String cv) throws IOException {
+
+ final MockRecordParser parser = new MockRecordParser();
+ try {
+ runner.addControllerService("parser", parser);
+ } catch (InitializationException e) {
+ throw new IOException(e);
+ }
+ runner.enableControllerService(parser);
+ runner.setProperty(PutAccumuloRecord.RECORD_READER_FACTORY, "parser");
+
+ long ts = System.currentTimeMillis();
+
+ parser.addSchemaField("id", RecordFieldType.STRING);
+ parser.addSchemaField("name", RecordFieldType.STRING);
+ parser.addSchemaField("code", RecordFieldType.STRING);
+ parser.addSchemaField("timestamp", RecordFieldType.LONG);
+
+ Set<Key> expectedKeys = new HashSet<>();
+ ColumnVisibility colViz = new ColumnVisibility();
+ if (null != cv)
+ colViz = new ColumnVisibility(cv);
+ Random random = new Random();
+ for (int x = 0; x < 5; x++) {
+ //final int row = random.nextInt(10000000);
+ final String row = UUID.randomUUID().toString();
+ final String cf = UUID.randomUUID().toString();
+ final String cq = UUID.randomUUID().toString();
+ Text keyCq = new Text("name");
+ if (valueincq){
+ if (null != delim && !delim.isEmpty())
+ keyCq.append(delim.getBytes(),0,delim.length());
+ keyCq.append(cf.getBytes(),0,cf.length());
+ }
+ expectedKeys.add(new Key(new Text(row), new Text("family1"), keyCq, colViz,ts));
+ keyCq = new Text("code");
+ if (valueincq){
+ if (null != delim && !delim.isEmpty())
+ keyCq.append(delim.getBytes(),0,delim.length());
+ keyCq.append(cq.getBytes(),0,cq.length());
+ }
+ expectedKeys.add(new Key(new Text(row), new Text("family1"), keyCq, colViz, ts));
+ parser.addRecord(row, cf, cq, ts);
+ }
+
+ return expectedKeys;
+ }
+
+ void verifyKey(String tableName, Set<Key> expectedKeys, Authorizations auths) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
+ if (null == auths)
+ auths = new Authorizations();
+ try(BatchScanner scanner = accumulo.getConnector("root","password").createBatchScanner(tableName,auths,1)) {
+ List<Range> ranges = new ArrayList<>();
+ ranges.add(new Range());
+ scanner.setRanges(ranges);
+ for (Map.Entry<Key, Value> kv : scanner) {
+ Assert.assertTrue(kv.getKey() + " not in expected keys",expectedKeys.remove(kv.getKey()));
+ }
+ }
+ Assert.assertEquals(0, expectedKeys.size());
+
+ }
+
+ private void basicPutSetup(boolean valueincq) throws Exception {
+ basicPutSetup(valueincq,null,null,null,false);
+ }
+
+ private void basicPutSetup(boolean valueincq, final String delim) throws Exception {
+ basicPutSetup(valueincq,delim,null,null,false);
+ }
+
+ private void basicPutSetup(boolean valueincq,String delim, String auths, Authorizations defaultVis, boolean deletes) throws Exception {
+ String tableName = UUID.randomUUID().toString();
+ tableName=tableName.replace("-","a");
+ if (null != defaultVis)
+ accumulo.getConnector("root","password").securityOperations().changeUserAuthorizations("root",defaultVis);
+ TestRunner runner = getTestRunner(tableName, DEFAULT_COLUMN_FAMILY);
+ runner.setProperty(PutAccumuloRecord.CREATE_TABLE, "True");
+ runner.setProperty(PutAccumuloRecord.ROW_FIELD_NAME, "id");
+ runner.setProperty(PutAccumuloRecord.COLUMN_FAMILY, DEFAULT_COLUMN_FAMILY);
+ runner.setProperty(PutAccumuloRecord.TIMESTAMP_FIELD, "timestamp");
+ if (valueincq) {
+ if (null != delim){
+ runner.setProperty(PutAccumuloRecord.FIELD_DELIMITER, delim);
+ }
+ runner.setProperty(PutAccumuloRecord.RECORD_IN_QUALIFIER, "True");
+ }
+ if (null != defaultVis){
+ runner.setProperty(PutAccumuloRecord.DEFAULT_VISIBILITY, auths);
+ }
+ AccumuloService client = MockAccumuloService.getService(runner,accumulo.getZooKeepers(),accumulo.getInstanceName(),"root","password");
+ Set<Key> expectedKeys = generateTestData(runner,valueincq,delim, auths);
+ runner.enqueue("Test".getBytes("UTF-8")); // This is to coax the processor into reading the data in the reader.l
+ runner.run();
+
+ List<MockFlowFile> results = runner.getFlowFilesForRelationship(PutAccumuloRecord.REL_SUCCESS);
+ Assert.assertTrue("Wrong count", results.size() == 1);
+ verifyKey(tableName, expectedKeys, defaultVis);
+ if (deletes){
+ runner.setProperty(PutAccumuloRecord.DELETE_KEY, "true");
+ runner.enqueue("Test".getBytes("UTF-8")); // This is to coax the processor into reading the data in the reader.l
+ runner.run();
+ runner.getFlowFilesForRelationship(PutAccumuloRecord.REL_SUCCESS);
+ verifyKey(tableName, new HashSet<>(), defaultVis);
+ }
+
+ }
+
+
+
+
+ @Test
+ public void testByteEncodedPut() throws Exception {
+ basicPutSetup(false);
+ }
+
+ @Test
+ public void testByteEncodedPutThenDelete() throws Exception {
+ basicPutSetup(true,null,"A&B",new Authorizations("A","B"),true);
+ }
+
+
+ @Test
+ public void testByteEncodedPutCq() throws Exception {
+ basicPutSetup(true);
+ }
+
+ @Test
+ public void testByteEncodedPutCqDelim() throws Exception {
+ basicPutSetup(true,"\u0000");
+ }
+
+ @Test
+ public void testByteEncodedPutCqWithVis() throws Exception {
+ basicPutSetup(true,null,"A&B",new Authorizations("A","B"),false);
+ }
+}
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/TestScanAccumulo.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/TestScanAccumulo.java
new file mode 100644
index 0000000..3be8c72
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/TestScanAccumulo.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.processors;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.hadoop.io.Text;
+import org.apache.nifi.accumulo.controllerservices.AccumuloService;
+import org.apache.nifi.accumulo.controllerservices.MockAccumuloService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+
+public class TestScanAccumulo {
+
+ public static final String DEFAULT_COLUMN_FAMILY = "family1";
+
+ /**
+ * Though deprecated in 2.0 it still functions very well
+ */
+ private static MiniAccumuloCluster accumulo;
+
+ private TestRunner getTestRunner(String table, String columnFamily) {
+ final TestRunner runner = TestRunners.newTestRunner(ScanAccumulo.class);
+ runner.enforceReadStreamsClosed(false);
+ runner.setProperty(ScanAccumulo.TABLE_NAME, table);
+ return runner;
+ }
+
+
+
+
+ @BeforeClass
+ public static void setupInstance() throws IOException, InterruptedException, AccumuloSecurityException, AccumuloException, TableExistsException {
+ Path tempDirectory = Files.createTempDirectory("acc"); // JUnit and Guava supply mechanisms for creating temp directories
+ accumulo = new MiniAccumuloCluster(tempDirectory.toFile(), "password");
+ accumulo.start();
+ }
+
+ private Set<Key> generateTestData(TestRunner runner, String definedRow, String table, boolean valueincq, String delim, String cv)
+ throws IOException, AccumuloSecurityException, AccumuloException, TableNotFoundException {
+
+ BatchWriterConfig writerConfig = new BatchWriterConfig();
+ writerConfig.setMaxWriteThreads(2);
+ writerConfig.setMaxMemory(1024*1024);
+ MultiTableBatchWriter writer = accumulo.getConnector("root","password").createMultiTableBatchWriter(writerConfig);
+
+ long ts = System.currentTimeMillis();
+
+
+ final MockRecordWriter parser = new MockRecordWriter();
+ try {
+ runner.addControllerService("parser", parser);
+ } catch (InitializationException e) {
+ throw new IOException(e);
+ }
+ runner.enableControllerService(parser);
+ runner.setProperty(ScanAccumulo.RECORD_WRITER,"parser");
+
+
+ Set<Key> expectedKeys = new HashSet<>();
+ ColumnVisibility colViz = new ColumnVisibility();
+ if (null != cv)
+ colViz = new ColumnVisibility(cv);
+ Random random = new Random();
+ for (int x = 0; x < 5; x++) {
+ //final int row = random.nextInt(10000000);
+ final String row = definedRow.isEmpty() ? UUID.randomUUID().toString() : definedRow;
+ final String cf = UUID.randomUUID().toString();
+ final String cq = UUID.randomUUID().toString();
+ Text keyCq = new Text("name");
+ if (valueincq){
+ if (null != delim && !delim.isEmpty())
+ keyCq.append(delim.getBytes(),0,delim.length());
+ keyCq.append(cf.getBytes(),0,cf.length());
+ }
+ expectedKeys.add(new Key(new Text(row), new Text(DEFAULT_COLUMN_FAMILY), keyCq, colViz,ts));
+ keyCq = new Text("code");
+ if (valueincq){
+ if (null != delim && !delim.isEmpty())
+ keyCq.append(delim.getBytes(),0,delim.length());
+ keyCq.append(cq.getBytes(),0,cq.length());
+ }
+ expectedKeys.add(new Key(new Text(row), new Text(DEFAULT_COLUMN_FAMILY), keyCq, colViz, ts));
+ Mutation m = new Mutation(row);
+ m.put(new Text(DEFAULT_COLUMN_FAMILY),new Text(keyCq),colViz,ts, new Value());
+ writer.getBatchWriter(table).addMutation(m);
+ }
+ writer.flush();
+ return expectedKeys;
+ }
+
+ void verifyKey(String tableName, Set<Key> expectedKeys, Authorizations auths) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
+ if (null == auths)
+ auths = new Authorizations();
+ try(BatchScanner scanner = accumulo.getConnector("root","password").createBatchScanner(tableName,auths,1)) {
+ List<Range> ranges = new ArrayList<>();
+ ranges.add(new Range());
+ scanner.setRanges(ranges);
+ for (Map.Entry<Key, Value> kv : scanner) {
+ Assert.assertTrue(kv.getKey() + " not in expected keys",expectedKeys.remove(kv.getKey()));
+ }
+ }
+ Assert.assertEquals(0, expectedKeys.size());
+
+ }
+
+ private void basicPutSetup(boolean sendFlowFile, boolean valueincq) throws Exception {
+ basicPutSetup(sendFlowFile,"","","","",valueincq,null,"",null,false,5);
+ }
+
+ private void basicPutSetup(boolean sendFlowFile, boolean valueincq, final String delim) throws Exception {
+ basicPutSetup(sendFlowFile,"","","","",valueincq,delim,"",null,false,5);
+ }
+
+ private void basicPutSetup(boolean sendFlowFile,String row,String endrow, String cf,String endcf, boolean valueincq,String delim,
+ String auths, Authorizations defaultVis, boolean deletes, int expected) throws Exception {
+ String tableName = UUID.randomUUID().toString();
+ tableName=tableName.replace("-","a");
+ accumulo.getConnector("root","password").tableOperations().create(tableName);
+ if (null != defaultVis)
+ accumulo.getConnector("root","password").securityOperations().changeUserAuthorizations("root",defaultVis);
+ TestRunner runner = getTestRunner(tableName, DEFAULT_COLUMN_FAMILY);
+ runner.setProperty(ScanAccumulo.START_KEY, row);
+ if (!cf.isEmpty())
+ runner.setProperty(ScanAccumulo.COLUMNFAMILY, cf);
+ if (!endcf.isEmpty())
+ runner.setProperty(ScanAccumulo.COLUMNFAMILY_END, endcf);
+ runner.setProperty(ScanAccumulo.AUTHORIZATIONS, auths);
+ runner.setProperty(ScanAccumulo.END_KEY, endrow);
+
+ AccumuloService client = MockAccumuloService.getService(runner,accumulo.getZooKeepers(),accumulo.getInstanceName(),"root","password");
+ Set<Key> expectedKeys = generateTestData(runner,row,tableName,valueincq,delim, auths);
+ if (sendFlowFile) {
+ runner.enqueue("Test".getBytes("UTF-8")); // This is to coax the processor into reading the data in the reader.l
+ }
+ runner.run();
+
+
+ List<MockFlowFile> results = runner.getFlowFilesForRelationship(ScanAccumulo.REL_SUCCESS);
+ for(MockFlowFile ff : results){
+ String attr = ff.getAttribute("record.count");
+ Assert.assertEquals(expected,Integer.valueOf(attr).intValue());
+ }
+ Assert.assertTrue("Wrong count, received " + results.size(), results.size() == 1);
+ }
+
+
+
+
+ @Test
+ public void testPullDatWithFlowFile() throws Exception {
+ basicPutSetup(true,false);
+ }
+
+ @Test
+ public void testPullDatWithOutFlowFile() throws Exception {
+ basicPutSetup(false,false);
+ }
+
+ @Test
+ public void testSameRowCf() throws Exception {
+ basicPutSetup(false,"2019","2019","family1","family2",false,null,"",null,false,1);
+ }
+
+ @Test
+ public void testSameRowCfValueInCq() throws Exception {
+ basicPutSetup(false,"2019","2019","family1","family2",true,null,"",null,false,5);
+ }
+
+ @Test
+ public void testSameRowCfValueInCqWithAuths() throws Exception {
+ basicPutSetup(false,"2019","2019","family1","family2",true,null,"abcd",new Authorizations("abcd"),false,5);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void testSameRowCfValueInCqErrorCfEnd() throws Exception {
+ basicPutSetup(false,"2019","2019","family1","",true,null,"",null,false,5);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void testSameRowCfValueInCqErrorCf() throws Exception {
+ basicPutSetup(false,"2019","2019","","family2",true,null,"",null,false,5);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void testSameRowCfValueInCqErrorNotLess() throws Exception {
+ basicPutSetup(false,"2019","2019","family1","family1",true,null,"",null,false,5);
+ }
+
+
+
+}
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api-nar/pom.xml b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api-nar/pom.xml
new file mode 100644
index 0000000..c1a86ef
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api-nar/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!-- NAR module that packages the nifi-accumulo-services-api jar declared in the dependencies below. -->
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-accumulo-bundle</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-accumulo-services-api-nar</artifactId>
+ <!-- Same value as the parent version declared above. -->
+ <version>1.11.0-SNAPSHOT</version>
+ <packaging>nar</packaging>
+ <properties>
+ <!-- NOTE(review): javadoc is not skipped here; source.skip presumably suppresses the sources artifact. Confirm against the parent build. -->
+ <maven.javadoc.skip>false</maven.javadoc.skip>
+ <source.skip>true</source.skip>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </dependency>
+ <!-- NAR-typed dependency; presumably this makes the standard services API NAR the parent of this NAR. TODO confirm. -->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-standard-services-api-nar</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
+ <!-- The service API jar bundled by this NAR. -->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-accumulo-services-api</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/pom.xml b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/pom.xml
new file mode 100644
index 0000000..6fe8641
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!-- Jar module for the Accumulo controller-service API; the version is inherited from the parent. -->
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-accumulo-bundle</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-accumulo-services-api</artifactId>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <!-- accumulo.version is not defined in this POM; presumably it comes from the parent. TODO confirm. -->
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-core</artifactId>
+ <version>${accumulo.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.9</version>
+ </dependency>
+ <!-- NiFi artifacts, all pinned to the same version as the parent. -->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-utils</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-service-api</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-path</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </dependency>
+
+ </dependencies>
+</project>
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/src/main/java/org/apache/nifi/accumulo/controllerservices/BaseAccumuloService.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/src/main/java/org/apache/nifi/accumulo/controllerservices/BaseAccumuloService.java
new file mode 100644
index 0000000..d92b152
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/src/main/java/org/apache/nifi/accumulo/controllerservices/BaseAccumuloService.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.controllerservices;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.controller.ControllerService;
+
+/**
+ * Controller service contract through which other components obtain an {@link AccumuloClient}.
+ */
+@Tags({"accumulo", "client", "service"})
+@CapabilityDescription("Provides a basic connector to Accumulo services")
+public interface BaseAccumuloService extends ControllerService {
+
+
+ /**
+  * Returns the Accumulo client supplied by this service.
+  *
+  * @return the client; NOTE(review): lifecycle and thread-safety depend on the implementation, confirm before sharing or closing it
+  */
+ AccumuloClient getClient();
+
+}
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-nar/pom.xml b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-nar/pom.xml
new file mode 100644
index 0000000..c19e8ce
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-nar/pom.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!-- NAR module that packages the nifi-accumulo-services jar declared in the dependencies below. -->
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-accumulo-bundle</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-accumulo-services-nar</artifactId>
+ <!-- Same value as the parent version declared above. -->
+ <version>1.11.0-SNAPSHOT</version>
+ <packaging>nar</packaging>
+ <properties>
+ <!-- NOTE(review): javadoc is not skipped here; source.skip presumably suppresses the sources artifact. Confirm against the parent build. -->
+ <maven.javadoc.skip>false</maven.javadoc.skip>
+ <source.skip>true</source.skip>
+ </properties>
+
+ <dependencies>
+ <!-- Provided scope: needed at compile time but not bundled with this module. -->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <!-- The service implementation jar bundled by this NAR. -->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-accumulo-services</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </dependency>
+ <!-- NAR-typed dependency; presumably this makes the services API NAR the parent of this NAR. TODO confirm. -->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-accumulo-services-api-nar</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml
new file mode 100644
index 0000000..f6985c7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-accumulo-bundle</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </parent>
+
+ <!-- Jar module holding the Accumulo controller service implementation. -->
+ <artifactId>nifi-accumulo-services</artifactId>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <!-- Accumulo client library; the version comes from the accumulo.version property. -->
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-core</artifactId>
+ <version>${accumulo.version}</version>
+ </dependency>
+ <!-- NOTE(review): version is hard-coded here rather than taken from a property or managed version; confirm this is intended. -->
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.9</version>
+ </dependency>
+ <!-- NOTE(review): no scope is declared for nifi-api in this module; confirm whether provided scope is wanted. -->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-utils</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-lookup-service-api</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-service-api</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-path</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </dependency>
+ <!-- Service API module of this bundle (nifi-accumulo-services-api). -->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-accumulo-services-api</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
new file mode 100644
index 0000000..91da7fe
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.controllerservices;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Purpose: Controller service that provides us a configured connector. Note that we don't need to close this
+ *
+ * Justification: Centralizes the configuration of the connecting accumulo code. This also will be used
+ * for any kerberos integration.
+ */
+@RequiresInstanceClassLoading
+@Tags({"accumulo", "client", "service"})
+@CapabilityDescription("A controller service for accessing an HBase client.")
+public class AccumuloService extends AbstractControllerService implements BaseAccumuloService {
+
+ private enum AuthenticationType{
+ PASSWORD,
+ NONE
+ }
+
+ protected static final PropertyDescriptor ZOOKEEPER_QUORUM = new PropertyDescriptor.Builder()
+ .name("ZooKeeper Quorum")
+ .description("Comma-separated list of ZooKeeper hosts for Accumulo.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ protected static final PropertyDescriptor INSTANCE_NAME = new PropertyDescriptor.Builder()
+ .name("Instance Name")
+ .description("Instance name of the Accumulo cluster")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+
+ protected static final PropertyDescriptor ACCUMULO_USER = new PropertyDescriptor.Builder()
+ .name("Accumulo User")
+ .description("Connecting user for Accumulo")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ protected static final PropertyDescriptor ACCUMULO_PASSWORD = new PropertyDescriptor.Builder()
+ .name("Accumulo Password")
+ .description("Connecting user's password when using the PASSWORD Authentication type")
+ .sensitive(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ protected static final PropertyDescriptor AUTHENTICATION_TYPE = new PropertyDescriptor.Builder()
+ .name("Authentication Type")
+ .description("Authentication Type")
+ .allowableValues(AuthenticationType.values())
+ .defaultValue(AuthenticationType.PASSWORD.toString())
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+
+ /**
+ * Reference to the accumulo client.
+ */
+ AccumuloClient client;
+
+ /**
+ * properties
+ */
+ private List<PropertyDescriptor> properties;
+
+ @Override
+ protected void init(ControllerServiceInitializationContext config) throws InitializationException {
+ List<PropertyDescriptor> props = new ArrayList<>();
+ props.add(ZOOKEEPER_QUORUM);
+ props.add(INSTANCE_NAME);
+ props.add(ACCUMULO_USER);
+ props.add(AUTHENTICATION_TYPE);
+ props.add(ACCUMULO_PASSWORD);
+ properties = Collections.unmodifiableList(props);
+ }
+
+ private AuthenticationToken getToken(final AuthenticationType type, final ConfigurationContext context){
+ switch(type){
+ case PASSWORD:
+ return new PasswordToken(context.getProperty(ACCUMULO_PASSWORD).getValue());
+ default:
+ return null;
+ }
+ }
+
+ @Override
+ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(INSTANCE_NAME);
+ properties.add(ZOOKEEPER_QUORUM);
+ properties.add(ACCUMULO_USER);
+ properties.add(AUTHENTICATION_TYPE);
+ properties.add(ACCUMULO_PASSWORD);
+ return properties;
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+ final List<ValidationResult> problems = new ArrayList<>();
+
+ if (!validationContext.getProperty(INSTANCE_NAME).isSet()){
+ problems.add(new ValidationResult.Builder().valid(false).subject(INSTANCE_NAME.getName()).explanation("Instance name must be supplied").build());
+ }
+
+ if (!validationContext.getProperty(ZOOKEEPER_QUORUM).isSet()){
+ problems.add(new ValidationResult.Builder().valid(false).subject(ZOOKEEPER_QUORUM.getName()).explanation("Zookeepers must be supplied").build());
+ }
+
+ if (!validationContext.getProperty(ACCUMULO_USER).isSet()){
+ problems.add(new ValidationResult.Builder().valid(false).subject(ACCUMULO_USER.getName()).explanation("Accumulo user must be supplied").build());
+ }
+
+ final AuthenticationType type = validationContext.getProperty(
+ AUTHENTICATION_TYPE).isSet() ? AuthenticationType.valueOf( validationContext.getProperty(AUTHENTICATION_TYPE).getValue() ) : AuthenticationType.PASSWORD;
+
+ switch(type){
+ case PASSWORD:
+ if (!validationContext.getProperty(ACCUMULO_PASSWORD).isSet()){
+ problems.add(
+ new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName()).explanation("Password must be supplied for the Password Authentication type").build());
+ }
+ break;
+ default:
+ problems.add(new ValidationResult.Builder().valid(false).subject(ACCUMULO_PASSWORD.getName()).explanation("Non supported Authentication type").build());
+ }
+
+ return problems;
+ }
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException {
+ if (!context.getProperty(INSTANCE_NAME).isSet() || !context.getProperty(ZOOKEEPER_QUORUM).isSet() || !context.getProperty(ACCUMULO_USER).isSet()){
+ throw new InitializationException("Instance name and Zookeeper Quorum must be specified");
+ }
+
+
+
+ final String instanceName = context.getProperty(INSTANCE_NAME).evaluateAttributeExpressions().getValue();
+ final String zookeepers = context.getProperty(ZOOKEEPER_QUORUM).evaluateAttributeExpressions().getValue();
+ final String accumuloUser = context.getProperty(ACCUMULO_USER).evaluateAttributeExpressions().getValue();
+
+ final AuthenticationType type = AuthenticationType.valueOf( context.getProperty(AUTHENTICATION_TYPE).getValue() );
+
+
+
+ final AuthenticationToken token = getToken(type,context);
+
+ this.client = Accumulo.newClient().to(instanceName,zookeepers).as(accumuloUser,token).build();
+
+ if (null == token){
+ throw new InitializationException("Feature not implemented");
+ }
+
+ }
+
+ @Override
+ public AccumuloClient getClient(){
+ return client;
+ }
+
+ @OnDisabled
+ public void shutdown() {
+ client.close();
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000..0e27be4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.accumulo.controllerservices.AccumuloService
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/pom.xml b/nifi-nar-bundles/nifi-accumulo-bundle/pom.xml
new file mode 100644
index 0000000..a43c010
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-nar-bundles</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </parent>
+
+ <properties>
+ <!-- Accumulo version for the bundle; child modules reference it as ${accumulo.version}. -->
+ <accumulo.version>2.0.0</accumulo.version>
+ </properties>
+
+ <!-- Aggregator (pom packaging) for the Accumulo bundle modules listed below. -->
+ <artifactId>nifi-accumulo-bundle</artifactId>
+ <!-- Same value as the parent version declared above. -->
+ <version>1.11.0-SNAPSHOT</version>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>nifi-accumulo-services-api</module>
+ <module>nifi-accumulo-services-api-nar</module>
+ <module>nifi-accumulo-services</module>
+ <module>nifi-accumulo-services-nar</module>
+ <module>nifi-accumulo-processors</module>
+ <module>nifi-accumulo-nar</module>
+ </modules>
+
+ <!-- NOTE(review): the managed dependencies below declare no version; confirm the build accepts this or add explicit versions. -->
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-accumulo-processors</artifactId>
+ </dependency>
+ <!-- NOTE(review): nifi-services is not one of the modules listed above; presumably nifi-accumulo-services was meant. Confirm. -->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-services</artifactId>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+</project>
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DataDogReportingTask.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DataDogReportingTask.java
index 1c0704e..3747571 100644
--- a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DataDogReportingTask.java
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DataDogReportingTask.java
@@ -139,7 +139,7 @@ public class DataDogReportingTask extends AbstractReportingTask {
try {
updateDataDogTransport(context);
} catch (IOException e) {
- e.printStackTrace();
+ logger.warn("Unable to update data dog transport", e);
}
updateAllMetricGroups(status);
ddMetricRegistryBuilder.getDatadogReporter().report();
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index d23aa21..522cdb4 100755
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -99,7 +99,8 @@
<module>nifi-prometheus-bundle</module>
<module>nifi-easyrules-bundle</module>
<module>nifi-sql-reporting-bundle</module>
- <module>nifi-rules-action-handler-bundle</module>
+ <module>nifi-rules-action-handler-bundle</module>
+ <module>nifi-accumulo-bundle</module>
</modules>
<build>