You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2015/11/19 20:02:29 UTC

[1/2] nifi git commit: NIFI-1174 Refactoring the HBase client API and adding a PutHBaseJSON which can write a whole row from a single json document - Adding Complex Field Strategy to PutHBaseJSON to allow more control of complex fields - Improving error

Repository: nifi
Updated Branches:
  refs/heads/master 8c2323dc8 -> 40dd8a0a8


http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java
index 1575f3c..513ea9c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.hbase.put.PutColumn;
 import org.apache.nifi.hbase.put.PutFlowFile;
 import org.apache.nifi.hbase.scan.Column;
 import org.apache.nifi.hbase.scan.ResultCell;
@@ -41,6 +42,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -130,8 +132,9 @@ public class TestHBase_1_1_2_ClientService {
         final String columnQualifier = "qualifier1";
         final String content = "content1";
 
-        final PutFlowFile putFlowFile = new PutFlowFile(tableName, row, columnFamily, columnQualifier,
-                content.getBytes(StandardCharsets.UTF_8), null);
+        final Collection<PutColumn> columns = Collections.singletonList(new PutColumn(columnFamily, columnQualifier,
+                content.getBytes(StandardCharsets.UTF_8)));
+        final PutFlowFile putFlowFile = new PutFlowFile(tableName, row, columns, null);
 
         final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
 
@@ -168,11 +171,13 @@ public class TestHBase_1_1_2_ClientService {
         final String content1 = "content1";
         final String content2 = "content2";
 
-        final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row, columnFamily, columnQualifier,
-                content1.getBytes(StandardCharsets.UTF_8), null);
+        final Collection<PutColumn> columns1 = Collections.singletonList(new PutColumn(columnFamily, columnQualifier,
+                content1.getBytes(StandardCharsets.UTF_8)));
+        final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row, columns1, null);
 
-        final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row, columnFamily, columnQualifier,
-                content2.getBytes(StandardCharsets.UTF_8), null);
+        final Collection<PutColumn> columns2 = Collections.singletonList(new PutColumn(columnFamily, columnQualifier,
+                content2.getBytes(StandardCharsets.UTF_8)));
+        final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row, columns2, null);
 
         final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
 
@@ -214,11 +219,13 @@ public class TestHBase_1_1_2_ClientService {
         final String content1 = "content1";
         final String content2 = "content2";
 
-        final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row1, columnFamily, columnQualifier,
-                content1.getBytes(StandardCharsets.UTF_8), null);
+        final Collection<PutColumn> columns1 = Collections.singletonList(new PutColumn(columnFamily, columnQualifier,
+                content1.getBytes(StandardCharsets.UTF_8)));
+        final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row1, columns1, null);
 
-        final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row2, columnFamily, columnQualifier,
-                content2.getBytes(StandardCharsets.UTF_8), null);
+        final Collection<PutColumn> columns2 = Collections.singletonList(new PutColumn(columnFamily, columnQualifier,
+                content2.getBytes(StandardCharsets.UTF_8)));
+        final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row2, columns2, null);
 
         final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
 


[2/2] nifi git commit: NIFI-1174 Refactoring the HBase client API and adding a PutHBaseJSON which can write a whole row from a single json document - Adding Complex Field Strategy to PutHBaseJSON to allow more control of complex fields - Improving error

Posted by bb...@apache.org.
NIFI-1174 Refactoring the HBase client API and adding a PutHBaseJSON which can write a whole row from a single json document - Adding Complex Field Strategy to PutHBaseJSON to allow more control of complex fields - Improving error messages to indicate what the problem was with an invalid row

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/40dd8a0a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/40dd8a0a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/40dd8a0a

Branch: refs/heads/master
Commit: 40dd8a0a845ef5f4d4fde451f02376ab2fab9758
Parents: 8c2323d
Author: Bryan Bende <bb...@apache.org>
Authored: Wed Nov 18 17:24:49 2015 -0500
Committer: Bryan Bende <bb...@apache.org>
Committed: Thu Nov 19 13:49:02 2015 -0500

----------------------------------------------------------------------
 .../nifi-hbase-processors/pom.xml               |   4 +
 .../nifi/hbase/AbstractHBaseProcessor.java      |  23 -
 .../org/apache/nifi/hbase/AbstractPutHBase.java | 183 ++++++++
 .../java/org/apache/nifi/hbase/GetHBase.java    |   3 +-
 .../org/apache/nifi/hbase/PutHBaseCell.java     | 153 +------
 .../org/apache/nifi/hbase/PutHBaseJSON.java     | 230 ++++++++++
 .../org.apache.nifi.processor.Processor         |   3 +-
 .../org/apache/nifi/hbase/HBaseTestUtil.java    |  87 ++++
 .../nifi/hbase/MockHBaseClientService.java      |  14 +-
 .../org/apache/nifi/hbase/TestPutHBaseCell.java |  60 ++-
 .../org/apache/nifi/hbase/TestPutHBaseJSON.java | 423 +++++++++++++++++++
 .../apache/nifi/hbase/HBaseClientService.java   |  11 +
 .../org/apache/nifi/hbase/put/PutColumn.java    |  47 +++
 .../org/apache/nifi/hbase/put/PutFlowFile.java  |  38 +-
 .../nifi/hbase/HBase_1_1_2_ClientService.java   |  25 +-
 .../hbase/TestHBase_1_1_2_ClientService.java    |  27 +-
 16 files changed, 1119 insertions(+), 212 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml
index b474c6a..abbe4c9 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml
@@ -50,6 +50,10 @@
             <artifactId>commons-lang3</artifactId>
             <version>3.4</version>
         </dependency>
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-mapper-asl</artifactId>
+        </dependency>
 
 		<dependency>
             <groupId>org.apache.nifi</groupId>

http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractHBaseProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractHBaseProcessor.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractHBaseProcessor.java
deleted file mode 100644
index 9cce35e..0000000
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractHBaseProcessor.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.hbase;
-
-import org.apache.nifi.processor.AbstractProcessor;
-
-public abstract class AbstractHBaseProcessor extends AbstractProcessor {
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
new file mode 100644
index 0000000..87424f9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hbase.put.PutFlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class for processors that put data to HBase.
+ */
+public abstract class AbstractPutHBase extends AbstractProcessor {
+
+    protected static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
+            .name("HBase Client Service")
+            .description("Specifies the Controller Service to use for accessing HBase.")
+            .required(true)
+            .identifiesControllerService(HBaseClientService.class)
+            .build();
+    protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("Table Name")
+            .description("The name of the HBase Table to put data into")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    protected static final PropertyDescriptor ROW_ID = new PropertyDescriptor.Builder()
+            .name("Row Identifier")
+            .description("Specifies the Row ID to use when inserting data into HBase")
+            .required(false) // not all sub-classes will require this
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    protected static final PropertyDescriptor COLUMN_FAMILY = new PropertyDescriptor.Builder()
+            .name("Column Family")
+            .description("The Column Family to use when inserting data into HBase")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    protected static final PropertyDescriptor COLUMN_QUALIFIER = new PropertyDescriptor.Builder()
+            .name("Column Qualifier")
+            .description("The Column Qualifier to use when inserting data into HBase")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Batch Size")
+            .description("The maximum number of FlowFiles to process in a single execution. The FlowFiles will be " +
+                    "grouped by table, and a single Put per table will be performed.")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("25")
+            .build();
+
+    protected static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after it has been successfully stored in HBase")
+            .build();
+    protected static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if it cannot be sent to HBase")
+            .build();
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+        List<FlowFile> flowFiles = session.get(batchSize);
+        if (flowFiles == null || flowFiles.size() == 0) {
+            return;
+        }
+
+        final Map<String,List<PutFlowFile>> tablePuts = new HashMap<>();
+
+        // Group FlowFiles by HBase Table
+        for (final FlowFile flowFile : flowFiles) {
+            final PutFlowFile putFlowFile = createPut(session, context, flowFile);
+
+            if (putFlowFile == null) {
+                // sub-classes should log appropriate error messages before returning null
+                session.transfer(flowFile, REL_FAILURE);
+            } else if (!putFlowFile.isValid()) {
+                if (StringUtils.isBlank(putFlowFile.getTableName())) {
+                    getLogger().error("Missing table name for FlowFile {}; routing to failure", new Object[]{flowFile});
+                } else if (StringUtils.isBlank(putFlowFile.getRow())) {
+                    getLogger().error("Missing row id for FlowFile {}; routing to failure", new Object[]{flowFile});
+                } else if (putFlowFile.getColumns() == null || putFlowFile.getColumns().isEmpty()) {
+                    getLogger().error("No columns provided for FlowFile {}; routing to failure", new Object[]{flowFile});
+                } else {
+                    // really shouldn't get here, but just in case
+                    getLogger().error("Failed to produce a put for FlowFile {}; routing to failure", new Object[]{flowFile});
+                }
+                session.transfer(flowFile, REL_FAILURE);
+            } else {
+                List<PutFlowFile> putFlowFiles = tablePuts.get(putFlowFile.getTableName());
+                if (putFlowFiles == null) {
+                    putFlowFiles = new ArrayList<>();
+                    tablePuts.put(putFlowFile.getTableName(), putFlowFiles);
+                }
+                putFlowFiles.add(putFlowFile);
+            }
+        }
+
+        getLogger().debug("Sending {} FlowFiles to HBase in {} put operations", new Object[]{flowFiles.size(), tablePuts.size()});
+
+        final long start = System.nanoTime();
+        final List<PutFlowFile> successes = new ArrayList<>();
+        final HBaseClientService hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
+
+        for (Map.Entry<String, List<PutFlowFile>> entry : tablePuts.entrySet()) {
+            try {
+                hBaseClientService.put(entry.getKey(), entry.getValue());
+                successes.addAll(entry.getValue());
+            } catch (Exception e) {
+                getLogger().error(e.getMessage(), e);
+
+                for (PutFlowFile putFlowFile : entry.getValue()) {
+                    getLogger().error("Failed to send {} to HBase due to {}; routing to failure", new Object[]{putFlowFile.getFlowFile(), e});
+                    final FlowFile failure = session.penalize(putFlowFile.getFlowFile());
+                    session.transfer(failure, REL_FAILURE);
+                }
+            }
+        }
+
+        final long sendMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        getLogger().debug("Sent {} FlowFiles to HBase successfully in {} milliseconds", new Object[]{successes.size(), sendMillis});
+
+        for (PutFlowFile putFlowFile : successes) {
+            session.transfer(putFlowFile.getFlowFile(), REL_SUCCESS);
+            final String details = "Put " + putFlowFile.getColumns().size() + " cells to HBase";
+            session.getProvenanceReporter().send(putFlowFile.getFlowFile(), getTransitUri(putFlowFile), details, sendMillis);
+        }
+
+    }
+
+    protected String getTransitUri(PutFlowFile putFlowFile) {
+        return "hbase://" + putFlowFile.getTableName() + "/" + putFlowFile.getRow();
+    }
+
+    /**
+     * Sub-classes provide the implementation to create a put from a FlowFile.
+     *
+     * @param session
+     *              the current session
+     * @param context
+     *              the current context
+     * @param flowFile
+     *              the FlowFile to create a Put from
+     *
+     * @return a PutFlowFile instance for the given FlowFile
+     */
+    protected abstract PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile);
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
index 5f08265..98a612c 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
@@ -41,6 +41,7 @@ import org.apache.nifi.hbase.scan.ResultCell;
 import org.apache.nifi.hbase.scan.ResultHandler;
 import org.apache.nifi.hbase.util.ObjectSerDe;
 import org.apache.nifi.hbase.util.StringSerDe;
+import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
@@ -83,7 +84,7 @@ import java.util.regex.Pattern;
     @WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the data was pulled from"),
     @WritesAttribute(attribute = "mime.type", description = "Set to application/json to indicate that output is JSON")
 })
-public class GetHBase extends AbstractHBaseProcessor {
+public class GetHBase extends AbstractProcessor {
 
     static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:\\w+)?(?:,\\w+(:\\w+)?)*");
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java
index 0a2b763..759d91e 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.hbase;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -24,91 +23,36 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hbase.put.PutColumn;
 import org.apache.nifi.hbase.put.PutFlowFile;
-import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.stream.io.StreamUtils;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
 @EventDriven
 @SupportsBatching
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @Tags({"hadoop", "hbase"})
 @CapabilityDescription("Adds the Contents of a FlowFile to HBase as the value of a single cell")
-public class PutHBaseCell extends AbstractProcessor {
-
-    protected static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
-            .name("HBase Client Service")
-            .description("Specifies the Controller Service to use for accessing HBase.")
-            .required(true)
-            .identifiesControllerService(HBaseClientService.class)
-            .build();
-    protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
-            .name("Table Name")
-            .description("The name of the HBase Table to put data into")
-            .required(true)
-            .expressionLanguageSupported(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-    static final PropertyDescriptor ROW = new PropertyDescriptor.Builder()
-            .name("Row Identifier")
-            .description("Specifies the Row ID to use when inserting data into HBase")
-            .required(true)
-            .expressionLanguageSupported(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-    static final PropertyDescriptor COLUMN_FAMILY = new PropertyDescriptor.Builder()
-            .name("Column Family")
-            .description("The Column Family to use when inserting data into HBase")
-            .required(true)
-            .expressionLanguageSupported(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-    static final PropertyDescriptor COLUMN_QUALIFIER = new PropertyDescriptor.Builder()
-            .name("Column Qualifier")
-            .description("The Column Qualifier to use when inserting data into HBase")
-            .required(true)
-            .expressionLanguageSupported(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
-            .name("Batch Size")
-            .description("The maximum number of FlowFiles to process in a single execution. The FlowFiles will be " +
-                    "grouped by table, and a single Put per table will be performed.")
-            .required(true)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .defaultValue("25")
-            .build();
-
-    static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("A FlowFile is routed to this relationship after it has been successfully stored in HBase")
-            .build();
-    static final Relationship FAILURE = new Relationship.Builder()
-            .name("failure")
-            .description("A FlowFile is routed to this relationship if it cannot be sent to HBase")
-            .build();
+public class PutHBaseCell extends AbstractPutHBase {
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(HBASE_CLIENT_SERVICE);
         properties.add(TABLE_NAME);
-        properties.add(ROW);
+        properties.add(ROW_ID);
         properties.add(COLUMN_FAMILY);
         properties.add(COLUMN_QUALIFIER);
         properties.add(BATCH_SIZE);
@@ -119,84 +63,27 @@ public class PutHBaseCell extends AbstractProcessor {
     public Set<Relationship> getRelationships() {
         final Set<Relationship> rels = new HashSet<>();
         rels.add(REL_SUCCESS);
-        rels.add(FAILURE);
+        rels.add(REL_FAILURE);
         return rels;
     }
 
     @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
-        List<FlowFile> flowFiles = session.get(batchSize);
-        if (flowFiles == null || flowFiles.size() == 0) {
-            return;
-        }
-
-        final Map<String,List<PutFlowFile>> tablePuts = new HashMap<>();
-
-        // Group FlowFiles by HBase Table
-        for (final FlowFile flowFile : flowFiles) {
-            final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
-            final String row = context.getProperty(ROW).evaluateAttributeExpressions(flowFile).getValue();
-            final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
-            final String columnQualifier = context.getProperty(COLUMN_QUALIFIER).evaluateAttributeExpressions(flowFile).getValue();
-
-            if (StringUtils.isBlank(tableName) || StringUtils.isBlank(row) || StringUtils.isBlank(columnFamily) || StringUtils.isBlank(columnQualifier)) {
-                getLogger().error("Invalid FlowFile {} missing table, row, column familiy, or column qualifier; routing to failure", new Object[]{flowFile});
-                session.transfer(flowFile, FAILURE);
-            } else {
-                final byte[] buffer = new byte[(int) flowFile.getSize()];
-                session.read(flowFile, new InputStreamCallback() {
-                    @Override
-                    public void process(final InputStream in) throws IOException {
-                        StreamUtils.fillBuffer(in, buffer);
-                    }
-                });
-
-                final PutFlowFile putFlowFile = new PutFlowFile(tableName, row, columnFamily, columnQualifier, buffer, flowFile);
-
-                List<PutFlowFile> putFlowFiles = tablePuts.get(tableName);
-                if (putFlowFiles == null) {
-                    putFlowFiles = new ArrayList<>();
-                    tablePuts.put(tableName, putFlowFiles);
-                }
-                putFlowFiles.add(putFlowFile);
-            }
-        }
-
-        getLogger().debug("Sending {} FlowFiles to HBase in {} put operations", new Object[] {flowFiles.size(), tablePuts.size()});
-
-        final long start = System.nanoTime();
-        final List<PutFlowFile> successes = new ArrayList<>();
-        final HBaseClientService hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
-
-        for (Map.Entry<String, List<PutFlowFile>> entry : tablePuts.entrySet()) {
-            try {
-                hBaseClientService.put(entry.getKey(), entry.getValue());
-                successes.addAll(entry.getValue());
-            } catch (Exception e) {
-                getLogger().error(e.getMessage(), e);
-
-                for (PutFlowFile putFlowFile : entry.getValue()) {
-                    getLogger().error("Failed to send {} to HBase due to {}; routing to failure", new Object[]{putFlowFile.getFlowFile(), e});
-                    final FlowFile failure = session.penalize(putFlowFile.getFlowFile());
-                    session.transfer(failure, FAILURE);
-                }
+    protected PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile) {
+        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String row = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
+        final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
+        final String columnQualifier = context.getProperty(COLUMN_QUALIFIER).evaluateAttributeExpressions(flowFile).getValue();
+
+        final byte[] buffer = new byte[(int) flowFile.getSize()];
+        session.read(flowFile, new InputStreamCallback() {
+            @Override
+            public void process(final InputStream in) throws IOException {
+                StreamUtils.fillBuffer(in, buffer);
             }
-        }
-
-        final long sendMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-        getLogger().debug("Sent {} FlowFiles to HBase successfully in {} milliseconds", new Object[] {successes.size(), sendMillis});
-
-        for (PutFlowFile putFlowFile : successes) {
-            session.transfer(putFlowFile.getFlowFile(), REL_SUCCESS);
-            session.getProvenanceReporter().send(putFlowFile.getFlowFile(), getTransitUri(putFlowFile), sendMillis);
-        }
-
-    }
+        });
 
-    protected String getTransitUri(PutFlowFile putFlowFile) {
-        return "hbase://" + putFlowFile.getTableName() + "/" + putFlowFile.getRow() + "/" + putFlowFile.getColumnFamily()
-                + ":" + putFlowFile.getColumnQualifier();
+        final Collection<PutColumn> columns = Collections.singletonList(new PutColumn(columnFamily, columnQualifier, buffer));
+        return new PutFlowFile(tableName, row, columns, flowFile);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java
new file mode 100644
index 0000000..0dba7ee
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hbase.put.PutColumn;
+import org.apache.nifi.hbase.put.PutFlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.ObjectHolder;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hadoop", "hbase", "put", "json"})
+@CapabilityDescription("Adds rows to HBase based on the contents of incoming JSON documents. Each FlowFile must contain a single " +
+        "UTF-8 encoded JSON document, and any FlowFiles where the root element is not a single document will be routed to failure. " +
+        "Each JSON field name and value will become a column qualifier and value of the HBase row. Any fields with a null value " +
+        "will be skipped, and fields with a complex value will be handled according to the Complex Field Strategy. " +
+        "The row id can be specified either directly on the processor through the Row Identifier property, or can be extracted from the JSON " +
+        "document by specifying the Row Identifier Field Name property. This processor will hold the contents of all FlowFiles for the given batch " +
+        "in memory at one time.")
+public class PutHBaseJSON extends AbstractPutHBase {
+
+    protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
+            .name("Row Identifier Field Name")
+            .description("Specifies the name of a JSON element whose value should be used as the row id for the given JSON document.")
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    protected static final String FAIL_VALUE = "Fail";
+    protected static final String WARN_VALUE = "Warn";
+    protected static final String IGNORE_VALUE = "Ignore";
+    protected static final String TEXT_VALUE = "Text";
+
+    protected static final AllowableValue COMPLEX_FIELD_FAIL = new AllowableValue(FAIL_VALUE, FAIL_VALUE, "Route entire FlowFile to failure if any elements contain complex values.");
+    protected static final AllowableValue COMPLEX_FIELD_WARN = new AllowableValue(WARN_VALUE, WARN_VALUE, "Provide a warning and do not include field in row sent to HBase.");
+    protected static final AllowableValue COMPLEX_FIELD_IGNORE = new AllowableValue(IGNORE_VALUE, IGNORE_VALUE, "Silently ignore and do not include in row sent to HBase.");
+    protected static final AllowableValue COMPLEX_FIELD_TEXT = new AllowableValue(TEXT_VALUE, TEXT_VALUE, "Use the string representation of the complex field as the value of the given column.");
+
+    protected static final PropertyDescriptor COMPLEX_FIELD_STRATEGY = new PropertyDescriptor.Builder()
+            .name("Complex Field Strategy")
+            .description("Indicates how to handle complex fields, i.e. fields that do not have a single text value.")
+            .expressionLanguageSupported(false)
+            .required(true)
+            .allowableValues(COMPLEX_FIELD_FAIL, COMPLEX_FIELD_WARN, COMPLEX_FIELD_IGNORE, COMPLEX_FIELD_TEXT)
+            .defaultValue(COMPLEX_FIELD_TEXT.getValue())
+            .build();
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(HBASE_CLIENT_SERVICE);
+        properties.add(TABLE_NAME);
+        properties.add(ROW_ID);
+        properties.add(ROW_FIELD_NAME);
+        properties.add(COLUMN_FAMILY);
+        properties.add(BATCH_SIZE);
+        properties.add(COMPLEX_FIELD_STRATEGY);
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_FAILURE);
+        return rels;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final Collection<ValidationResult> results = new ArrayList<>();
+
+        final String rowId = validationContext.getProperty(ROW_ID).getValue();
+        final String rowFieldName = validationContext.getProperty(ROW_FIELD_NAME).getValue();
+
+        if (StringUtils.isBlank(rowId) && StringUtils.isBlank(rowFieldName)) {
+            results.add(new ValidationResult.Builder()
+                    .subject(this.getClass().getSimpleName())
+                    .explanation("Row Identifier or Row Identifier Field Name is required")
+                    .valid(false)
+                    .build());
+        } else if (!StringUtils.isBlank(rowId) && !StringUtils.isBlank(rowFieldName)) {
+            results.add(new ValidationResult.Builder()
+                    .subject(this.getClass().getSimpleName())
+                    .explanation("Row Identifier and Row Identifier Field Name can not be used together")
+                    .valid(false)
+                    .build());
+        }
+
+        return results;
+    }
+
+    @Override
+    protected PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile) {
+        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String rowId = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
+        final String rowFieldName = context.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
+        final boolean extractRowId = !StringUtils.isBlank(rowFieldName);
+        final String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue();
+
+        // Parse the JSON document
+        final ObjectMapper mapper = new ObjectMapper();
+        final ObjectHolder<JsonNode> rootNodeRef = new ObjectHolder<>(null);
+        try {
+            session.read(flowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    try (final InputStream bufferedIn = new BufferedInputStream(in)) {
+                        rootNodeRef.set(mapper.readTree(bufferedIn));
+                    }
+                }
+            });
+        } catch (final ProcessException pe) {
+            getLogger().error("Failed to parse {} as JSON due to {}; routing to failure", new Object[]{flowFile, pe.toString()}, pe);
+            return null;
+        }
+
+        final JsonNode rootNode = rootNodeRef.get();
+
+        if (rootNode.isArray()) {
+            getLogger().error("Root node of JSON must be a single document, found array for {}; routing to failure", new Object[]{flowFile});
+            return null;
+        }
+
+        final Collection<PutColumn> columns = new ArrayList<>();
+        final ObjectHolder<String> rowIdHolder = new ObjectHolder<>(null);
+
+        // convert each field/value to a column for the put, skip over nulls and arrays
+        final Iterator<String> fieldNames = rootNode.getFieldNames();
+        while (fieldNames.hasNext()) {
+            final String fieldName = fieldNames.next();
+            final ObjectHolder<String> fieldValueHolder = new ObjectHolder<>(null);
+
+            final JsonNode fieldNode = rootNode.get(fieldName);
+            if (fieldNode.isNull()) {
+                getLogger().debug("Skipping {} because value was null", new Object[]{fieldName});
+            } else if (fieldNode.isValueNode()) {
+                fieldValueHolder.set(fieldNode.asText());
+            } else {
+                // for non-null, non-value nodes, determine what to do based on the handling strategy
+                switch (complexFieldStrategy) {
+                    case FAIL_VALUE:
+                        getLogger().error("Complex value found for {}; routing to failure", new Object[]{fieldName});
+                        return null;
+                    case WARN_VALUE:
+                        getLogger().warn("Complex value found for {}; skipping", new Object[]{fieldName});
+                        break;
+                    case TEXT_VALUE:
+                        // use toString() here because asText() is only guaranteed to be supported on value nodes
+                        // some other types of nodes, like ArrayNode, provide toString implementations
+                        fieldValueHolder.set(fieldNode.toString());
+                        break;
+                    case IGNORE_VALUE:
+                        // silently skip
+                        break;
+                    default:
+                        break;
+                }
+            }
+
+            // if we have a field value, then see if this is the row id field, if so store the value for later
+            // otherwise add a new column where the fieldName and fieldValue are the column qualifier and value
+            if (fieldValueHolder.get() != null) {
+                if (extractRowId && fieldName.equals(rowFieldName)) {
+                    rowIdHolder.set(fieldValueHolder.get());
+                } else {
+                    columns.add(new PutColumn(columnFamily, fieldName, fieldValueHolder.get().getBytes(StandardCharsets.UTF_8)));
+                }
+            }
+        }
+
+        // if we are expecting a field name to use for the row id and the incoming document doesn't have it
+        // log an error message so the user can see what the field names were and return null so it gets routed to failure
+        if (extractRowId && rowIdHolder.get() == null) {
+            final String fieldNameStr = StringUtils.join(rootNode.getFieldNames(), ",");
+            getLogger().error("Row ID field named '{}' not found in field names '{}'; routing to failure", new Object[] {rowFieldName, fieldNameStr});
+            return null;
+        }
+
+        final String putRowId = (extractRowId ? rowIdHolder.get() : rowId);
+        return new PutFlowFile(tableName, putRowId, columns, flowFile);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 613515d..6e2af81 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -14,4 +14,5 @@
 # limitations under the License.
 
 org.apache.nifi.hbase.GetHBase
-org.apache.nifi.hbase.PutHBaseCell
\ No newline at end of file
+org.apache.nifi.hbase.PutHBaseCell
+org.apache.nifi.hbase.PutHBaseJSON
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java
new file mode 100644
index 0000000..fc30f73
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.hbase.put.PutColumn;
+import org.apache.nifi.hbase.put.PutFlowFile;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+
+public class HBaseTestUtil {
+
+    public static void verifyPut(final String row, final String columnFamily, final Map<String,String> columns, final List<PutFlowFile> puts) {
+        boolean foundPut = false;
+
+        for (final PutFlowFile put : puts) {
+            if (!row.equals(put.getRow())) {
+                continue;
+            }
+
+            if (put.getColumns() == null || put.getColumns().size() != columns.size()) {
+                continue;
+            }
+
+            // start off assuming we have all the columns
+            boolean foundAllColumns = true;
+
+            for (Map.Entry<String, String> entry : columns.entrySet()) {
+                // determine if we have the current expected column
+                boolean foundColumn = false;
+                for (PutColumn putColumn : put.getColumns()) {
+                    final String colVal = new String(putColumn.getBuffer(), StandardCharsets.UTF_8);
+                    if (columnFamily.equals(putColumn.getColumnFamily()) && entry.getKey().equals(putColumn.getColumnQualifier())
+                            && entry.getValue().equals(colVal)) {
+                        foundColumn = true;
+                        break;
+                    }
+                }
+
+                // if we didn't have the current expected column we know we don't have all expected columns
+                if (!foundColumn) {
+                    foundAllColumns = false;
+                    break;
+                }
+            }
+
+            // if we found all the expected columns this was a match so we can break
+            if (foundAllColumns) {
+                foundPut = true;
+                break;
+            }
+        }
+
+        assertTrue(foundPut);
+    }
+
+    public static void verifyEvent(final List<ProvenanceEventRecord> events, final String uri, final ProvenanceEventType eventType) {
+        boolean foundEvent = false;
+        for (final ProvenanceEventRecord event : events) {
+            if (event.getTransitUri().equals(uri) && event.getEventType().equals(eventType)) {
+                foundEvent = true;
+                break;
+            }
+        }
+        assertTrue(foundEvent);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
index a2abf7e..bca8b4f 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.hbase;
 
 import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.hbase.put.PutColumn;
 import org.apache.nifi.hbase.put.PutFlowFile;
 import org.apache.nifi.hbase.scan.Column;
 import org.apache.nifi.hbase.scan.ResultCell;
@@ -33,7 +34,7 @@ import java.util.Map;
 public class MockHBaseClientService extends AbstractControllerService implements HBaseClientService {
 
     private Map<String,ResultCell[]> results = new HashMap<>();
-    private Map<String, List<PutFlowFile>> puts = new HashMap<>();
+    private Map<String, List<PutFlowFile>> flowFilePuts = new HashMap<>();
     private boolean throwException = false;
 
     @Override
@@ -42,7 +43,12 @@ public class MockHBaseClientService extends AbstractControllerService implements
             throw new IOException("exception");
         }
 
-        this.puts.put(tableName, new ArrayList<>(puts));
+        this.flowFilePuts.put(tableName, new ArrayList<>(puts));
+    }
+
+    @Override
+    public void put(String tableName, String rowId, Collection<PutColumn> columns) throws IOException {
+       throw new UnsupportedOperationException();
     }
 
     @Override
@@ -92,8 +98,8 @@ public class MockHBaseClientService extends AbstractControllerService implements
         results.put(rowKey, cellArray);
     }
 
-    public Map<String, List<PutFlowFile>> getPuts() {
-        return puts;
+    public Map<String, List<PutFlowFile>> getFlowFilePuts() {
+        return flowFilePuts;
     }
 
     public void setThrowException(boolean throwException) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java
index 62fa9a6..0cd8ff7 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.hbase;
 
+import org.apache.nifi.hbase.put.PutColumn;
 import org.apache.nifi.hbase.put.PutFlowFile;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockFlowFile;
@@ -43,7 +44,7 @@ public class TestPutHBaseCell {
 
         final TestRunner runner = TestRunners.newTestRunner(PutHBaseCell.class);
         runner.setProperty(PutHBaseCell.TABLE_NAME, tableName);
-        runner.setProperty(PutHBaseCell.ROW, row);
+        runner.setProperty(PutHBaseCell.ROW_ID, row);
         runner.setProperty(PutHBaseCell.COLUMN_FAMILY, columnFamily);
         runner.setProperty(PutHBaseCell.COLUMN_QUALIFIER, columnQualifier);
         runner.setProperty(PutHBaseCell.BATCH_SIZE, "1");
@@ -58,12 +59,14 @@ public class TestPutHBaseCell {
         final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
         outFile.assertContentEquals(content);
 
-        assertNotNull(hBaseClient.getPuts());
-        assertEquals(1, hBaseClient.getPuts().size());
+        assertNotNull(hBaseClient.getFlowFilePuts());
+        assertEquals(1, hBaseClient.getFlowFilePuts().size());
 
-        List<PutFlowFile> puts = hBaseClient.getPuts().get(tableName);
+        List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
         assertEquals(1, puts.size());
         verifyPut(row, columnFamily, columnQualifier, content, puts.get(0));
+
+        assertEquals(1, runner.getProvenanceEvents().size());
     }
 
     @Test
@@ -89,12 +92,14 @@ public class TestPutHBaseCell {
         final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
         outFile.assertContentEquals(content);
 
-        assertNotNull(hBaseClient.getPuts());
-        assertEquals(1, hBaseClient.getPuts().size());
+        assertNotNull(hBaseClient.getFlowFilePuts());
+        assertEquals(1, hBaseClient.getFlowFilePuts().size());
 
-        List<PutFlowFile> puts = hBaseClient.getPuts().get(tableName);
+        List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
         assertEquals(1, puts.size());
         verifyPut(row, columnFamily, columnQualifier, content, puts.get(0));
+
+        assertEquals(1, runner.getProvenanceEvents().size());
     }
 
     @Test
@@ -115,7 +120,9 @@ public class TestPutHBaseCell {
         runner.run();
 
         runner.assertTransferCount(PutHBaseCell.REL_SUCCESS, 0);
-        runner.assertTransferCount(PutHBaseCell.FAILURE, 1);
+        runner.assertTransferCount(PutHBaseCell.REL_FAILURE, 1);
+
+        assertEquals(0, runner.getProvenanceEvents().size());
     }
 
     @Test
@@ -142,7 +149,9 @@ public class TestPutHBaseCell {
 
         runner.run();
         runner.assertTransferCount(PutHBaseCell.REL_SUCCESS, 1);
-        runner.assertTransferCount(PutHBaseCell.FAILURE, 1);
+        runner.assertTransferCount(PutHBaseCell.REL_FAILURE, 1);
+
+        assertEquals(1, runner.getProvenanceEvents().size());
     }
 
     @Test
@@ -171,13 +180,15 @@ public class TestPutHBaseCell {
         final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
         outFile.assertContentEquals(content1);
 
-        assertNotNull(hBaseClient.getPuts());
-        assertEquals(1, hBaseClient.getPuts().size());
+        assertNotNull(hBaseClient.getFlowFilePuts());
+        assertEquals(1, hBaseClient.getFlowFilePuts().size());
 
-        List<PutFlowFile> puts = hBaseClient.getPuts().get(tableName);
+        List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
         assertEquals(2, puts.size());
         verifyPut(row1, columnFamily, columnQualifier, content1, puts.get(0));
         verifyPut(row2, columnFamily, columnQualifier, content2, puts.get(1));
+
+        assertEquals(2, runner.getProvenanceEvents().size());
     }
 
     @Test
@@ -202,7 +213,9 @@ public class TestPutHBaseCell {
         runner.enqueue(content2.getBytes("UTF-8"), attributes2);
 
         runner.run();
-        runner.assertAllFlowFilesTransferred(PutHBaseCell.FAILURE, 2);
+        runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 2);
+
+        assertEquals(0, runner.getProvenanceEvents().size());
     }
 
     @Test
@@ -229,13 +242,15 @@ public class TestPutHBaseCell {
         final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
         outFile.assertContentEquals(content1);
 
-        assertNotNull(hBaseClient.getPuts());
-        assertEquals(1, hBaseClient.getPuts().size());
+        assertNotNull(hBaseClient.getFlowFilePuts());
+        assertEquals(1, hBaseClient.getFlowFilePuts().size());
 
-        List<PutFlowFile> puts = hBaseClient.getPuts().get(tableName);
+        List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
         assertEquals(2, puts.size());
         verifyPut(row, columnFamily, columnQualifier, content1, puts.get(0));
         verifyPut(row, columnFamily, columnQualifier, content2, puts.get(1));
+
+        assertEquals(2, runner.getProvenanceEvents().size());
     }
 
     private Map<String, String> getAtrributeMapWithEL(String tableName, String row, String columnFamily, String columnQualifier) {
@@ -250,7 +265,7 @@ public class TestPutHBaseCell {
     private TestRunner getTestRunnerWithEL(PutHBaseCell proc) {
         final TestRunner runner = TestRunners.newTestRunner(proc);
         runner.setProperty(PutHBaseCell.TABLE_NAME, "${hbase.tableName}");
-        runner.setProperty(PutHBaseCell.ROW, "${hbase.row}");
+        runner.setProperty(PutHBaseCell.ROW_ID, "${hbase.row}");
         runner.setProperty(PutHBaseCell.COLUMN_FAMILY, "${hbase.columnFamily}");
         runner.setProperty(PutHBaseCell.COLUMN_QUALIFIER, "${hbase.columnQualifier}");
         return runner;
@@ -266,9 +281,14 @@ public class TestPutHBaseCell {
 
     private void verifyPut(String row, String columnFamily, String columnQualifier, String content, PutFlowFile put) {
         assertEquals(row, put.getRow());
-        assertEquals(columnFamily, put.getColumnFamily());
-        assertEquals(columnQualifier, put.getColumnQualifier());
-        assertEquals(content, new String(put.getBuffer(), StandardCharsets.UTF_8));
+
+        assertNotNull(put.getColumns());
+        assertEquals(1, put.getColumns().size());
+
+        final PutColumn column = put.getColumns().iterator().next();
+        assertEquals(columnFamily, column.getColumnFamily());
+        assertEquals(columnQualifier, column.getColumnQualifier());
+        assertEquals(content, new String(column.getBuffer(), StandardCharsets.UTF_8));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
new file mode 100644
index 0000000..7b59919
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.hbase.put.PutFlowFile;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class TestPutHBaseJSON {
+
+    public static final String DEFAULT_TABLE_NAME = "nifi";
+    public static final String DEFAULT_ROW = "row1";
+    public static final String DEFAULT_COLUMN_FAMILY = "family1";
+
+    @Test
+    public void testCustomValidate() throws InitializationException {
+        // missing row id and row id field name should be invalid
+        TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+        getHBaseClientService(runner);
+        runner.assertNotValid();
+
+        // setting both properties should still be invalid
+        runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+        getHBaseClientService(runner);
+        runner.setProperty(PutHBaseJSON.ROW_ID, "rowId");
+        runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "rowFieldName");
+        runner.assertNotValid();
+
+        // only a row id field name should make it valid
+        runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+        getHBaseClientService(runner);
+        runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "rowFieldName");
+        runner.assertValid();
+
+        // only a row id should make it valid
+        runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+        getHBaseClientService(runner);
+        runner.setProperty(PutHBaseJSON.ROW_ID, "rowId");
+        runner.assertValid();
+    }
+
+    @Test
+    public void testSingleJsonDocAndProvidedRowId() throws IOException, InitializationException {
+        final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+        final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+        runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
+
+        final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
+        runner.enqueue(content.getBytes("UTF-8"));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
+
+        final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
+        outFile.assertContentEquals(content);
+
+        assertNotNull(hBaseClient.getFlowFilePuts());
+        assertEquals(1, hBaseClient.getFlowFilePuts().size());
+
+        final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
+        assertEquals(1, puts.size());
+
+        final Map<String,String> expectedColumns = new HashMap<>();
+        expectedColumns.put("field1", "value1");
+        expectedColumns.put("field2", "value2");
+        HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
+
+        final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
+        assertEquals(1, events.size());
+
+        final ProvenanceEventRecord event = events.get(0);
+        assertEquals("hbase://" + DEFAULT_TABLE_NAME + "/" + DEFAULT_ROW, event.getTransitUri());
+    }
+
+    @Test
+    public void testSingJsonDocAndExtractedRowId() throws IOException, InitializationException {
+        final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+        final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+        runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "rowField");
+
+        final String content = "{ \"rowField\" : \"myRowId\", \"field1\" : \"value1\", \"field2\" : \"value2\" }";
+        runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
+
+        final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
+        outFile.assertContentEquals(content);
+
+        assertNotNull(hBaseClient.getFlowFilePuts());
+        assertEquals(1, hBaseClient.getFlowFilePuts().size());
+
+        final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
+        assertEquals(1, puts.size());
+
+        // should be a put with row id of myRowId, and rowField shouldn't end up in the columns
+        final Map<String,String> expectedColumns1 = new HashMap<>();
+        expectedColumns1.put("field1", "value1");
+        expectedColumns1.put("field2", "value2");
+        HBaseTestUtil.verifyPut("myRowId", DEFAULT_COLUMN_FAMILY, expectedColumns1, puts);
+
+        final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
+        assertEquals(1, events.size());
+        HBaseTestUtil.verifyEvent(runner.getProvenanceEvents(), "hbase://" + DEFAULT_TABLE_NAME + "/myRowId", ProvenanceEventType.SEND);
+    }
+
+    @Test
+    public void testSingJsonDocAndExtractedRowIdMissingField() throws IOException, InitializationException {
+        final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+        final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+        runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "rowField");
+
+        final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
+        runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1);
+
+        final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_FAILURE).get(0);
+        outFile.assertContentEquals(content);
+
+        // should be no provenance events
+        assertEquals(0, runner.getProvenanceEvents().size());
+
+        // no puts should have made it to the client
+        assertEquals(0, hBaseClient.getFlowFilePuts().size());
+    }
+
+    @Test
+    public void testMultipleJsonDocsRouteToFailure() throws IOException, InitializationException {
+        final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+        final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+        runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
+
+        final String content1 = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
+        final String content2 = "{ \"field3\" : \"value3\", \"field4\" : \"value4\" }";
+        final String content = "[ " + content1 + " , " + content2 + " ]";
+
+        runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1);
+
+        final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_FAILURE).get(0);
+        outFile.assertContentEquals(content);
+
+        // should be no provenance events
+        assertEquals(0, runner.getProvenanceEvents().size());
+
+        // no puts should have made it to the client
+        assertEquals(0, hBaseClient.getFlowFilePuts().size());
+    }
+
+    @Test
+    public void testELWithProvidedRowId() throws IOException, InitializationException {
+        final TestRunner runner = getTestRunner("${hbase.table}", "${hbase.colFamily}", "1");
+        final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+        runner.setProperty(PutHBaseJSON.ROW_ID, "${hbase.rowId}");
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put("hbase.table", "myTable");
+        attributes.put("hbase.colFamily", "myColFamily");
+        attributes.put("hbase.rowId", "myRowId");
+
+        final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
+        runner.enqueue(content.getBytes("UTF-8"), attributes);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
+
+        final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
+        outFile.assertContentEquals(content);
+
+        assertNotNull(hBaseClient.getFlowFilePuts());
+        assertEquals(1, hBaseClient.getFlowFilePuts().size());
+
+        final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get("myTable");
+        assertEquals(1, puts.size());
+
+        final Map<String,String> expectedColumns = new HashMap<>();
+        expectedColumns.put("field1", "value1");
+        expectedColumns.put("field2", "value2");
+        HBaseTestUtil.verifyPut("myRowId", "myColFamily", expectedColumns, puts);
+
+        final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
+        assertEquals(1, events.size());
+        HBaseTestUtil.verifyEvent(runner.getProvenanceEvents(), "hbase://myTable/myRowId", ProvenanceEventType.SEND);
+    }
+
+    @Test
+    public void testELWithExtractedRowId() throws IOException, InitializationException {
+        final TestRunner runner = getTestRunner("${hbase.table}", "${hbase.colFamily}", "1");
+        final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+        runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "${hbase.rowField}");
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put("hbase.table", "myTable");
+        attributes.put("hbase.colFamily", "myColFamily");
+        attributes.put("hbase.rowField", "field1");
+
+        final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
+        runner.enqueue(content.getBytes("UTF-8"), attributes);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
+
+        final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
+        outFile.assertContentEquals(content);
+
+        assertNotNull(hBaseClient.getFlowFilePuts());
+        assertEquals(1, hBaseClient.getFlowFilePuts().size());
+
+        final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get("myTable");
+        assertEquals(1, puts.size());
+
+        final Map<String,String> expectedColumns = new HashMap<>();
+        expectedColumns.put("field2", "value2");
+        HBaseTestUtil.verifyPut("value1", "myColFamily", expectedColumns, puts);
+
+        final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
+        assertEquals(1, events.size());
+        HBaseTestUtil.verifyEvent(runner.getProvenanceEvents(), "hbase://myTable/value1", ProvenanceEventType.SEND);
+    }
+
+    @Test
+    public void testNullAndArrayElementsWithWarnStrategy() throws InitializationException {
+        final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+        final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+        runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
+        runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_WARN.getValue());
+
+        // should route to success because there is at least one valid field
+        final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : \"value2\", \"field3\" : null }";
+        runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
+
+        assertNotNull(hBaseClient.getFlowFilePuts());
+        assertEquals(1, hBaseClient.getFlowFilePuts().size());
+
+        final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
+        assertEquals(1, puts.size());
+
+        // should have skipped field1 and field3
+        final Map<String,String> expectedColumns = new HashMap<>();
+        expectedColumns.put("field2", "value2");
+        HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
+    }
+
+    @Test
+    public void testNullAndArrayElementsWithIgnoreStrategy() throws InitializationException {
+        final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+        final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+        runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
+        runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_IGNORE.getValue());
+
+        // should route to success because there is at least one valid field
+        final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : \"value2\", \"field3\" : null }";
+        runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
+
+        assertNotNull(hBaseClient.getFlowFilePuts());
+        assertEquals(1, hBaseClient.getFlowFilePuts().size());
+
+        final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
+        assertEquals(1, puts.size());
+
+        // should have skipped field1 and field3
+        final Map<String,String> expectedColumns = new HashMap<>();
+        expectedColumns.put("field2", "value2");
+        HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
+    }
+
+    @Test
+    public void testNullAndArrayElementsWithFailureStrategy() throws InitializationException {
+        final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+        final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+        runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
+        runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_FAIL.getValue());
+
+        // should route to success because there is at least one valid field
+        final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : \"value2\", \"field3\" : null }";
+        runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1);
+
+        final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_FAILURE).get(0);
+        outFile.assertContentEquals(content);
+
+        // should be no provenance events
+        assertEquals(0, runner.getProvenanceEvents().size());
+
+        // no puts should have made it to the client
+        assertEquals(0, hBaseClient.getFlowFilePuts().size());
+    }
+
+    @Test
+    public void testNullAndArrayElementsWithTextStrategy() throws InitializationException {
+        final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+        final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+        runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
+        runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_TEXT.getValue());
+
+        // should route to success because there is at least one valid field
+        final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : \"value2\", \"field3\" : null }";
+        runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
+
+        assertNotNull(hBaseClient.getFlowFilePuts());
+        assertEquals(1, hBaseClient.getFlowFilePuts().size());
+
+        final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
+        assertEquals(1, puts.size());
+
+        // should have skipped field1 and field3
+        final Map<String,String> expectedColumns = new HashMap<>();
+        expectedColumns.put("field1", "[{\"child_field1\":\"child_value1\"}]");
+        expectedColumns.put("field2", "value2");
+        HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
+    }
+
+    @Test
+    public void testNestedDocWithTextStrategy() throws InitializationException {
+        final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+        final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+        runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
+        runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_TEXT.getValue());
+
+        // should route to success because there is at least one valid field
+        final String content = "{ \"field1\" : { \"child_field1\" : \"child_value1\" }, \"field2\" : \"value2\", \"field3\" : null }";
+        runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
+
+        assertNotNull(hBaseClient.getFlowFilePuts());
+        assertEquals(1, hBaseClient.getFlowFilePuts().size());
+
+        final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
+        assertEquals(1, puts.size());
+
+        // should have skipped field1 and field3
+        final Map<String,String> expectedColumns = new HashMap<>();
+        expectedColumns.put("field1", "{\"child_field1\":\"child_value1\"}");
+        expectedColumns.put("field2", "value2");
+        HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
+    }
+
+    @Test
+    public void testAllElementsAreNullOrArrays() throws InitializationException {
+        final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+        final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+        runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
+        runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_WARN.getValue());
+
+        // should route to failure since it would produce a put with no columns
+        final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }],  \"field2\" : null }";
+        runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1);
+
+        final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_FAILURE).get(0);
+        outFile.assertContentEquals(content);
+
+        // should be no provenance events
+        assertEquals(0, runner.getProvenanceEvents().size());
+
+        // no puts should have made it to the client
+        assertEquals(0, hBaseClient.getFlowFilePuts().size());
+    }
+
+    @Test
+    public void testInvalidJson() throws InitializationException {
+        final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+        getHBaseClientService(runner);
+        runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
+
+        final String content = "NOT JSON";
+        runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1);
+    }
+
+    private TestRunner getTestRunner(String table, String columnFamily, String batchSize) {
+        final TestRunner runner = TestRunners.newTestRunner(PutHBaseJSON.class);
+        runner.setProperty(PutHBaseJSON.TABLE_NAME, table);
+        runner.setProperty(PutHBaseJSON.COLUMN_FAMILY, columnFamily);
+        runner.setProperty(PutHBaseJSON.BATCH_SIZE, batchSize);
+        return runner;
+    }
+
+    private MockHBaseClientService getHBaseClientService(final TestRunner runner) throws InitializationException {
+        final MockHBaseClientService hBaseClient = new MockHBaseClientService();
+        runner.addControllerService("hbaseClient", hBaseClient);
+        runner.enableControllerService(hBaseClient);
+        runner.setProperty(PutHBaseCell.HBASE_CLIENT_SERVICE, "hbaseClient");
+        return hBaseClient;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
index 9ff2c46..79eef92 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
@@ -20,6 +20,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.hbase.put.PutColumn;
 import org.apache.nifi.hbase.put.PutFlowFile;
 import org.apache.nifi.hbase.scan.Column;
 import org.apache.nifi.hbase.scan.ResultHandler;
@@ -74,6 +75,16 @@ public interface HBaseClientService extends ControllerService {
     void put(String tableName, Collection<PutFlowFile> puts) throws IOException;
 
     /**
+     * Puts the given row to HBase with the provided columns.
+     *
+     * @param tableName the name of an HBase table
+     * @param rowId the id of the row to put
+     * @param columns the columns of the row to put
+     * @throws IOException thrown when there are communication errors with HBase
+     */
+    void put(String tableName, String rowId, Collection<PutColumn> columns) throws IOException;
+
+    /**
      * Scans the given table using the optional filter criteria and passing each result to the provided handler.
      *
      * @param tableName the name of an HBase table to scan

http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java
new file mode 100644
index 0000000..0971f94
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase.put;
+
+/**
+ * Encapsulates the information for one column of a put operation.
+ */
+public class PutColumn {
+
+    private final String columnFamily;
+    private final String columnQualifier;
+    private final byte[] buffer;
+
+
+    public PutColumn(final String columnFamily, final String columnQualifier, final byte[] buffer) {
+        this.columnFamily = columnFamily;
+        this.columnQualifier = columnQualifier;
+        this.buffer = buffer;
+    }
+
+    public String getColumnFamily() {
+        return columnFamily;
+    }
+
+    public String getColumnQualifier() {
+        return columnQualifier;
+    }
+
+    public byte[] getBuffer() {
+        return buffer;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutFlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutFlowFile.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutFlowFile.java
index ed6319e..a97e3a4 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutFlowFile.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutFlowFile.java
@@ -16,8 +16,11 @@
  */
 package org.apache.nifi.hbase.put;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.flowfile.FlowFile;
 
+import java.util.Collection;
+
 /**
  * Wrapper to encapsulate all of the information for the Put along with the FlowFile.
  */
@@ -25,18 +28,13 @@ public class PutFlowFile {
 
     private final String tableName;
     private final String row;
-    private final String columnFamily;
-    private final String columnQualifier;
-    private final byte[] buffer;
+    private final Collection<PutColumn> columns;
     private final FlowFile flowFile;
 
-    public PutFlowFile(String tableName, String row, String columnFamily, String columnQualifier,
-                       byte[] buffer, FlowFile flowFile) {
+    public PutFlowFile(String tableName, String row, Collection<PutColumn> columns, FlowFile flowFile) {
         this.tableName = tableName;
         this.row = row;
-        this.columnFamily = columnFamily;
-        this.columnQualifier = columnQualifier;
-        this.buffer = buffer;
+        this.columns = columns;
         this.flowFile = flowFile;
     }
 
@@ -48,20 +46,26 @@ public class PutFlowFile {
         return row;
     }
 
-    public String getColumnFamily() {
-        return columnFamily;
+    public Collection<PutColumn> getColumns() {
+        return columns;
     }
 
-    public String getColumnQualifier() {
-        return columnQualifier;
+    public FlowFile getFlowFile() {
+        return flowFile;
     }
 
-    public byte[] getBuffer() {
-        return buffer;
-    }
+    public boolean isValid() {
+        if (StringUtils.isBlank(tableName) || StringUtils.isBlank(row) || flowFile == null || columns == null || columns.isEmpty()) {
+            return false;
+        }
 
-    public FlowFile getFlowFile() {
-        return flowFile;
+        for (PutColumn column : columns) {
+            if (StringUtils.isBlank(column.getColumnQualifier()) || StringUtils.isBlank(column.getColumnFamily()) || column.getBuffer() == null) {
+                return false;
+            }
+        }
+
+        return true;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
index 42590c2..b207191 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
@@ -43,6 +43,7 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.hbase.put.PutColumn;
 import org.apache.nifi.hbase.put.PutFlowFile;
 import org.apache.nifi.hbase.scan.Column;
 import org.apache.nifi.hbase.scan.ResultCell;
@@ -195,9 +196,13 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
                     put = new Put(putFlowFile.getRow().getBytes(StandardCharsets.UTF_8));
                     rowPuts.put(putFlowFile.getRow(), put);
                 }
-                put.addColumn(putFlowFile.getColumnFamily().getBytes(StandardCharsets.UTF_8),
-                        putFlowFile.getColumnQualifier().getBytes(StandardCharsets.UTF_8),
-                        putFlowFile.getBuffer());
+
+                for (final PutColumn column : putFlowFile.getColumns()) {
+                    put.addColumn(
+                            column.getColumnFamily().getBytes(StandardCharsets.UTF_8),
+                            column.getColumnQualifier().getBytes(StandardCharsets.UTF_8),
+                            column.getBuffer());
+                }
             }
 
             table.put(new ArrayList<>(rowPuts.values()));
@@ -205,6 +210,20 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
     }
 
     @Override
+    public void put(final String tableName, final String rowId, final Collection<PutColumn> columns) throws IOException {
+        try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
+            Put put = new Put(rowId.getBytes(StandardCharsets.UTF_8));
+            for (final PutColumn column : columns) {
+                put.addColumn(
+                        column.getColumnFamily().getBytes(StandardCharsets.UTF_8),
+                        column.getColumnQualifier().getBytes(StandardCharsets.UTF_8),
+                        column.getBuffer());
+            }
+            table.put(put);
+        }
+    }
+
+    @Override
     public void scan(final String tableName, final Collection<Column> columns, final String filterExpression, final long minTime, final ResultHandler handler)
             throws IOException {