Posted to commits@nifi.apache.org by bb...@apache.org on 2015/11/10 18:28:56 UTC

[3/3] nifi git commit: NIFI-817 Processors for interacting with HBase - Refactoring PutHBaseCell to batch Puts by table - Adding optional Columns property to GetHBase to return only selected column families or columns - Making GetHBase cluster friendly b

NIFI-817 Processors for interacting with HBase
- Refactoring PutHBaseCell to batch Puts by table
- Adding optional Columns property to GetHBase to return only selected column families or columns
- Making GetHBase cluster friendly by storing state in the distributed cache and a local file
- Adding Initial Time Range property to GetHBase
- Adding Filter Expression property and custom validate to prevent using columns and a filter at the same time
- Creating an HBaseClientService controller service to isolate the HBase client and support multiple versions
- Creating appropriate LICENSE/NOTICE files
- Adding @InputRequirement to processors
- Addressing comments from review, moving hbase client services under standard services
- Making sure result of session.penalize() is assigned to FlowFile variable before transferring (see the sketch below)
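
For the last item: NiFi FlowFiles are immutable, so framework calls such as session.penalize()
return a new FlowFile reference that must be used from then on. A minimal sketch of the corrected
pattern (a fragment, not code from this commit; assumes a ProcessSession "session", a FlowFile
"flowFile", and a REL_FAILURE relationship are in scope):

    // Wrong: the penalized FlowFile returned by the framework is discarded
    // and the stale reference is transferred instead.
    session.penalize(flowFile);
    session.transfer(flowFile, REL_FAILURE);

    // Right: reassign the result before transferring.
    flowFile = session.penalize(flowFile);
    session.transfer(flowFile, REL_FAILURE);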


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e748fd58
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e748fd58
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e748fd58

Branch: refs/heads/master
Commit: e748fd5848eff2332761ef2e4ca2c7603df55ddb
Parents: 2a90bd5
Author: Bryan Bende <bb...@apache.org>
Authored: Fri Oct 2 17:22:29 2015 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Tue Nov 10 11:13:51 2015 -0500

----------------------------------------------------------------------
 nifi-assembly/LICENSE                           |  88 +++
 nifi-assembly/NOTICE                            |  19 +
 nifi-assembly/pom.xml                           |  10 +
 .../nifi-hadoop-libraries-bundle/pom.xml        |  11 +
 .../nifi-hbase-bundle/nifi-hbase-nar/pom.xml    |  36 ++
 .../src/main/resources/META-INF/NOTICE          |  19 +
 .../nifi-hbase-processors/pom.xml               |  73 +++
 .../nifi/hbase/AbstractHBaseProcessor.java      |  23 +
 .../java/org/apache/nifi/hbase/GetHBase.java    | 547 +++++++++++++++++++
 .../org/apache/nifi/hbase/PutHBaseCell.java     | 202 +++++++
 .../apache/nifi/hbase/io/JsonRowSerializer.java |  69 +++
 .../org/apache/nifi/hbase/io/RowSerializer.java |  35 ++
 .../org/apache/nifi/hbase/util/ObjectSerDe.java |  56 ++
 .../org/apache/nifi/hbase/util/StringSerDe.java |  44 ++
 .../org.apache.nifi.processor.Processor         |  17 +
 .../nifi/hbase/MockHBaseClientService.java      | 102 ++++
 .../org/apache/nifi/hbase/TestGetHBase.java     | 459 ++++++++++++++++
 .../org/apache/nifi/hbase/TestPutHBaseCell.java | 274 ++++++++++
 .../apache/nifi/hbase/util/TestObjectSerDe.java |  71 +++
 nifi-nar-bundles/nifi-hbase-bundle/pom.xml      |  54 ++
 .../nifi-hbase-client-service-api/pom.xml       |  39 ++
 .../apache/nifi/hbase/HBaseClientService.java   |  64 +++
 .../org/apache/nifi/hbase/put/PutFlowFile.java  |  67 +++
 .../java/org/apache/nifi/hbase/scan/Column.java |  71 +++
 .../org/apache/nifi/hbase/scan/ResultCell.java  | 188 +++++++
 .../apache/nifi/hbase/scan/ResultHandler.java   |  26 +
 .../hbase/validate/ConfigFilesValidator.java    |  38 ++
 .../nifi-hbase_1_1_2-client-service-nar/pom.xml |  42 ++
 .../src/main/resources/META-INF/LICENSE         | 357 ++++++++++++
 .../src/main/resources/META-INF/NOTICE          | 334 +++++++++++
 .../nifi-hbase_1_1_2-client-service/pom.xml     |  78 +++
 .../nifi/hbase/HBase_1_1_2_ClientService.java   | 207 +++++++
 ...org.apache.nifi.controller.ControllerService |  15 +
 .../hbase/TestHBase_1_1_2_ClientService.java    | 380 +++++++++++++
 .../org/apache/nifi/hbase/TestProcessor.java    |  48 ++
 .../src/test/resources/core-site.xml            |  22 +
 .../pom.xml                                     |  50 ++
 .../nifi-standard-services-api-nar/pom.xml      |   5 +
 nifi-nar-bundles/nifi-standard-services/pom.xml |   6 +-
 nifi-nar-bundles/pom.xml                        |   1 +
 pom.xml                                         |  18 +
 41 files changed, 4263 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e748fd58/nifi-assembly/LICENSE
----------------------------------------------------------------------
diff --git a/nifi-assembly/LICENSE b/nifi-assembly/LICENSE
index 5abc79a..5c499e3 100644
--- a/nifi-assembly/LICENSE
+++ b/nifi-assembly/LICENSE
@@ -792,6 +792,94 @@ This product bundles HexViewJS available under an MIT License
     OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
     WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 
+  The binary distribution of this product bundles 'Jcodings' under an MIT style
+  license.
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy of
+    this software and associated documentation files (the "Software"), to deal in
+    the Software without restriction, including without limitation the rights to
+    use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
+    of the Software, and to permit persons to whom the Software is furnished to do
+    so, subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included in all
+    copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+    SOFTWARE.
+
+  The binary distribution of this product bundles 'Joni' under an MIT style
+  license.
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy of
+    this software and associated documentation files (the "Software"), to deal in
+    the Software without restriction, including without limitation the rights to
+    use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
+    of the Software, and to permit persons to whom the Software is furnished to do
+    so, subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included in all
+    copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+    SOFTWARE.
+
+The binary distribution of this product bundles 'Google Protocol Buffers Java 2.5.0'
+which is licensed under a BSD license.
+
+  This license applies to all parts of Protocol Buffers except the following:
+
+    - Atomicops support for generic gcc, located in
+      src/google/protobuf/stubs/atomicops_internals_generic_gcc.h.
+      This file is copyrighted by Red Hat Inc.
+
+    - Atomicops support for AIX/POWER, located in
+      src/google/protobuf/stubs/atomicops_internals_aix.h.
+      This file is copyrighted by Bloomberg Finance LP.
+
+  Copyright 2014, Google Inc.  All rights reserved.
+
+  Redistribution and use in source and binary forms, with or without
+  modification, are permitted provided that the following conditions are
+  met:
+
+      * Redistributions of source code must retain the above copyright
+  notice, this list of conditions and the following disclaimer.
+      * Redistributions in binary form must reproduce the above
+  copyright notice, this list of conditions and the following disclaimer
+  in the documentation and/or other materials provided with the
+  distribution.
+      * Neither the name of Google Inc. nor the names of its
+  contributors may be used to endorse or promote products derived from
+  this software without specific prior written permission.
+
+  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+  Code generated by the Protocol Buffer compiler is owned by the owner
+  of the input file used when generating it.  This code is not
+  standalone and requires a support library to be linked with it.  This
+  support library is itself covered by the above license.
+
 This product bundles 'JCraft Jzlib' which is available under a 3-Clause BSD License.
 
     Copyright (c) 2002-2014 Atsuhiko Yamanaka, JCraft,Inc. 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e748fd58/nifi-assembly/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE
index 495c74c..02c9d7f 100644
--- a/nifi-assembly/NOTICE
+++ b/nifi-assembly/NOTICE
@@ -746,6 +746,25 @@ The following binary components are provided under the Apache Software License v
         Couchbase Java SDK
         Copyright 2012 Netflix, Inc.
 
+    (ASLv2) HBase Common
+      The following NOTICE information applies:
+        This product includes portions of the Guava project v14, specifically
+        'hbase-common/src/main/java/org/apache/hadoop/hbase/io/LimitInputStream.java'
+
+        Copyright (C) 2007 The Guava Authors
+
+        Licensed under the Apache License, Version 2.0
+
+    (ASLv2) HTrace Core
+      The following NOTICE information applies:
+        In addition, this product includes software dependencies. See
+        the accompanying LICENSE.txt for a listing of dependencies
+        that are NOT Apache licensed (with pointers to their licensing)
+
+        Apache HTrace includes an Apache Thrift connector to Zipkin. Zipkin
+        is a distributed tracing system that is Apache 2.0 Licensed.
+        Copyright 2012 Twitter, Inc.
+
 ************************
 Common Development and Distribution License 1.1
 ************************

http://git-wip-us.apache.org/repos/asf/nifi/blob/e748fd58/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index e80c2d5..8e4a175 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -232,6 +232,16 @@ language governing permissions and limitations under the License. -->
             <artifactId>nifi-couchbase-nar</artifactId>
             <type>nar</type>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hbase-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hbase_1_1_2-client-service-nar</artifactId>
+            <type>nar</type>
+        </dependency>
     </dependencies>
 
     <properties>

http://git-wip-us.apache.org/repos/asf/nifi/blob/e748fd58/nifi-nar-bundles/nifi-hadoop-libraries-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-libraries-bundle/pom.xml b/nifi-nar-bundles/nifi-hadoop-libraries-bundle/pom.xml
index a01a114..0420a12 100644
--- a/nifi-nar-bundles/nifi-hadoop-libraries-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-hadoop-libraries-bundle/pom.xml
@@ -26,4 +26,15 @@
     <modules>
         <module>nifi-hadoop-libraries-nar</module>
     </modules>
+
+    <dependencyManagement>
+        <dependencies>
+            <!-- the top-level pom forces 18.0, but Hadoop 2.6 expects 12.0.1 -->
+            <dependency>
+                <groupId>com.google.guava</groupId>
+                <artifactId>guava</artifactId>
+                <version>${hadoop.guava.version}</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/e748fd58/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-nar/pom.xml b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-nar/pom.xml
new file mode 100644
index 0000000..fa9870d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-nar/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-hbase-bundle</artifactId>
+        <version>0.4.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-hbase-nar</artifactId>
+    <packaging>nar</packaging>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hbase-processors</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/e748fd58/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..a9f1f6f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,19 @@
+nifi-hbase-nar
+Copyright 2014-2015 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+The following binary components are provided under the Apache Software License v2
+
+  (ASLv2) Apache Commons Lang
+    The following NOTICE information applies:
+      Apache Commons Lang
+      Copyright 2001-2015 The Apache Software Foundation
+
+      This product includes software from the Spring Framework,
+      under the Apache License 2.0 (see: StringUtils.containsWhitespace())
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/e748fd58/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml
new file mode 100644
index 0000000..b474c6a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-hbase-bundle</artifactId>
+        <version>0.4.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-hbase-processors</artifactId>
+    <description>Support for interacting with HBase</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hbase-client-service-api</artifactId>
+            <version>0.4.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-distributed-cache-client-service-api</artifactId>
+            <version>0.4.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.4</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>1.10.19</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>2.5.4</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/e748fd58/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractHBaseProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractHBaseProcessor.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractHBaseProcessor.java
new file mode 100644
index 0000000..9cce35e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractHBaseProcessor.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.processor.AbstractProcessor;
+
+public abstract class AbstractHBaseProcessor extends AbstractProcessor {
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e748fd58/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
new file mode 100644
index 0000000..5f08265
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnRemoved;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hbase.io.JsonRowSerializer;
+import org.apache.nifi.hbase.io.RowSerializer;
+import org.apache.nifi.hbase.scan.Column;
+import org.apache.nifi.hbase.scan.ResultCell;
+import org.apache.nifi.hbase.scan.ResultHandler;
+import org.apache.nifi.hbase.util.ObjectSerDe;
+import org.apache.nifi.hbase.util.StringSerDe;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.ObjectHolder;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+@TriggerWhenEmpty
+@TriggerSerially
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Tags({"hbase", "get", "ingest"})
+@CapabilityDescription("This Processor polls HBase for any records in the specified table. The processor keeps track of the timestamp of the cells that "
+        + "it receives, so that as new records are pushed to HBase, they will automatically be pulled. Each record is output in JSON format, as "
+        + "{\"row\": \"<row key>\", \"cells\": { \"<column 1 family>:<column 1 qualifier>\": \"<cell 1 value>\", \"<column 2 family>:<column 2 qualifier>\": \"<cell 2 value>\", ... }}. "
+        + "For each record received, a Provenance RECEIVE event is emitted with the format hbase://<table name>/<row key>, where <row key> is the UTF-8 encoded value of the row's key.")
+@WritesAttributes({
+    @WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the data was pulled from"),
+    @WritesAttribute(attribute = "mime.type", description = "Set to application/json to indicate that output is JSON")
+})
+public class GetHBase extends AbstractHBaseProcessor {
+
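+    // Matches a comma-separated list of "family:qualifier" pairs or bare families,
+    // e.g. "cf1:cq1,cf2:cq2" or "cf1,cf2" (example values are illustrative).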
+    static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:\\w+)?(?:,\\w+(:\\w+)?)*");
+
+    static final AllowableValue NONE = new AllowableValue("None", "None");
+    static final AllowableValue CURRENT_TIME = new AllowableValue("Current Time", "Current Time");
+
+    static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
+            .name("HBase Client Service")
+            .description("Specifies the Controller Service to use for accessing HBase.")
+            .required(true)
+            .identifiesControllerService(HBaseClientService.class)
+            .build();
+    static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
+            .name("Distributed Cache Service")
+            .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HBase" +
+                    " so that if a new node begins pulling data, it won't duplicate all of the work that has been done.")
+            .required(true)
+            .identifiesControllerService(DistributedMapCacheClient.class)
+            .build();
+    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("Character Set")
+            .description("Specifies which character set is used to encode the data in HBase")
+            .required(true)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("Table Name")
+            .description("The name of the HBase Table to pull data from")
+            .required(true)
+            .expressionLanguageSupported(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    static final PropertyDescriptor COLUMNS = new PropertyDescriptor.Builder()
+            .name("Columns")
+            .description("A comma-separated list of \"<colFamily>:<colQualifier>\" pairs to return when scanning. To return all columns " +
+                    "for a given family, leave off the qualifier, as in \"<colFamily1>,<colFamily2>\".")
+            .required(false)
+            .expressionLanguageSupported(false)
+            .addValidator(StandardValidators.createRegexMatchingValidator(COLUMNS_PATTERN))
+            .build();
+    static final PropertyDescriptor FILTER_EXPRESSION = new PropertyDescriptor.Builder()
+            .name("Filter Expression")
+            .description("An HBase filter expression that will be applied to the scan. This property cannot be used when also using the Columns property.")
+            .required(false)
+            .expressionLanguageSupported(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    static final PropertyDescriptor INITIAL_TIMERANGE = new PropertyDescriptor.Builder()
+            .name("Initial Time Range")
+            .description("The time range to use on the first scan of a table. None will pull the entire table on the first scan; " +
+                    "Current Time will pull entries from that point forward.")
+            .required(true)
+            .expressionLanguageSupported(false)
+            .allowableValues(NONE, CURRENT_TIME)
+            .defaultValue(NONE.getValue())
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All FlowFiles are routed to this relationship")
+            .build();
+
+    private volatile ScanResult lastResult = null;
+    private volatile List<Column> columns = new ArrayList<>();
+    private volatile boolean electedPrimaryNode = false;
+    private volatile String previousTable = null;
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return Collections.singleton(REL_SUCCESS);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(HBASE_CLIENT_SERVICE);
+        properties.add(DISTRIBUTED_CACHE_SERVICE);
+        properties.add(TABLE_NAME);
+        properties.add(COLUMNS);
+        properties.add(FILTER_EXPRESSION);
+        properties.add(INITIAL_TIMERANGE);
+        properties.add(CHARSET);
+        return properties;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final String columns = validationContext.getProperty(COLUMNS).getValue();
+        final String filter = validationContext.getProperty(FILTER_EXPRESSION).getValue();
+
+        final List<ValidationResult> problems = new ArrayList<>();
+
+        if (!StringUtils.isBlank(columns) && !StringUtils.isBlank(filter)) {
+            problems.add(new ValidationResult.Builder()
+                    .subject(FILTER_EXPRESSION.getDisplayName())
+                    .input(filter).valid(false)
+                    .explanation("a filter expression cannot be used in conjunction with the Columns property")
+                    .build());
+        }
+
+        return problems;
+    }
+
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+        if (descriptor.equals(TABLE_NAME)) {
+            lastResult = null;
+        }
+    }
+
+    @OnScheduled
+    public void parseColumns(final ProcessContext context) {
+        final String columnsValue = context.getProperty(COLUMNS).getValue();
+        final String[] columns = (columnsValue == null || columnsValue.isEmpty() ? new String[0] : columnsValue.split(","));
+
+        this.columns.clear();
+        for (final String column : columns) {
+            if (column.contains(":")) {
+                final String[] parts = column.split(":");
+                final byte[] cf = parts[0].getBytes(StandardCharsets.UTF_8);
+                final byte[] cq = parts[1].getBytes(StandardCharsets.UTF_8);
+                this.columns.add(new Column(cf, cq));
+            } else {
+                final byte[] cf = column.getBytes(StandardCharsets.UTF_8);
+                this.columns.add(new Column(cf, null));
+            }
+        }
+    }
+
+    @OnPrimaryNodeStateChange
+    public void onPrimaryNodeChange(final PrimaryNodeState newState) {
+        if (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE) {
+            electedPrimaryNode = true;
+        }
+    }
+
+    @OnRemoved
+    public void onRemoved(final ProcessContext context) {
+        final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
+        clearState(client);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final String tableName = context.getProperty(TABLE_NAME).getValue();
+        final String initialTimeRange = context.getProperty(INITIAL_TIMERANGE).getValue();
+        final String filterExpression = context.getProperty(FILTER_EXPRESSION).getValue();
+        final HBaseClientService hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
+        final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
+
+        // if the table was changed, then remove any previous state
+        if (previousTable != null && !tableName.equals(previousTable)) {
+            clearState(client);
+        }
+        previousTable = tableName;
+
+        try {
+            final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
+            final RowSerializer serializer = new JsonRowSerializer(charset);
+
+            this.lastResult = getState(client);
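+            // e.g. with Initial Time Range = "Current Time" and no prior state, minTime below is
+            // the current wall-clock time, so only entries written from this point forward are pulled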
+            final long defaultMinTime = (initialTimeRange.equals(NONE.getValue()) ? 0L : System.currentTimeMillis());
+            final long minTime = (lastResult == null ? defaultMinTime : lastResult.getTimestamp());
+
+            final Map<String, Set<String>> cellsMatchingTimestamp = new HashMap<>();
+
+            final ObjectHolder<Long> rowsPulledHolder = new ObjectHolder<>(0L);
+            final ObjectHolder<Long> latestTimestampHolder = new ObjectHolder<>(minTime);
+
+            hBaseClientService.scan(tableName, columns, filterExpression, minTime, new ResultHandler() {
+                @Override
+                public void handle(final byte[] rowKey, final ResultCell[] resultCells) {
+
+                    final String rowKeyString = new String(rowKey, StandardCharsets.UTF_8);
+
+                    // check if latest cell timestamp is equal to our cutoff.
+                    // if any of the cells have a timestamp later than our cutoff, then we
+                    // want the row. But if the cell with the latest timestamp is equal to
+                    // our cutoff, then we want to check if that's one of the cells that
+                    // we have already seen.
+                    long latestCellTimestamp = 0L;
+                    for (final ResultCell cell : resultCells) {
+                        if (cell.getTimestamp() > latestCellTimestamp) {
+                            latestCellTimestamp = cell.getTimestamp();
+                        }
+                    }
+
+                    // we've already seen this.
+                    if (latestCellTimestamp < minTime) {
+                        getLogger().debug("latest cell timestamp for row {} is {}, which is earlier than the minimum time of {}",
+                                new Object[] {rowKeyString, latestCellTimestamp, minTime});
+                        return;
+                    }
+
+                    if (latestCellTimestamp == minTime) {
+                        // latest cell timestamp is equal to our minimum time. Check if all cells that have
+                        // that timestamp are in our list of previously seen cells.
+                        boolean allSeen = true;
+                        for (final ResultCell cell : resultCells) {
+                            if (cell.getTimestamp() == latestCellTimestamp) {
+                                if (lastResult == null || !lastResult.contains(cell)) {
+                                    allSeen = false;
+                                    break;
+                                }
+                            }
+                        }
+
+                        if (allSeen) {
+                            // we have already seen all of the cells for this row. We do not want to
+                            // include this cell in our output.
+                            getLogger().debug("all cells for row {} have already been seen", new Object[] { rowKeyString });
+                            return;
+                        }
+                    }
+
+                    // If the latest cell timestamp is equal to or later than the latest timestamp we have already seen,
+                    // we want to keep track of the cells that match this timestamp so that the next time we scan,
+                    // we can ignore these cells.
+                    if (latestCellTimestamp >= latestTimestampHolder.get()) {
+                        // new timestamp, so clear all of the 'matching cells'
+                        if (latestCellTimestamp > latestTimestampHolder.get()) {
+                            latestTimestampHolder.set(latestCellTimestamp);
+                            cellsMatchingTimestamp.clear();
+                        }
+
+                        for (final ResultCell cell : resultCells) {
+                            final long ts = cell.getTimestamp();
+                            if (ts == latestCellTimestamp) {
+                                final byte[] rowValue = Arrays.copyOfRange(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength() + cell.getRowOffset());
+                                final byte[] cellValue = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength() + cell.getValueOffset());
+
+                                final String rowHash = new String(rowValue, StandardCharsets.UTF_8);
+                                Set<String> cellHashes = cellsMatchingTimestamp.get(rowHash);
+                                if (cellHashes == null) {
+                                    cellHashes = new HashSet<>();
+                                    cellsMatchingTimestamp.put(rowHash, cellHashes);
+                                }
+                                cellHashes.add(new String(cellValue, StandardCharsets.UTF_8));
+                            }
+                        }
+                    }
+
+                    // write the row to a new FlowFile.
+                    FlowFile flowFile = session.create();
+                    flowFile = session.write(flowFile, new OutputStreamCallback() {
+                        @Override
+                        public void process(final OutputStream out) throws IOException {
+                            serializer.serialize(rowKey, resultCells, out);
+                        }
+                    });
+
+                    final Map<String, String> attributes = new HashMap<>();
+                    attributes.put("hbase.table", tableName);
+                    attributes.put("mime.type", "application/json");
+                    flowFile = session.putAllAttributes(flowFile, attributes);
+
+                    session.getProvenanceReporter().receive(flowFile, "hbase://" + tableName + "/" + rowKeyString);
+                    session.transfer(flowFile, REL_SUCCESS);
+                    getLogger().debug("Received {} from HBase with row key {}", new Object[]{flowFile, rowKeyString});
+
+                    // we could potentially have a huge number of rows. If we get to 500, go ahead and commit the
+                    // session so that we can avoid buffering tons of FlowFiles without ever sending any out.
+                    long rowsPulled = rowsPulledHolder.get();
+                    rowsPulledHolder.set(++rowsPulled);
+
+                    if (rowsPulled % getBatchSize() == 0) {
+                        session.commit();
+                    }
+                }
+            });
+
+            final ScanResult scanResults = new ScanResult(latestTimestampHolder.get(), cellsMatchingTimestamp);
+
+            // Commit session before we replace the lastResult; if session commit fails, we want
+            // to pull these records again.
+            session.commit();
+            if (lastResult == null || scanResults.getTimestamp() > lastResult.getTimestamp()) {
+                lastResult = scanResults;
+            } else if (scanResults.getTimestamp() == lastResult.getTimestamp()) {
+                final Map<String, Set<String>> combinedResults = new HashMap<>();
+
+                // deep copy the results of scanResults.getMatchingCells() into combinedResults
+                // because the Sets may be modified below
+                for (final Map.Entry<String, Set<String>> entry : scanResults.getMatchingCells().entrySet()) {
+                    combinedResults.put(entry.getKey(), new HashSet<>(entry.getValue()));
+                }
+
+                // combine the results from 'lastResult'
+                for (final Map.Entry<String, Set<String>> entry : lastResult.getMatchingCells().entrySet()) {
+                    final Set<String> existing = combinedResults.get(entry.getKey());
+                    if (existing == null) {
+                        combinedResults.put(entry.getKey(), new HashSet<>(entry.getValue()));
+                    } else {
+                        existing.addAll(entry.getValue());
+                    }
+                }
+                final ScanResult scanResult = new ScanResult(scanResults.getTimestamp(), combinedResults);
+                lastResult = scanResult;
+            }
+
+            // save state to local storage and to distributed cache
+            persistState(client, lastResult);
+
+        } catch (final IOException e) {
+            getLogger().error("Failed to receive data from HBase due to {}", e);
+            session.rollback();
+        } finally {
+            // if we failed, we want to yield so that we don't hammer HBase. If we succeed, then we have
+            // pulled all of the records, so we want to wait a bit before hitting HBase again anyway.
+            context.yield();
+        }
+    }
+
+    // present for tests
+    protected int getBatchSize() {
+        return 500;
+    }
+
+    protected File getStateDir() {
+        return new File("conf/state");
+    }
+
+    protected File getStateFile() {
+        return new File(getStateDir(), "getHBase-" + getIdentifier());
+    }
+
+    protected String getKey() {
+        return "getHBase-" + getIdentifier() + "-state";
+    }
+
+    protected List<Column> getColumns() {
+        return columns;
+    }
+
+    private void persistState(final DistributedMapCacheClient client, final ScanResult scanResult) {
+        final File stateDir = getStateDir();
+        if (!stateDir.exists()) {
+            stateDir.mkdirs();
+        }
+
+        final File file = getStateFile();
+        try (final OutputStream fos = new FileOutputStream(file);
+                final ObjectOutputStream oos = new ObjectOutputStream(fos)) {
+            oos.writeObject(scanResult);
+        } catch (final IOException ioe) {
+            getLogger().warn("Unable to save state locally. If the node is restarted now, data may be duplicated. Failure is due to {}", ioe);
+        }
+
+        try {
+            client.put(getKey(), scanResult, new StringSerDe(), new ObjectSerDe());
+        } catch (final IOException ioe) {
+            getLogger().warn("Unable to communicate with distributed cache server due to {}. Persisting state locally instead.", ioe);
+        }
+    }
+
+    private void clearState(final DistributedMapCacheClient client) {
+        final File localState = getStateFile();
+        if (localState.exists()) {
+            localState.delete();
+        }
+
+        try {
+            client.remove(getKey(), new StringSerDe());
+        } catch (IOException e) {
+            getLogger().warn("Processor state was not cleared from distributed cache due to {}", new Object[]{e});
+        }
+    }
+
+    private ScanResult getState(final DistributedMapCacheClient client) throws IOException {
+        final StringSerDe stringSerDe = new StringSerDe();
+        final ObjectSerDe objectSerDe = new ObjectSerDe();
+
+        ScanResult scanResult = lastResult;
+        // if we have no previous result, or we just became primary, pull from distributed cache
+        if (scanResult == null || electedPrimaryNode) {
+            final Object obj = client.get(getKey(), stringSerDe, objectSerDe);
+            if (!(obj instanceof ScanResult)) {
+                scanResult = null;
+            } else {
+                scanResult = (ScanResult) obj;
+                getLogger().debug("Retrieved state from the distributed cache, previous timestamp was {}" , new Object[] {scanResult.getTimestamp()});
+            }
+
+            // no requirement to pull an update from the distributed cache anymore.
+            electedPrimaryNode = false;
+        }
+
+        // Check the persistence file. We want to use the latest timestamp that we have so that
+        // we don't duplicate data.
+        final File file = getStateFile();
+        if (file.exists()) {
+            try (final InputStream fis = new FileInputStream(file);
+                 final ObjectInputStream ois = new ObjectInputStream(fis)) {
+
+                final Object obj = ois.readObject();
+                if (obj instanceof ScanResult) {
+                    final ScanResult localScanResult = (ScanResult) obj;
+                    if (scanResult == null || localScanResult.getTimestamp() > scanResult.getTimestamp()) {
+                        scanResult = localScanResult;
+                        getLogger().debug("Using last timestamp from local state because it was newer than the distributed cache, or no value existed in the cache");
+
+                        // Our local persistence file shows a later time than the Distributed service.
+                        // Update the distributed service to match our local state.
+                        try {
+                            client.put(getKey(), localScanResult, stringSerDe, objectSerDe);
+                        } catch (final IOException ioe) {
+                            getLogger().warn("Local timestamp is {}, which is later than the distributed state, but failed to update the distributed "
+                                            + "state due to {}. If a new node performs the GetHBase scan, data duplication may occur",
+                                    new Object[] {localScanResult.getTimestamp(), ioe});
+                        }
+                    }
+                }
+            } catch (final IOException | ClassNotFoundException ioe) {
+                getLogger().warn("Failed to recover persisted state from {} due to {}. Assuming that state from distributed cache is correct.", new Object[]{file, ioe});
+            }
+        }
+
+        return scanResult;
+    }
+
+    public static class ScanResult implements Serializable {
+
+        private static final long serialVersionUID = 1L;
+
+        private final long latestTimestamp;
+        private final Map<String, Set<String>> matchingCellHashes;
+
+        public ScanResult(final long timestamp, final Map<String, Set<String>> cellHashes) {
+            latestTimestamp = timestamp;
+            matchingCellHashes = cellHashes;
+        }
+
+        public long getTimestamp() {
+            return latestTimestamp;
+        }
+
+        public Map<String, Set<String>> getMatchingCells() {
+            return matchingCellHashes;
+        }
+
+        public boolean contains(final ResultCell cell) {
+            if (cell.getTimestamp() != latestTimestamp) {
+                return false;
+            }
+
+            final byte[] row = Arrays.copyOfRange(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength() + cell.getRowOffset());
+            final String rowHash = new String(row, StandardCharsets.UTF_8);
+            final Set<String> cellHashes = matchingCellHashes.get(rowHash);
+            if (cellHashes == null) {
+                return false;
+            }
+
+            final byte[] cellValue = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength() + cell.getValueOffset());
+            final String cellHash = new String(cellValue, StandardCharsets.UTF_8);
+            return cellHashes.contains(cellHash);
+        }
+    }
+
+}
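
The heart of GetHBase is the timestamp-cutoff rule above: rows strictly older than the last
scan's latest timestamp are skipped, rows strictly newer are always emitted, and rows exactly
at the cutoff are deduplicated against the cell hashes stored in ScanResult. A standalone
sketch of that rule (names and values are illustrative, not part of the processor's API):

    import java.util.Collections;
    import java.util.Set;

    final class CutoffRuleSketch {

        // minTime is the latest cell timestamp from the previous scan;
        // seenCellHashes are the cell values recorded at that timestamp.
        static boolean shouldEmit(final long latestCellTimestamp, final long minTime,
                                  final Set<String> seenCellHashes, final String cellHash) {
            if (latestCellTimestamp < minTime) {
                return false; // strictly older than the cutoff: already pulled
            }
            if (latestCellTimestamp == minTime) {
                // exactly at the cutoff: emit only if this cell was not seen before
                return !seenCellHashes.contains(cellHash);
            }
            return true; // strictly newer than the cutoff: always emit
        }

        public static void main(final String[] args) {
            final Set<String> seen = Collections.singleton("valueA");
            System.out.println(shouldEmit(999L, 1000L, seen, "valueA"));  // false
            System.out.println(shouldEmit(1000L, 1000L, seen, "valueA")); // false
            System.out.println(shouldEmit(1000L, 1000L, seen, "valueB")); // true
            System.out.println(shouldEmit(1001L, 1000L, seen, "valueB")); // true
        }
    }

This is also why ScanResult.contains() only matches cells at its latest timestamp: hashes from
older timestamps become irrelevant once the cutoff has advanced.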

http://git-wip-us.apache.org/repos/asf/nifi/blob/e748fd58/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java
new file mode 100644
index 0000000..0a2b763
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hbase.put.PutFlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hadoop", "hbase"})
+@CapabilityDescription("Adds the contents of a FlowFile to HBase as the value of a single cell")
+public class PutHBaseCell extends AbstractProcessor {
+
+    protected static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
+            .name("HBase Client Service")
+            .description("Specifies the Controller Service to use for accessing HBase.")
+            .required(true)
+            .identifiesControllerService(HBaseClientService.class)
+            .build();
+    protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("Table Name")
+            .description("The name of the HBase Table to put data into")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    static final PropertyDescriptor ROW = new PropertyDescriptor.Builder()
+            .name("Row Identifier")
+            .description("Specifies the Row ID to use when inserting data into HBase")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    static final PropertyDescriptor COLUMN_FAMILY = new PropertyDescriptor.Builder()
+            .name("Column Family")
+            .description("The Column Family to use when inserting data into HBase")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    static final PropertyDescriptor COLUMN_QUALIFIER = new PropertyDescriptor.Builder()
+            .name("Column Qualifier")
+            .description("The Column Qualifier to use when inserting data into HBase")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Batch Size")
+            .description("The maximum number of FlowFiles to process in a single execution. The FlowFiles will be " +
+                    "grouped by table, and a single Put per table will be performed.")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("25")
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after it has been successfully stored in HBase")
+            .build();
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if it cannot be sent to HBase")
+            .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(HBASE_CLIENT_SERVICE);
+        properties.add(TABLE_NAME);
+        properties.add(ROW);
+        properties.add(COLUMN_FAMILY);
+        properties.add(COLUMN_QUALIFIER);
+        properties.add(BATCH_SIZE);
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_FAILURE);
+        return rels;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+        List<FlowFile> flowFiles = session.get(batchSize);
+        if (flowFiles == null || flowFiles.isEmpty()) {
+            return;
+        }
+
+        final Map<String,List<PutFlowFile>> tablePuts = new HashMap<>();
+
+        // Group FlowFiles by HBase Table
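+        // (e.g. a batch spanning tables "users" and "events" yields two map entries, and
+        // below, one hBaseClientService.put(...) call per table; example names are illustrative)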
+        for (final FlowFile flowFile : flowFiles) {
+            final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+            final String row = context.getProperty(ROW).evaluateAttributeExpressions(flowFile).getValue();
+            final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
+            final String columnQualifier = context.getProperty(COLUMN_QUALIFIER).evaluateAttributeExpressions(flowFile).getValue();
+
+            if (StringUtils.isBlank(tableName) || StringUtils.isBlank(row) || StringUtils.isBlank(columnFamily) || StringUtils.isBlank(columnQualifier)) {
+                getLogger().error("Invalid FlowFile {} missing table, row, column familiy, or column qualifier; routing to failure", new Object[]{flowFile});
+                session.transfer(flowFile, FAILURE);
+            } else {
+                final byte[] buffer = new byte[(int) flowFile.getSize()];
+                session.read(flowFile, new InputStreamCallback() {
+                    @Override
+                    public void process(final InputStream in) throws IOException {
+                        StreamUtils.fillBuffer(in, buffer);
+                    }
+                });
+
+                final PutFlowFile putFlowFile = new PutFlowFile(tableName, row, columnFamily, columnQualifier, buffer, flowFile);
+
+                List<PutFlowFile> putFlowFiles = tablePuts.get(tableName);
+                if (putFlowFiles == null) {
+                    putFlowFiles = new ArrayList<>();
+                    tablePuts.put(tableName, putFlowFiles);
+                }
+                putFlowFiles.add(putFlowFile);
+            }
+        }
+
+        getLogger().debug("Sending {} FlowFiles to HBase in {} put operations", new Object[] {flowFiles.size(), tablePuts.size()});
+
+        final long start = System.nanoTime();
+        final List<PutFlowFile> successes = new ArrayList<>();
+        final HBaseClientService hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
+
+        for (Map.Entry<String, List<PutFlowFile>> entry : tablePuts.entrySet()) {
+            try {
+                hBaseClientService.put(entry.getKey(), entry.getValue());
+                successes.addAll(entry.getValue());
+            } catch (Exception e) {
+                getLogger().error(e.getMessage(), e);
+
+                for (PutFlowFile putFlowFile : entry.getValue()) {
+                    getLogger().error("Failed to send {} to HBase due to {}; routing to failure", new Object[]{putFlowFile.getFlowFile(), e});
+                    final FlowFile failure = session.penalize(putFlowFile.getFlowFile());
+                    session.transfer(failure, FAILURE);
+                }
+            }
+        }
+
+        final long sendMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        getLogger().debug("Sent {} FlowFiles to HBase successfully in {} milliseconds", new Object[] {successes.size(), sendMillis});
+
+        for (PutFlowFile putFlowFile : successes) {
+            session.transfer(putFlowFile.getFlowFile(), REL_SUCCESS);
+            session.getProvenanceReporter().send(putFlowFile.getFlowFile(), getTransitUri(putFlowFile), sendMillis);
+        }
+
+    }
+
+    protected String getTransitUri(PutFlowFile putFlowFile) {
+        return "hbase://" + putFlowFile.getTableName() + "/" + putFlowFile.getRow() + "/" + putFlowFile.getColumnFamily()
+                + ":" + putFlowFile.getColumnQualifier();
+    }
+
+}

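For illustration, a minimal sketch of exercising the group-by-table batching above with the nifi-mock framework, in the style of the TestPutHBaseCell tests added by this commit. The "hbaseClient" service id, the "nifi"/"cf"/"cq" names, and the hbase.row attribute are illustrative, not part of the commit:

    @Test
    public void testTwoFlowFilesForSameTableBatchIntoOnePut() throws Exception {
        final MockHBaseClientService hBaseClient = new MockHBaseClientService();
        final TestRunner runner = TestRunners.newTestRunner(PutHBaseCell.class);
        runner.addControllerService("hbaseClient", hBaseClient);  // illustrative id
        runner.enableControllerService(hBaseClient);
        runner.setProperty(PutHBaseCell.HBASE_CLIENT_SERVICE, "hbaseClient");
        runner.setProperty(PutHBaseCell.TABLE_NAME, "nifi");
        runner.setProperty(PutHBaseCell.ROW, "${hbase.row}");  // resolved per FlowFile
        runner.setProperty(PutHBaseCell.COLUMN_FAMILY, "cf");
        runner.setProperty(PutHBaseCell.COLUMN_QUALIFIER, "cq");

        runner.enqueue("content1".getBytes(StandardCharsets.UTF_8),
                Collections.singletonMap("hbase.row", "row1"));
        runner.enqueue("content2".getBytes(StandardCharsets.UTF_8),
                Collections.singletonMap("hbase.row", "row2"));
        runner.run();

        // both FlowFiles succeed, and because they target the same table they
        // reach the client service as a single batched put of two PutFlowFiles
        runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS, 2);
        assertEquals(2, hBaseClient.getPuts().get("nifi").size());
    }
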
http://git-wip-us.apache.org/repos/asf/nifi/blob/e748fd58/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/io/JsonRowSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/io/JsonRowSerializer.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/io/JsonRowSerializer.java
new file mode 100644
index 0000000..b624853
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/io/JsonRowSerializer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase.io;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.nifi.hbase.scan.ResultCell;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+
+public class JsonRowSerializer implements RowSerializer {
+
+    private final Charset charset;
+
+    public JsonRowSerializer(final Charset charset) {
+        this.charset = charset;
+    }
+
+    @Override
+    public void serialize(final byte[] rowKey, final ResultCell[] cells, final OutputStream out) throws IOException {
+        final StringBuilder jsonBuilder = new StringBuilder();
+        jsonBuilder.append("{");
+
+        final String row = new String(rowKey, charset);
+        jsonBuilder.append("\"row\":")
+                .append("\"")
+                .append(StringEscapeUtils.escapeJson(row))
+                .append("\"");
+
+        jsonBuilder.append(", \"cells\": {");
+        int i = 0;
+        for (final ResultCell cell : cells) {
+            final String cellFamily = new String(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), charset);
+            final String cellQualifier = new String(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), charset);
+
+            if (i > 0) {
+                jsonBuilder.append(", ");
+            }
+            jsonBuilder.append("\"")
+                    .append(StringEscapeUtils.escapeJson(cellFamily))
+                    .append(":")
+                    .append(StringEscapeUtils.escapeJson(cellQualifier))
+                    .append("\":\"")
+                    .append(StringEscapeUtils.escapeJson(new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), charset)))
+                    .append("\"");
+            i++;
+        }
+
+        jsonBuilder.append("}}");
+        final String json = jsonBuilder.toString();
+        out.write(json.getBytes(charset));
+    }
+
+}

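For reference, a sketch of the JSON shape the serializer above produces for a single row. The "row1"/"nifi"/"greeting"/"hello" values are illustrative, and the cell is populated the same way MockHBaseClientService.addResult() does further down:

    @Test
    public void testJsonRowSerializerOutputShape() throws IOException {
        final byte[] family = "nifi".getBytes(StandardCharsets.UTF_8);
        final byte[] qualifier = "greeting".getBytes(StandardCharsets.UTF_8);
        final byte[] value = "hello".getBytes(StandardCharsets.UTF_8);

        // only the family/qualifier/value fields are read by the serializer;
        // the row key is passed in separately
        final ResultCell cell = new ResultCell();
        cell.setFamilyArray(family);
        cell.setFamilyOffset(0);
        cell.setFamilyLength((byte) family.length);
        cell.setQualifierArray(qualifier);
        cell.setQualifierOffset(0);
        cell.setQualifierLength(qualifier.length);
        cell.setValueArray(value);
        cell.setValueOffset(0);
        cell.setValueLength(value.length);

        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        new JsonRowSerializer(StandardCharsets.UTF_8)
                .serialize("row1".getBytes(StandardCharsets.UTF_8), new ResultCell[] { cell }, out);

        // cells are keyed as family:qualifier within a single object per row
        assertEquals("{\"row\":\"row1\", \"cells\": {\"nifi:greeting\":\"hello\"}}",
                new String(out.toByteArray(), StandardCharsets.UTF_8));
    }
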
http://git-wip-us.apache.org/repos/asf/nifi/blob/e748fd58/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/io/RowSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/io/RowSerializer.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/io/RowSerializer.java
new file mode 100644
index 0000000..292b9b6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/io/RowSerializer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase.io;
+
+import org.apache.nifi.hbase.scan.ResultCell;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public interface RowSerializer {
+
+    /**
+     * Serializes the given row and cells to the provided OutputStream
+     *
+     * @param rowKey the row's key
+     * @param cells the cells to serialize
+     * @param out the OutputStream to serialize to
+     * @throws IOException if unable to serialize the row
+     */
+    void serialize(byte[] rowKey, ResultCell[] cells, OutputStream out) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e748fd58/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/util/ObjectSerDe.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/util/ObjectSerDe.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/util/ObjectSerDe.java
new file mode 100644
index 0000000..9c6e329
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/util/ObjectSerDe.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase.util;
+
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+import org.apache.nifi.distributed.cache.client.exception.SerializationException;
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+
+public class ObjectSerDe implements Serializer<Object>, Deserializer<Object> {
+
+    @Override
+    public Object deserialize(byte[] input) throws DeserializationException, IOException {
+        if (input == null || input.length == 0) {
+            return null;
+        }
+
+        try (final ByteArrayInputStream in = new ByteArrayInputStream(input);
+            final ObjectInputStream objIn = new ObjectInputStream(in)) {
+            return objIn.readObject();
+        } catch (ClassNotFoundException e) {
+            throw new DeserializationException("Could not deserialize object due to ClassNotFoundException", e);
+        }
+    }
+
+    @Override
+    public void serialize(Object value, OutputStream output) throws SerializationException, IOException {
+        try (final ByteArrayOutputStream bOut = new ByteArrayOutputStream();
+                final ObjectOutputStream objOut = new ObjectOutputStream(bOut)) {
+            objOut.writeObject(value);
+            output.write(bOut.toByteArray());
+        }
+    }
+
+}

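And a quick round-trip sketch of ObjectSerDe; the java.util.Date value is illustrative, as any Serializable works:

    @Test
    public void testObjectSerDeRoundTrip() throws Exception {
        final ObjectSerDe serDe = new ObjectSerDe();

        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        serDe.serialize(new Date(0L), out);

        // the bytes written are standard Java serialization, so the value
        // comes back equal; null or empty input deserializes to null
        assertEquals(new Date(0L), serDe.deserialize(out.toByteArray()));
        assertNull(serDe.deserialize(new byte[0]));
    }
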
http://git-wip-us.apache.org/repos/asf/nifi/blob/e748fd58/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/util/StringSerDe.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/util/StringSerDe.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/util/StringSerDe.java
new file mode 100644
index 0000000..5dd9c51
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/util/StringSerDe.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase.util;
+
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+import org.apache.nifi.distributed.cache.client.exception.SerializationException;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+
+public class StringSerDe implements Serializer<String>, Deserializer<String> {
+
+    @Override
+    public String deserialize(final byte[] value) throws DeserializationException, IOException {
+        if (value == null) {
+            return null;
+        }
+
+        return new String(value, StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
+        out.write(value.getBytes(StandardCharsets.UTF_8));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e748fd58/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..613515d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.nifi.hbase.GetHBase
+org.apache.nifi.hbase.PutHBaseCell
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/e748fd58/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
new file mode 100644
index 0000000..a2abf7e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.hbase.put.PutFlowFile;
+import org.apache.nifi.hbase.scan.Column;
+import org.apache.nifi.hbase.scan.ResultCell;
+import org.apache.nifi.hbase.scan.ResultHandler;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MockHBaseClientService extends AbstractControllerService implements HBaseClientService {
+
+    private Map<String, ResultCell[]> results = new HashMap<>();
+    private Map<String, List<PutFlowFile>> puts = new HashMap<>();
+    private boolean throwException = false;
+
+    @Override
+    public void put(String tableName, Collection<PutFlowFile> puts) throws IOException {
+        if (throwException) {
+            throw new IOException("exception");
+        }
+
+        this.puts.put(tableName, new ArrayList<>(puts));
+    }
+
+    @Override
+    public void scan(String tableName, Collection<Column> columns, String filterExpression, long minTime, ResultHandler handler) throws IOException {
+        if (throwException) {
+            throw new IOException("exception");
+        }
+
+        // pass all the staged data to the handler
+        for (final Map.Entry<String,ResultCell[]> entry : results.entrySet()) {
+            handler.handle(entry.getKey().getBytes(StandardCharsets.UTF_8), entry.getValue());
+        }
+    }
+
+    public void addResult(final String rowKey, final Map<String, String> cells, final long timestamp) {
+        final byte[] rowArray = rowKey.getBytes(StandardCharsets.UTF_8);
+
+        final ResultCell[] cellArray = new ResultCell[cells.size()];
+        int i = 0;
+        for (final Map.Entry<String, String> cellEntry : cells.entrySet()) {
+            final ResultCell cell = new ResultCell();
+            cell.setRowArray(rowArray);
+            cell.setRowOffset(0);
+            cell.setRowLength((short) rowArray.length);
+
+            final String cellValue = cellEntry.getValue();
+            final byte[] valueArray = cellValue.getBytes(StandardCharsets.UTF_8);
+            cell.setValueArray(valueArray);
+            cell.setValueOffset(0);
+            cell.setValueLength(valueArray.length);
+
+            final byte[] familyArray = "nifi".getBytes(StandardCharsets.UTF_8);
+            cell.setFamilyArray(familyArray);
+            cell.setFamilyOffset(0);
+            cell.setFamilyLength((byte) familyArray.length);
+
+            final String qualifier = cellEntry.getKey();
+            final byte[] qualifierArray = qualifier.getBytes(StandardCharsets.UTF_8);
+            cell.setQualifierArray(qualifierArray);
+            cell.setQualifierOffset(0);
+            cell.setQualifierLength(qualifierArray.length);
+
+            cell.setTimestamp(timestamp);
+            cellArray[i++] = cell;
+        }
+
+        results.put(rowKey, cellArray);
+    }
+
+    public Map<String, List<PutFlowFile>> getPuts() {
+        return puts;
+    }
+
+    public void setThrowException(boolean throwException) {
+        this.throwException = throwException;
+    }
+}