You are viewing a plain text version of this content. (The canonical link to it was a hyperlink that is not preserved in this plain-text copy.)
Posted to commits@nifi.apache.org by bb...@apache.org on 2015/11/19 20:02:29 UTC
[1/2] nifi git commit: NIFI-1174 Refactoring the HBase client API and
adding a PutHBaseJSON which can write a whole row from a single json document
- Adding Complex Field Strategy to PutHBaseJSON to allow more control of
complex fields - Improving error messages to indicate what the problem was with an invalid row
Repository: nifi
Updated Branches:
refs/heads/master 8c2323dc8 -> 40dd8a0a8
http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java
index 1575f3c..513ea9c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
@@ -41,6 +42,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -130,8 +132,9 @@ public class TestHBase_1_1_2_ClientService {
final String columnQualifier = "qualifier1";
final String content = "content1";
- final PutFlowFile putFlowFile = new PutFlowFile(tableName, row, columnFamily, columnQualifier,
- content.getBytes(StandardCharsets.UTF_8), null);
+ final Collection<PutColumn> columns = Collections.singletonList(new PutColumn(columnFamily, columnQualifier,
+ content.getBytes(StandardCharsets.UTF_8)));
+ final PutFlowFile putFlowFile = new PutFlowFile(tableName, row, columns, null);
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
@@ -168,11 +171,13 @@ public class TestHBase_1_1_2_ClientService {
final String content1 = "content1";
final String content2 = "content2";
- final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row, columnFamily, columnQualifier,
- content1.getBytes(StandardCharsets.UTF_8), null);
+ final Collection<PutColumn> columns1 = Collections.singletonList(new PutColumn(columnFamily, columnQualifier,
+ content1.getBytes(StandardCharsets.UTF_8)));
+ final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row, columns1, null);
- final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row, columnFamily, columnQualifier,
- content2.getBytes(StandardCharsets.UTF_8), null);
+ final Collection<PutColumn> columns2 = Collections.singletonList(new PutColumn(columnFamily, columnQualifier,
+ content2.getBytes(StandardCharsets.UTF_8)));
+ final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row, columns2, null);
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
@@ -214,11 +219,13 @@ public class TestHBase_1_1_2_ClientService {
final String content1 = "content1";
final String content2 = "content2";
- final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row1, columnFamily, columnQualifier,
- content1.getBytes(StandardCharsets.UTF_8), null);
+ final Collection<PutColumn> columns1 = Collections.singletonList(new PutColumn(columnFamily, columnQualifier,
+ content1.getBytes(StandardCharsets.UTF_8)));
+ final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row1, columns1, null);
- final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row2, columnFamily, columnQualifier,
- content2.getBytes(StandardCharsets.UTF_8), null);
+ final Collection<PutColumn> columns2 = Collections.singletonList(new PutColumn(columnFamily, columnQualifier,
+ content2.getBytes(StandardCharsets.UTF_8)));
+ final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row2, columns2, null);
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
[2/2] nifi git commit: NIFI-1174 Refactoring the HBase client API and
adding a PutHBaseJSON which can write a whole row from a single json document
- Adding Complex Field Strategy to PutHBaseJSON to allow more control of
complex fields - Improving error messages to indicate what the problem was with an invalid row
Posted by bb...@apache.org.
NIFI-1174 Refactoring the HBase client API and adding a PutHBaseJSON which can write a whole row from a single json document - Adding Complex Field Strategy to PutHBaseJSON to allow more control of complex fields - Improving error messages to indicate what the problem was with an invalid row
Signed-off-by: Bryan Bende <bb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/40dd8a0a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/40dd8a0a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/40dd8a0a
Branch: refs/heads/master
Commit: 40dd8a0a845ef5f4d4fde451f02376ab2fab9758
Parents: 8c2323d
Author: Bryan Bende <bb...@apache.org>
Authored: Wed Nov 18 17:24:49 2015 -0500
Committer: Bryan Bende <bb...@apache.org>
Committed: Thu Nov 19 13:49:02 2015 -0500
----------------------------------------------------------------------
.../nifi-hbase-processors/pom.xml | 4 +
.../nifi/hbase/AbstractHBaseProcessor.java | 23 -
.../org/apache/nifi/hbase/AbstractPutHBase.java | 183 ++++++++
.../java/org/apache/nifi/hbase/GetHBase.java | 3 +-
.../org/apache/nifi/hbase/PutHBaseCell.java | 153 +------
.../org/apache/nifi/hbase/PutHBaseJSON.java | 230 ++++++++++
.../org.apache.nifi.processor.Processor | 3 +-
.../org/apache/nifi/hbase/HBaseTestUtil.java | 87 ++++
.../nifi/hbase/MockHBaseClientService.java | 14 +-
.../org/apache/nifi/hbase/TestPutHBaseCell.java | 60 ++-
.../org/apache/nifi/hbase/TestPutHBaseJSON.java | 423 +++++++++++++++++++
.../apache/nifi/hbase/HBaseClientService.java | 11 +
.../org/apache/nifi/hbase/put/PutColumn.java | 47 +++
.../org/apache/nifi/hbase/put/PutFlowFile.java | 38 +-
.../nifi/hbase/HBase_1_1_2_ClientService.java | 25 +-
.../hbase/TestHBase_1_1_2_ClientService.java | 27 +-
16 files changed, 1119 insertions(+), 212 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml
index b474c6a..abbe4c9 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml
@@ -50,6 +50,10 @@
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractHBaseProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractHBaseProcessor.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractHBaseProcessor.java
deleted file mode 100644
index 9cce35e..0000000
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractHBaseProcessor.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.hbase;
-
-import org.apache.nifi.processor.AbstractProcessor;
-
-public abstract class AbstractHBaseProcessor extends AbstractProcessor { // empty base class; deleted by this commit (GetHBase now extends AbstractProcessor directly)
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
new file mode 100644
index 0000000..87424f9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hbase.put.PutFlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class for processors that put data to HBase. Sub-classes build one {@link PutFlowFile} per incoming FlowFile via {@link #createPut}; this class pulls a batch of FlowFiles, groups the resulting puts by table, sends each group through the {@link HBaseClientService}, routes every FlowFile to success or failure, and records a provenance SEND event for each success.
+ */
+public abstract class AbstractPutHBase extends AbstractProcessor {
+
+ protected static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
+ .name("HBase Client Service")
+ .description("Specifies the Controller Service to use for accessing HBase.")
+ .required(true)
+ .identifiesControllerService(HBaseClientService.class)
+ .build();
+ protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+ .name("Table Name")
+ .description("The name of the HBase Table to put data into")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ protected static final PropertyDescriptor ROW_ID = new PropertyDescriptor.Builder()
+ .name("Row Identifier")
+ .description("Specifies the Row ID to use when inserting data into HBase")
+ .required(false) // not all sub-classes will require this
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ protected static final PropertyDescriptor COLUMN_FAMILY = new PropertyDescriptor.Builder()
+ .name("Column Family")
+ .description("The Column Family to use when inserting data into HBase")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ protected static final PropertyDescriptor COLUMN_QUALIFIER = new PropertyDescriptor.Builder()
+ .name("Column Qualifier")
+ .description("The Column Qualifier to use when inserting data into HBase")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() // caps how many FlowFiles onTrigger pulls from the session per invocation
+ .name("Batch Size")
+ .description("The maximum number of FlowFiles to process in a single execution. The FlowFiles will be " +
+ "grouped by table, and a single Put per table will be performed.")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("25")
+ .build();
+
+ protected static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("A FlowFile is routed to this relationship after it has been successfully stored in HBase")
+ .build();
+ protected static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("A FlowFile is routed to this relationship if it cannot be sent to HBase")
+ .build();
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { // template method: per-FlowFile put construction is delegated to createPut(...)
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+ List<FlowFile> flowFiles = session.get(batchSize);
+ if (flowFiles == null || flowFiles.size() == 0) { // nothing queued: do no work this invocation
+ return;
+ }
+
+ final Map<String,List<PutFlowFile>> tablePuts = new HashMap<>(); // table name -> valid puts for that table; one client put call per entry
+
+ // Group FlowFiles by HBase Table
+ for (final FlowFile flowFile : flowFiles) {
+ final PutFlowFile putFlowFile = createPut(session, context, flowFile);
+
+ if (putFlowFile == null) {
+ // sub-classes should log appropriate error messages before returning null
+ session.transfer(flowFile, REL_FAILURE);
+ } else if (!putFlowFile.isValid()) { // log the first missing piece so the user can see why the row is invalid
+ if (StringUtils.isBlank(putFlowFile.getTableName())) {
+ getLogger().error("Missing table name for FlowFile {}; routing to failure", new Object[]{flowFile});
+ } else if (StringUtils.isBlank(putFlowFile.getRow())) {
+ getLogger().error("Missing row id for FlowFile {}; routing to failure", new Object[]{flowFile});
+ } else if (putFlowFile.getColumns() == null || putFlowFile.getColumns().isEmpty()) {
+ getLogger().error("No columns provided for FlowFile {}; routing to failure", new Object[]{flowFile});
+ } else {
+ // really shouldn't get here, but just in case -- NOTE(review): reached for any isValid() failure not covered above (e.g. if isValid() also checks per-column family/qualifier); confirm against PutFlowFile.isValid()
+ getLogger().error("Failed to produce a put for FlowFile {}; routing to failure", new Object[]{flowFile});
+ }
+ session.transfer(flowFile, REL_FAILURE); // invalid puts go to failure without being penalized
+ } else {
+ List<PutFlowFile> putFlowFiles = tablePuts.get(putFlowFile.getTableName());
+ if (putFlowFiles == null) {
+ putFlowFiles = new ArrayList<>();
+ tablePuts.put(putFlowFile.getTableName(), putFlowFiles);
+ }
+ putFlowFiles.add(putFlowFile);
+ }
+ }
+
+ getLogger().debug("Sending {} FlowFiles to HBase in {} put operations", new Object[]{flowFiles.size(), tablePuts.size()}); // NOTE(review): flowFiles.size() also counts FlowFiles already routed to failure above
+
+ final long start = System.nanoTime(); // single timer across all tables: every success below reports this same total duration
+ final List<PutFlowFile> successes = new ArrayList<>();
+ final HBaseClientService hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
+
+ for (Map.Entry<String, List<PutFlowFile>> entry : tablePuts.entrySet()) {
+ try {
+ hBaseClientService.put(entry.getKey(), entry.getValue());
+ successes.addAll(entry.getValue());
+ } catch (Exception e) { // one failed client call fails every FlowFile grouped under this table
+ getLogger().error(e.getMessage(), e);
+
+ for (PutFlowFile putFlowFile : entry.getValue()) {
+ getLogger().error("Failed to send {} to HBase due to {}; routing to failure", new Object[]{putFlowFile.getFlowFile(), e});
+ final FlowFile failure = session.penalize(putFlowFile.getFlowFile()); // send failures are penalized, unlike the invalid-put failures above
+ session.transfer(failure, REL_FAILURE);
+ }
+ }
+ }
+
+ final long sendMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ getLogger().debug("Sent {} FlowFiles to HBase successfully in {} milliseconds", new Object[]{successes.size(), sendMillis});
+
+ for (PutFlowFile putFlowFile : successes) {
+ session.transfer(putFlowFile.getFlowFile(), REL_SUCCESS);
+ final String details = "Put " + putFlowFile.getColumns().size() + " cells to HBase"; // counts the entries of getColumns(), reported as "cells"
+ session.getProvenanceReporter().send(putFlowFile.getFlowFile(), getTransitUri(putFlowFile), details, sendMillis);
+ }
+
+ }
+
+ protected String getTransitUri(PutFlowFile putFlowFile) { // row-level transit URI (table/row, no column part); protected so sub-classes may override
+ return "hbase://" + putFlowFile.getTableName() + "/" + putFlowFile.getRow();
+ }
+
+ /**
+ * Sub-classes provide the implementation to create a put from a FlowFile.
+ *
+ * @param session
+ * the current session
+ * @param context
+ * the current context
+ * @param flowFile
+ * the FlowFile to create a Put from
+ *
+ * @return a PutFlowFile instance for the given FlowFile, or {@code null} if none can be created (the sub-class should log why first); onTrigger routes a null result, or a put failing {@link PutFlowFile#isValid()}, to failure
+ */
+ protected abstract PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile);
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
index 5f08265..98a612c 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
@@ -41,6 +41,7 @@ import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.scan.ResultHandler;
import org.apache.nifi.hbase.util.ObjectSerDe;
import org.apache.nifi.hbase.util.StringSerDe;
+import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
@@ -83,7 +84,7 @@ import java.util.regex.Pattern;
@WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the data was pulled from"),
@WritesAttribute(attribute = "mime.type", description = "Set to application/json to indicate that output is JSON")
})
-public class GetHBase extends AbstractHBaseProcessor {
+public class GetHBase extends AbstractProcessor {
static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:\\w+)?(?:,\\w+(:\\w+)?)*");
http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java
index 0a2b763..759d91e 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java
@@ -16,7 +16,6 @@
*/
package org.apache.nifi.hbase;
-import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -24,91 +23,36 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
-import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
@EventDriven
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"hadoop", "hbase"})
@CapabilityDescription("Adds the Contents of a FlowFile to HBase as the value of a single cell")
-public class PutHBaseCell extends AbstractProcessor {
-
- protected static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
- .name("HBase Client Service")
- .description("Specifies the Controller Service to use for accessing HBase.")
- .required(true)
- .identifiesControllerService(HBaseClientService.class)
- .build();
- protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
- .name("Table Name")
- .description("The name of the HBase Table to put data into")
- .required(true)
- .expressionLanguageSupported(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
- static final PropertyDescriptor ROW = new PropertyDescriptor.Builder()
- .name("Row Identifier")
- .description("Specifies the Row ID to use when inserting data into HBase")
- .required(true)
- .expressionLanguageSupported(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
- static final PropertyDescriptor COLUMN_FAMILY = new PropertyDescriptor.Builder()
- .name("Column Family")
- .description("The Column Family to use when inserting data into HBase")
- .required(true)
- .expressionLanguageSupported(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
- static final PropertyDescriptor COLUMN_QUALIFIER = new PropertyDescriptor.Builder()
- .name("Column Qualifier")
- .description("The Column Qualifier to use when inserting data into HBase")
- .required(true)
- .expressionLanguageSupported(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
- static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
- .name("Batch Size")
- .description("The maximum number of FlowFiles to process in a single execution. The FlowFiles will be " +
- "grouped by table, and a single Put per table will be performed.")
- .required(true)
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .defaultValue("25")
- .build();
-
- static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("A FlowFile is routed to this relationship after it has been successfully stored in HBase")
- .build();
- static final Relationship FAILURE = new Relationship.Builder()
- .name("failure")
- .description("A FlowFile is routed to this relationship if it cannot be sent to HBase")
- .build();
+public class PutHBaseCell extends AbstractPutHBase {
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(HBASE_CLIENT_SERVICE);
properties.add(TABLE_NAME);
- properties.add(ROW);
+ properties.add(ROW_ID);
properties.add(COLUMN_FAMILY);
properties.add(COLUMN_QUALIFIER);
properties.add(BATCH_SIZE);
@@ -119,84 +63,27 @@ public class PutHBaseCell extends AbstractProcessor {
public Set<Relationship> getRelationships() {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
- rels.add(FAILURE);
+ rels.add(REL_FAILURE);
return rels;
}
@Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
- List<FlowFile> flowFiles = session.get(batchSize);
- if (flowFiles == null || flowFiles.size() == 0) {
- return;
- }
-
- final Map<String,List<PutFlowFile>> tablePuts = new HashMap<>();
-
- // Group FlowFiles by HBase Table
- for (final FlowFile flowFile : flowFiles) {
- final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
- final String row = context.getProperty(ROW).evaluateAttributeExpressions(flowFile).getValue();
- final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
- final String columnQualifier = context.getProperty(COLUMN_QUALIFIER).evaluateAttributeExpressions(flowFile).getValue();
-
- if (StringUtils.isBlank(tableName) || StringUtils.isBlank(row) || StringUtils.isBlank(columnFamily) || StringUtils.isBlank(columnQualifier)) {
- getLogger().error("Invalid FlowFile {} missing table, row, column familiy, or column qualifier; routing to failure", new Object[]{flowFile});
- session.transfer(flowFile, FAILURE);
- } else {
- final byte[] buffer = new byte[(int) flowFile.getSize()];
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(final InputStream in) throws IOException {
- StreamUtils.fillBuffer(in, buffer);
- }
- });
-
- final PutFlowFile putFlowFile = new PutFlowFile(tableName, row, columnFamily, columnQualifier, buffer, flowFile);
-
- List<PutFlowFile> putFlowFiles = tablePuts.get(tableName);
- if (putFlowFiles == null) {
- putFlowFiles = new ArrayList<>();
- tablePuts.put(tableName, putFlowFiles);
- }
- putFlowFiles.add(putFlowFile);
- }
- }
-
- getLogger().debug("Sending {} FlowFiles to HBase in {} put operations", new Object[] {flowFiles.size(), tablePuts.size()});
-
- final long start = System.nanoTime();
- final List<PutFlowFile> successes = new ArrayList<>();
- final HBaseClientService hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
-
- for (Map.Entry<String, List<PutFlowFile>> entry : tablePuts.entrySet()) {
- try {
- hBaseClientService.put(entry.getKey(), entry.getValue());
- successes.addAll(entry.getValue());
- } catch (Exception e) {
- getLogger().error(e.getMessage(), e);
-
- for (PutFlowFile putFlowFile : entry.getValue()) {
- getLogger().error("Failed to send {} to HBase due to {}; routing to failure", new Object[]{putFlowFile.getFlowFile(), e});
- final FlowFile failure = session.penalize(putFlowFile.getFlowFile());
- session.transfer(failure, FAILURE);
- }
+ protected PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile) {
+ final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String row = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
+ final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
+ final String columnQualifier = context.getProperty(COLUMN_QUALIFIER).evaluateAttributeExpressions(flowFile).getValue();
+
+ final byte[] buffer = new byte[(int) flowFile.getSize()];
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream in) throws IOException {
+ StreamUtils.fillBuffer(in, buffer);
}
- }
-
- final long sendMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
- getLogger().debug("Sent {} FlowFiles to HBase successfully in {} milliseconds", new Object[] {successes.size(), sendMillis});
-
- for (PutFlowFile putFlowFile : successes) {
- session.transfer(putFlowFile.getFlowFile(), REL_SUCCESS);
- session.getProvenanceReporter().send(putFlowFile.getFlowFile(), getTransitUri(putFlowFile), sendMillis);
- }
-
- }
+ });
- protected String getTransitUri(PutFlowFile putFlowFile) {
- return "hbase://" + putFlowFile.getTableName() + "/" + putFlowFile.getRow() + "/" + putFlowFile.getColumnFamily()
- + ":" + putFlowFile.getColumnQualifier();
+ final Collection<PutColumn> columns = Collections.singletonList(new PutColumn(columnFamily, columnQualifier, buffer));
+ return new PutFlowFile(tableName, row, columns, flowFile);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java
new file mode 100644
index 0000000..0dba7ee
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hbase.put.PutColumn;
+import org.apache.nifi.hbase.put.PutFlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.ObjectHolder;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hadoop", "hbase", "put", "json"})
+@CapabilityDescription("Adds rows to HBase based on the contents of incoming JSON documents. Each FlowFile must contain a single " +
+        "UTF-8 encoded JSON document, and any FlowFiles where the root element is not a single document will be routed to failure. " +
+        "Each JSON field name and value will become a column qualifier and value of the HBase row. Any fields with a null value " +
+        "will be skipped, and fields with a complex value will be handled according to the Complex Field Strategy. " +
+        "The row id can be specified either directly on the processor through the Row Identifier property, or can be extracted from the JSON " +
+        "document by specifying the Row Identifier Field Name property. This processor will hold the contents of all FlowFiles for the given batch " +
+        "in memory at one time.")
+public class PutHBaseJSON extends AbstractPutHBase {
+
+    protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
+            .name("Row Identifier Field Name")
+            .description("Specifies the name of a JSON element whose value should be used as the row id for the given JSON document.")
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    protected static final String FAIL_VALUE = "Fail";
+    protected static final String WARN_VALUE = "Warn";
+    protected static final String IGNORE_VALUE = "Ignore";
+    protected static final String TEXT_VALUE = "Text";
+
+    protected static final AllowableValue COMPLEX_FIELD_FAIL = new AllowableValue(FAIL_VALUE, FAIL_VALUE, "Route entire FlowFile to failure if any elements contain complex values.");
+    protected static final AllowableValue COMPLEX_FIELD_WARN = new AllowableValue(WARN_VALUE, WARN_VALUE, "Provide a warning and do not include field in row sent to HBase.");
+    protected static final AllowableValue COMPLEX_FIELD_IGNORE = new AllowableValue(IGNORE_VALUE, IGNORE_VALUE, "Silently ignore and do not include in row sent to HBase.");
+    protected static final AllowableValue COMPLEX_FIELD_TEXT = new AllowableValue(TEXT_VALUE, TEXT_VALUE, "Use the string representation of the complex field as the value of the given column.");
+
+    protected static final PropertyDescriptor COMPLEX_FIELD_STRATEGY = new PropertyDescriptor.Builder()
+            .name("Complex Field Strategy")
+            .description("Indicates how to handle complex fields, i.e. fields that do not have a single text value.")
+            .expressionLanguageSupported(false)
+            .required(true)
+            .allowableValues(COMPLEX_FIELD_FAIL, COMPLEX_FIELD_WARN, COMPLEX_FIELD_IGNORE, COMPLEX_FIELD_TEXT)
+            .defaultValue(COMPLEX_FIELD_TEXT.getValue())
+            .build();
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(HBASE_CLIENT_SERVICE);
+        properties.add(TABLE_NAME);
+        properties.add(ROW_ID);
+        properties.add(ROW_FIELD_NAME);
+        properties.add(COLUMN_FAMILY);
+        properties.add(BATCH_SIZE);
+        properties.add(COMPLEX_FIELD_STRATEGY);
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_FAILURE);
+        return rels;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final Collection<ValidationResult> results = new ArrayList<>();
+
+        final String rowId = validationContext.getProperty(ROW_ID).getValue();
+        final String rowFieldName = validationContext.getProperty(ROW_FIELD_NAME).getValue();
+
+        if (StringUtils.isBlank(rowId) && StringUtils.isBlank(rowFieldName)) {
+            results.add(new ValidationResult.Builder()
+                    .subject(this.getClass().getSimpleName())
+                    .explanation("Row Identifier or Row Identifier Field Name is required")
+                    .valid(false)
+                    .build());
+        } else if (!StringUtils.isBlank(rowId) && !StringUtils.isBlank(rowFieldName)) {
+            results.add(new ValidationResult.Builder()
+                    .subject(this.getClass().getSimpleName())
+                    .explanation("Row Identifier and Row Identifier Field Name can not be used together")
+                    .valid(false)
+                    .build());
+        }
+
+        return results;
+    }
+
+    // ObjectMapper is thread-safe once configured, so a single instance is shared by all invocations
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    /**
+     * Converts a single JSON document into a {@link PutFlowFile} whose columns are the document's top-level fields.
+     *
+     * @param session the session used to read the FlowFile content
+     * @param context the context used to evaluate the processor properties against the FlowFile
+     * @param flowFile the FlowFile expected to contain one UTF-8 encoded JSON document
+     * @return the put to send to HBase, or null if the FlowFile should be routed to failure
+     */
+    @Override
+    protected PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile) {
+        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String rowId = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
+        final String rowFieldName = context.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
+        final boolean extractRowId = !StringUtils.isBlank(rowFieldName);
+        final String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue();
+
+        // Parse the JSON document; the holder is needed because the value is assigned inside the callback
+        final ObjectHolder<JsonNode> rootNodeRef = new ObjectHolder<>(null);
+        try {
+            session.read(flowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    try (final InputStream bufferedIn = new BufferedInputStream(in)) {
+                        rootNodeRef.set(OBJECT_MAPPER.readTree(bufferedIn));
+                    }
+                }
+            });
+        } catch (final ProcessException pe) {
+            getLogger().error("Failed to parse {} as JSON due to {}; routing to failure", new Object[]{flowFile, pe.toString()}, pe);
+            return null;
+        }
+
+        final JsonNode rootNode = rootNodeRef.get();
+
+        // only a single JSON object can be mapped to a row; an array or a scalar root (e.g. "abc" or 42)
+        // has no fields and would otherwise produce a put without any columns
+        if (rootNode == null || !rootNode.isObject()) {
+            final String foundType = (rootNode != null && rootNode.isArray()) ? "array" : "non-object value";
+            getLogger().error("Root node of JSON must be a single document, found {} for {}; routing to failure",
+                    new Object[]{foundType, flowFile});
+            return null;
+        }
+
+        final Collection<PutColumn> columns = new ArrayList<>();
+        String extractedRowId = null;
+
+        // convert each field/value to a column for the put; nulls are skipped and complex values
+        // (objects and arrays) are handled according to the Complex Field Strategy
+        final Iterator<String> fieldNames = rootNode.getFieldNames();
+        while (fieldNames.hasNext()) {
+            final String fieldName = fieldNames.next();
+            final JsonNode fieldNode = rootNode.get(fieldName);
+            String fieldValue = null;
+
+            if (fieldNode.isNull()) {
+                getLogger().debug("Skipping {} because value was null", new Object[]{fieldName});
+            } else if (fieldNode.isValueNode()) {
+                fieldValue = fieldNode.asText();
+            } else {
+                // for non-null, non-value nodes, determine what to do based on the handling strategy
+                switch (complexFieldStrategy) {
+                    case FAIL_VALUE:
+                        getLogger().error("Complex value found for {} in {}; routing to failure", new Object[]{fieldName, flowFile});
+                        return null;
+                    case WARN_VALUE:
+                        getLogger().warn("Complex value found for {} in {}; skipping", new Object[]{fieldName, flowFile});
+                        break;
+                    case TEXT_VALUE:
+                        // use toString() here because asText() is only guaranteed to be supported on value nodes
+                        // some other types of nodes, like ArrayNode, provide toString implementations
+                        fieldValue = fieldNode.toString();
+                        break;
+                    case IGNORE_VALUE:
+                        // silently skip
+                        break;
+                    default:
+                        break;
+                }
+            }
+
+            // if we have a field value, then see if this is the row id field, if so store the value for later
+            // otherwise add a new column where the fieldName and fieldValue are the column qualifier and value
+            if (fieldValue != null) {
+                if (extractRowId && fieldName.equals(rowFieldName)) {
+                    extractedRowId = fieldValue;
+                } else {
+                    columns.add(new PutColumn(columnFamily, fieldName, fieldValue.getBytes(StandardCharsets.UTF_8)));
+                }
+            }
+        }
+
+        // if we are expecting a field name to use for the row id and the incoming document doesn't have it
+        // log an error message so the user can see what the field names were and return null so it gets routed to failure
+        if (extractRowId && extractedRowId == null) {
+            final String fieldNameStr = StringUtils.join(rootNode.getFieldNames(), ",");
+            getLogger().error("Row ID field named '{}' not found in field names '{}'; routing to failure", new Object[] {rowFieldName, fieldNameStr});
+            return null;
+        }
+
+        final String putRowId = (extractRowId ? extractedRowId : rowId);
+
+        // HBase rejects an empty row key, which an empty Row Identifier expression result or JSON value would produce
+        if (StringUtils.isEmpty(putRowId)) {
+            getLogger().error("Row ID for {} was empty; routing to failure", new Object[]{flowFile});
+            return null;
+        }
+
+        // HBase rejects a put without columns, e.g. when every field was null or skipped as a complex value
+        if (columns.isEmpty()) {
+            getLogger().error("No columns to insert found in {}; routing to failure", new Object[]{flowFile});
+            return null;
+        }
+
+        return new PutFlowFile(tableName, putRowId, columns, flowFile);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 613515d..6e2af81 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -14,4 +14,5 @@
# limitations under the License.
org.apache.nifi.hbase.GetHBase
-org.apache.nifi.hbase.PutHBaseCell
\ No newline at end of file
+org.apache.nifi.hbase.PutHBaseCell
+org.apache.nifi.hbase.PutHBaseJSON
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java
new file mode 100644
index 0000000..fc30f73
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.hbase.put.PutColumn;
+import org.apache.nifi.hbase.put.PutFlowFile;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Assertion helpers shared by the HBase processor tests.
+ */
+public class HBaseTestUtil {
+
+    /**
+     * Asserts that at least one put in {@code puts} targets {@code row}, has as many columns as {@code columns},
+     * and contains every expected qualifier/value pair (values compared as UTF-8 strings) in {@code columnFamily}.
+     */
+    public static void verifyPut(final String row, final String columnFamily, final Map<String,String> columns, final List<PutFlowFile> puts) {
+        boolean foundPut = false;
+
+        for (final PutFlowFile put : puts) {
+            if (!row.equals(put.getRow())) {
+                continue;
+            }
+
+            if (put.getColumns() == null || put.getColumns().size() != columns.size()) {
+                continue;
+            }
+
+            // start off assuming we have all the columns
+            boolean foundAllColumns = true;
+
+            for (Map.Entry<String, String> entry : columns.entrySet()) {
+                // determine if we have the current expected column
+                boolean foundColumn = false;
+                for (PutColumn putColumn : put.getColumns()) {
+                    final String colVal = new String(putColumn.getBuffer(), StandardCharsets.UTF_8);
+                    if (columnFamily.equals(putColumn.getColumnFamily()) && entry.getKey().equals(putColumn.getColumnQualifier())
+                            && entry.getValue().equals(colVal)) {
+                        foundColumn = true;
+                        break;
+                    }
+                }
+
+                // if we didn't have the current expected column we know we don't have all expected columns
+                if (!foundColumn) {
+                    foundAllColumns = false;
+                    break;
+                }
+            }
+
+            // if we found all the expected columns this was a match so we can break
+            if (foundAllColumns) {
+                foundPut = true;
+                break;
+            }
+        }
+
+        assertTrue("No put found for row '" + row + "' with columns " + columns + " in family '" + columnFamily + "'", foundPut);
+    }
+
+    /**
+     * Asserts that {@code events} contains an event of type {@code eventType} whose transit URI equals {@code uri}.
+     * Events that carry no transit URI are treated as non-matching instead of causing a NullPointerException.
+     */
+    public static void verifyEvent(final List<ProvenanceEventRecord> events, final String uri, final ProvenanceEventType eventType) {
+        boolean foundEvent = false;
+        for (final ProvenanceEventRecord event : events) {
+            // compare from the expected side because getTransitUri() may be null for some event types
+            if (uri.equals(event.getTransitUri()) && event.getEventType() == eventType) {
+                foundEvent = true;
+                break;
+            }
+        }
+        assertTrue("No " + eventType + " event found with transit URI '" + uri + "'", foundEvent);
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
index a2abf7e..bca8b4f 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
@@ -17,6 +17,7 @@
package org.apache.nifi.hbase;
import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
@@ -33,7 +34,7 @@ import java.util.Map;
public class MockHBaseClientService extends AbstractControllerService implements HBaseClientService {
private Map<String,ResultCell[]> results = new HashMap<>();
- private Map<String, List<PutFlowFile>> puts = new HashMap<>();
+ private Map<String, List<PutFlowFile>> flowFilePuts = new HashMap<>();
private boolean throwException = false;
@Override
@@ -42,7 +43,12 @@ public class MockHBaseClientService extends AbstractControllerService implements
throw new IOException("exception");
}
- this.puts.put(tableName, new ArrayList<>(puts));
+ this.flowFilePuts.put(tableName, new ArrayList<>(puts));
+ }
+
+ @Override
+ public void put(String tableName, String rowId, Collection<PutColumn> columns) throws IOException {
+ throw new UnsupportedOperationException();
}
@Override
@@ -92,8 +98,8 @@ public class MockHBaseClientService extends AbstractControllerService implements
results.put(rowKey, cellArray);
}
- public Map<String, List<PutFlowFile>> getPuts() {
- return puts;
+ public Map<String, List<PutFlowFile>> getFlowFilePuts() {
+ return flowFilePuts;
}
public void setThrowException(boolean throwException) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java
index 62fa9a6..0cd8ff7 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.hbase;
+import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
@@ -43,7 +44,7 @@ public class TestPutHBaseCell {
final TestRunner runner = TestRunners.newTestRunner(PutHBaseCell.class);
runner.setProperty(PutHBaseCell.TABLE_NAME, tableName);
- runner.setProperty(PutHBaseCell.ROW, row);
+ runner.setProperty(PutHBaseCell.ROW_ID, row);
runner.setProperty(PutHBaseCell.COLUMN_FAMILY, columnFamily);
runner.setProperty(PutHBaseCell.COLUMN_QUALIFIER, columnQualifier);
runner.setProperty(PutHBaseCell.BATCH_SIZE, "1");
@@ -58,12 +59,14 @@ public class TestPutHBaseCell {
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
- assertNotNull(hBaseClient.getPuts());
- assertEquals(1, hBaseClient.getPuts().size());
+ assertNotNull(hBaseClient.getFlowFilePuts());
+ assertEquals(1, hBaseClient.getFlowFilePuts().size());
- List<PutFlowFile> puts = hBaseClient.getPuts().get(tableName);
+ List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
assertEquals(1, puts.size());
verifyPut(row, columnFamily, columnQualifier, content, puts.get(0));
+
+ assertEquals(1, runner.getProvenanceEvents().size());
}
@Test
@@ -89,12 +92,14 @@ public class TestPutHBaseCell {
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
- assertNotNull(hBaseClient.getPuts());
- assertEquals(1, hBaseClient.getPuts().size());
+ assertNotNull(hBaseClient.getFlowFilePuts());
+ assertEquals(1, hBaseClient.getFlowFilePuts().size());
- List<PutFlowFile> puts = hBaseClient.getPuts().get(tableName);
+ List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
assertEquals(1, puts.size());
verifyPut(row, columnFamily, columnQualifier, content, puts.get(0));
+
+ assertEquals(1, runner.getProvenanceEvents().size());
}
@Test
@@ -115,7 +120,9 @@ public class TestPutHBaseCell {
runner.run();
runner.assertTransferCount(PutHBaseCell.REL_SUCCESS, 0);
- runner.assertTransferCount(PutHBaseCell.FAILURE, 1);
+ runner.assertTransferCount(PutHBaseCell.REL_FAILURE, 1);
+
+ assertEquals(0, runner.getProvenanceEvents().size());
}
@Test
@@ -142,7 +149,9 @@ public class TestPutHBaseCell {
runner.run();
runner.assertTransferCount(PutHBaseCell.REL_SUCCESS, 1);
- runner.assertTransferCount(PutHBaseCell.FAILURE, 1);
+ runner.assertTransferCount(PutHBaseCell.REL_FAILURE, 1);
+
+ assertEquals(1, runner.getProvenanceEvents().size());
}
@Test
@@ -171,13 +180,15 @@ public class TestPutHBaseCell {
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content1);
- assertNotNull(hBaseClient.getPuts());
- assertEquals(1, hBaseClient.getPuts().size());
+ assertNotNull(hBaseClient.getFlowFilePuts());
+ assertEquals(1, hBaseClient.getFlowFilePuts().size());
- List<PutFlowFile> puts = hBaseClient.getPuts().get(tableName);
+ List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
assertEquals(2, puts.size());
verifyPut(row1, columnFamily, columnQualifier, content1, puts.get(0));
verifyPut(row2, columnFamily, columnQualifier, content2, puts.get(1));
+
+ assertEquals(2, runner.getProvenanceEvents().size());
}
@Test
@@ -202,7 +213,9 @@ public class TestPutHBaseCell {
runner.enqueue(content2.getBytes("UTF-8"), attributes2);
runner.run();
- runner.assertAllFlowFilesTransferred(PutHBaseCell.FAILURE, 2);
+ runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 2);
+
+ assertEquals(0, runner.getProvenanceEvents().size());
}
@Test
@@ -229,13 +242,15 @@ public class TestPutHBaseCell {
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content1);
- assertNotNull(hBaseClient.getPuts());
- assertEquals(1, hBaseClient.getPuts().size());
+ assertNotNull(hBaseClient.getFlowFilePuts());
+ assertEquals(1, hBaseClient.getFlowFilePuts().size());
- List<PutFlowFile> puts = hBaseClient.getPuts().get(tableName);
+ List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
assertEquals(2, puts.size());
verifyPut(row, columnFamily, columnQualifier, content1, puts.get(0));
verifyPut(row, columnFamily, columnQualifier, content2, puts.get(1));
+
+ assertEquals(2, runner.getProvenanceEvents().size());
}
private Map<String, String> getAtrributeMapWithEL(String tableName, String row, String columnFamily, String columnQualifier) {
@@ -250,7 +265,7 @@ public class TestPutHBaseCell {
private TestRunner getTestRunnerWithEL(PutHBaseCell proc) {
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutHBaseCell.TABLE_NAME, "${hbase.tableName}");
- runner.setProperty(PutHBaseCell.ROW, "${hbase.row}");
+ runner.setProperty(PutHBaseCell.ROW_ID, "${hbase.row}");
runner.setProperty(PutHBaseCell.COLUMN_FAMILY, "${hbase.columnFamily}");
runner.setProperty(PutHBaseCell.COLUMN_QUALIFIER, "${hbase.columnQualifier}");
return runner;
@@ -266,9 +281,14 @@ public class TestPutHBaseCell {
private void verifyPut(String row, String columnFamily, String columnQualifier, String content, PutFlowFile put) {
assertEquals(row, put.getRow());
- assertEquals(columnFamily, put.getColumnFamily());
- assertEquals(columnQualifier, put.getColumnQualifier());
- assertEquals(content, new String(put.getBuffer(), StandardCharsets.UTF_8));
+
+ assertNotNull(put.getColumns());
+ assertEquals(1, put.getColumns().size());
+
+ final PutColumn column = put.getColumns().iterator().next();
+ assertEquals(columnFamily, column.getColumnFamily());
+ assertEquals(columnQualifier, column.getColumnQualifier());
+ assertEquals(content, new String(column.getBuffer(), StandardCharsets.UTF_8));
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
new file mode 100644
index 0000000..7b59919
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.hbase.put.PutFlowFile;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class TestPutHBaseJSON {
+
+ public static final String DEFAULT_TABLE_NAME = "nifi";
+ public static final String DEFAULT_ROW = "row1";
+ public static final String DEFAULT_COLUMN_FAMILY = "family1";
+
+ @Test
+ public void testCustomValidate() throws InitializationException {
+ // missing row id and row id field name should be invalid
+ TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+ getHBaseClientService(runner);
+ runner.assertNotValid();
+
+ // setting both properties should still be invalid
+ runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+ getHBaseClientService(runner);
+ runner.setProperty(PutHBaseJSON.ROW_ID, "rowId");
+ runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "rowFieldName");
+ runner.assertNotValid();
+
+ // only a row id field name should make it valid
+ runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+ getHBaseClientService(runner);
+ runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "rowFieldName");
+ runner.assertValid();
+
+ // only a row id should make it valid
+ runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+ getHBaseClientService(runner);
+ runner.setProperty(PutHBaseJSON.ROW_ID, "rowId");
+ runner.assertValid();
+ }
+
+ @Test
+ public void testSingleJsonDocAndProvidedRowId() throws IOException, InitializationException {
+ final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+ final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+ runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
+
+ final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
+ runner.enqueue(content.getBytes("UTF-8"));
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
+
+ final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
+ outFile.assertContentEquals(content);
+
+ assertNotNull(hBaseClient.getFlowFilePuts());
+ assertEquals(1, hBaseClient.getFlowFilePuts().size());
+
+ final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
+ assertEquals(1, puts.size());
+
+ final Map<String,String> expectedColumns = new HashMap<>();
+ expectedColumns.put("field1", "value1");
+ expectedColumns.put("field2", "value2");
+ HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
+
+ final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
+ assertEquals(1, events.size());
+
+ final ProvenanceEventRecord event = events.get(0);
+ assertEquals("hbase://" + DEFAULT_TABLE_NAME + "/" + DEFAULT_ROW, event.getTransitUri());
+ }
+
+ @Test
+ public void testSingJsonDocAndExtractedRowId() throws IOException, InitializationException {
+ final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+ final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+ runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "rowField");
+
+ final String content = "{ \"rowField\" : \"myRowId\", \"field1\" : \"value1\", \"field2\" : \"value2\" }";
+ runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
+
+ final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
+ outFile.assertContentEquals(content);
+
+ assertNotNull(hBaseClient.getFlowFilePuts());
+ assertEquals(1, hBaseClient.getFlowFilePuts().size());
+
+ final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
+ assertEquals(1, puts.size());
+
+ // should be a put with row id of myRowId, and rowField shouldn't end up in the columns
+ final Map<String,String> expectedColumns1 = new HashMap<>();
+ expectedColumns1.put("field1", "value1");
+ expectedColumns1.put("field2", "value2");
+ HBaseTestUtil.verifyPut("myRowId", DEFAULT_COLUMN_FAMILY, expectedColumns1, puts);
+
+ final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
+ assertEquals(1, events.size());
+ HBaseTestUtil.verifyEvent(runner.getProvenanceEvents(), "hbase://" + DEFAULT_TABLE_NAME + "/myRowId", ProvenanceEventType.SEND);
+ }
+
+ @Test
+ public void testSingJsonDocAndExtractedRowIdMissingField() throws IOException, InitializationException {
+ final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+ final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+ runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "rowField");
+
+ final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
+ runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1);
+
+ final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_FAILURE).get(0);
+ outFile.assertContentEquals(content);
+
+ // should be no provenance events
+ assertEquals(0, runner.getProvenanceEvents().size());
+
+ // no puts should have made it to the client
+ assertEquals(0, hBaseClient.getFlowFilePuts().size());
+ }
+
+ @Test
+ public void testMultipleJsonDocsRouteToFailure() throws IOException, InitializationException {
+ // A top-level JSON array holding several documents is expected to be rejected:
+ // the FlowFile goes to failure unchanged and no put is issued.
+ final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+ final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+ runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
+
+ final String content1 = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
+ final String content2 = "{ \"field3\" : \"value3\", \"field4\" : \"value4\" }";
+ final String content = "[ " + content1 + " , " + content2 + " ]";
+
+ runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutHBaseJSON.REL_FAILURE, 1);
+
+ final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseJSON.REL_FAILURE).get(0);
+ outFile.assertContentEquals(content);
+
+ // should be no provenance events
+ assertEquals(0, runner.getProvenanceEvents().size());
+
+ // no puts should have made it to the client
+ assertEquals(0, hBaseClient.getFlowFilePuts().size());
+ }
+
+ @Test
+ public void testELWithProvidedRowId() throws IOException, InitializationException {
+ // Table, column family and row id are all resolved from FlowFile attributes via Expression Language.
+ final TestRunner runner = getTestRunner("${hbase.table}", "${hbase.colFamily}", "1");
+ final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+ runner.setProperty(PutHBaseJSON.ROW_ID, "${hbase.rowId}");
+
+ final Map<String,String> attributes = new HashMap<>();
+ attributes.put("hbase.table", "myTable");
+ attributes.put("hbase.colFamily", "myColFamily");
+ attributes.put("hbase.rowId", "myRowId");
+
+ final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
+ runner.enqueue(content.getBytes(StandardCharsets.UTF_8), attributes);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutHBaseJSON.REL_SUCCESS, 1);
+
+ final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseJSON.REL_SUCCESS).get(0);
+ outFile.assertContentEquals(content);
+
+ assertNotNull(hBaseClient.getFlowFilePuts());
+ assertEquals(1, hBaseClient.getFlowFilePuts().size());
+
+ final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get("myTable");
+ assertEquals(1, puts.size());
+
+ final Map<String,String> expectedColumns = new HashMap<>();
+ expectedColumns.put("field1", "value1");
+ expectedColumns.put("field2", "value2");
+ HBaseTestUtil.verifyPut("myRowId", "myColFamily", expectedColumns, puts);
+
+ final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
+ assertEquals(1, events.size());
+ HBaseTestUtil.verifyEvent(events, "hbase://myTable/myRowId", ProvenanceEventType.SEND);
+ }
+
+ @Test
+ public void testELWithExtractedRowId() throws IOException, InitializationException {
+ // The name of the JSON field holding the row id comes from an attribute; that field's value
+ // ("value1") becomes the row id and the field itself must not be written as a column.
+ final TestRunner runner = getTestRunner("${hbase.table}", "${hbase.colFamily}", "1");
+ final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+ runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "${hbase.rowField}");
+
+ final Map<String,String> attributes = new HashMap<>();
+ attributes.put("hbase.table", "myTable");
+ attributes.put("hbase.colFamily", "myColFamily");
+ attributes.put("hbase.rowField", "field1");
+
+ final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
+ runner.enqueue(content.getBytes(StandardCharsets.UTF_8), attributes);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutHBaseJSON.REL_SUCCESS, 1);
+
+ final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseJSON.REL_SUCCESS).get(0);
+ outFile.assertContentEquals(content);
+
+ assertNotNull(hBaseClient.getFlowFilePuts());
+ assertEquals(1, hBaseClient.getFlowFilePuts().size());
+
+ final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get("myTable");
+ assertEquals(1, puts.size());
+
+ final Map<String,String> expectedColumns = new HashMap<>();
+ expectedColumns.put("field2", "value2");
+ HBaseTestUtil.verifyPut("value1", "myColFamily", expectedColumns, puts);
+
+ final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
+ assertEquals(1, events.size());
+ HBaseTestUtil.verifyEvent(events, "hbase://myTable/value1", ProvenanceEventType.SEND);
+ }
+
+ @Test
+ public void testNullAndArrayElementsWithWarnStrategy() throws InitializationException {
+ final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+ final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+ runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
+ runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_WARN.getValue());
+
+ // should route to success because there is at least one valid field
+ final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : \"value2\", \"field3\" : null }";
+ runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutHBaseJSON.REL_SUCCESS, 1);
+
+ // the FlowFile content itself must pass through unchanged
+ final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseJSON.REL_SUCCESS).get(0);
+ outFile.assertContentEquals(content);
+
+ assertNotNull(hBaseClient.getFlowFilePuts());
+ assertEquals(1, hBaseClient.getFlowFilePuts().size());
+
+ final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
+ assertEquals(1, puts.size());
+
+ // the array (field1) and the null (field3) are skipped, so only field2 is written
+ final Map<String,String> expectedColumns = new HashMap<>();
+ expectedColumns.put("field2", "value2");
+ HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
+ }
+
+ @Test
+ public void testNullAndArrayElementsWithIgnoreStrategy() throws InitializationException {
+ final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+ final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+ runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
+ runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_IGNORE.getValue());
+
+ // should route to success because there is at least one valid field
+ final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : \"value2\", \"field3\" : null }";
+ runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutHBaseJSON.REL_SUCCESS, 1);
+
+ // the FlowFile content itself must pass through unchanged
+ final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseJSON.REL_SUCCESS).get(0);
+ outFile.assertContentEquals(content);
+
+ assertNotNull(hBaseClient.getFlowFilePuts());
+ assertEquals(1, hBaseClient.getFlowFilePuts().size());
+
+ final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
+ assertEquals(1, puts.size());
+
+ // the array (field1) and the null (field3) are skipped, so only field2 is written
+ final Map<String,String> expectedColumns = new HashMap<>();
+ expectedColumns.put("field2", "value2");
+ HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
+ }
+
+ @Test
+ public void testNullAndArrayElementsWithFailureStrategy() throws InitializationException {
+ final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+ final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+ runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
+ runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_FAIL.getValue());
+
+ // should route to failure: field1 is a complex (array) field and the strategy is Fail,
+ // even though field2 on its own would be a valid column
+ final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : \"value2\", \"field3\" : null }";
+ runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutHBaseJSON.REL_FAILURE, 1);
+
+ final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseJSON.REL_FAILURE).get(0);
+ outFile.assertContentEquals(content);
+
+ // should be no provenance events
+ assertEquals(0, runner.getProvenanceEvents().size());
+
+ // no puts should have made it to the client
+ assertEquals(0, hBaseClient.getFlowFilePuts().size());
+ }
+
+ @Test
+ public void testNullAndArrayElementsWithTextStrategy() throws InitializationException {
+ final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+ final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+ runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
+ runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_TEXT.getValue());
+
+ // should route to success because there is at least one valid field
+ final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : \"value2\", \"field3\" : null }";
+ runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutHBaseJSON.REL_SUCCESS, 1);
+
+ assertNotNull(hBaseClient.getFlowFilePuts());
+ assertEquals(1, hBaseClient.getFlowFilePuts().size());
+
+ final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
+ assertEquals(1, puts.size());
+
+ // field3 (null) is skipped, while the Text strategy writes the array in field1 as its JSON text
+ final Map<String,String> expectedColumns = new HashMap<>();
+ expectedColumns.put("field1", "[{\"child_field1\":\"child_value1\"}]");
+ expectedColumns.put("field2", "value2");
+ HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
+ }
+
+ @Test
+ public void testNestedDocWithTextStrategy() throws InitializationException {
+ final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+ final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+ runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
+ runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_TEXT.getValue());
+
+ // should route to success because there is at least one valid field
+ final String content = "{ \"field1\" : { \"child_field1\" : \"child_value1\" }, \"field2\" : \"value2\", \"field3\" : null }";
+ runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutHBaseJSON.REL_SUCCESS, 1);
+
+ assertNotNull(hBaseClient.getFlowFilePuts());
+ assertEquals(1, hBaseClient.getFlowFilePuts().size());
+
+ final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
+ assertEquals(1, puts.size());
+
+ // field3 (null) is skipped, while the Text strategy writes the nested object in field1 as its JSON text
+ final Map<String,String> expectedColumns = new HashMap<>();
+ expectedColumns.put("field1", "{\"child_field1\":\"child_value1\"}");
+ expectedColumns.put("field2", "value2");
+ HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
+ }
+
+ @Test
+ public void testAllElementsAreNullOrArrays() throws InitializationException {
+ final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+ final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+ runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
+ runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_WARN.getValue());
+
+ // should route to failure since skipping the array and the null would leave a put with no columns
+ final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : null }";
+ runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutHBaseJSON.REL_FAILURE, 1);
+
+ final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseJSON.REL_FAILURE).get(0);
+ outFile.assertContentEquals(content);
+
+ // should be no provenance events
+ assertEquals(0, runner.getProvenanceEvents().size());
+
+ // no puts should have made it to the client
+ assertEquals(0, hBaseClient.getFlowFilePuts().size());
+ }
+
+ @Test
+ public void testInvalidJson() throws InitializationException {
+ final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+ final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+ runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
+
+ // content that cannot be parsed as JSON must be routed to failure unchanged, like every other failure case
+ final String content = "NOT JSON";
+ runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutHBaseJSON.REL_FAILURE, 1);
+
+ final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseJSON.REL_FAILURE).get(0);
+ outFile.assertContentEquals(content);
+
+ // should be no provenance events
+ assertEquals(0, runner.getProvenanceEvents().size());
+
+ // no puts should have made it to the client
+ assertEquals(0, hBaseClient.getFlowFilePuts().size());
+ }
+
+ /**
+ * Builds a TestRunner for PutHBaseJSON with the table name, column family and batch size set.
+ * Values are passed through verbatim, so Expression Language strings are allowed.
+ */
+ private TestRunner getTestRunner(final String table, final String columnFamily, final String batchSize) {
+ final TestRunner jsonRunner = TestRunners.newTestRunner(PutHBaseJSON.class);
+ jsonRunner.setProperty(PutHBaseJSON.TABLE_NAME, table);
+ jsonRunner.setProperty(PutHBaseJSON.COLUMN_FAMILY, columnFamily);
+ jsonRunner.setProperty(PutHBaseJSON.BATCH_SIZE, batchSize);
+ return jsonRunner;
+ }
+
+ /**
+ * Registers and enables a MockHBaseClientService on the runner, then points the processor's
+ * HBase client property at it.
+ *
+ * @param runner the runner to configure
+ * @return the enabled mock service, so tests can inspect the puts it received
+ * @throws InitializationException if the controller service cannot be added
+ */
+ private MockHBaseClientService getHBaseClientService(final TestRunner runner) throws InitializationException {
+ final String serviceId = "hbaseClient";
+ final MockHBaseClientService hBaseClient = new MockHBaseClientService();
+ runner.addControllerService(serviceId, hBaseClient);
+ runner.enableControllerService(hBaseClient);
+ runner.setProperty(PutHBaseJSON.HBASE_CLIENT_SERVICE, serviceId);
+ return hBaseClient;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
index 9ff2c46..79eef92 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
@@ -20,6 +20,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultHandler;
@@ -74,6 +75,16 @@ public interface HBaseClientService extends ControllerService {
void put(String tableName, Collection<PutFlowFile> puts) throws IOException;
/**
+ * Puts a single row to HBase, writing every provided column to that row.
+ * <p>
+ * Unlike {@link #put(String, Collection)}, which takes a batch of {@link PutFlowFile}s, this method
+ * writes exactly one row and is not tied to a FlowFile.
+ * NOTE(review): the contract for an empty {@code columns} collection is not specified here — confirm.
+ *
+ * @param tableName the name of an HBase table
+ * @param rowId the id of the row to put
+ * @param columns the columns of the row to put; each column supplies its own family, qualifier and value
+ * @throws IOException thrown when there are communication errors with HBase
+ */
+ void put(String tableName, String rowId, Collection<PutColumn> columns) throws IOException;
+
+ /**
* Scans the given table using the optional filter criteria and passing each result to the provided handler.
*
* @param tableName the name of an HBase table to scan
http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java
new file mode 100644
index 0000000..0971f94
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase.put;
+
+/**
+ * Encapsulates the information for one column of a put operation: the column family,
+ * the column qualifier within that family, and the raw bytes to store as the cell value.
+ */
+public class PutColumn {
+
+ private final String family;
+ private final String qualifier;
+ private final byte[] value;
+
+ /**
+ * @param columnFamily the column family the value belongs to
+ * @param columnQualifier the column qualifier within the family
+ * @param buffer the bytes to write; the array is stored as given, not copied
+ */
+ public PutColumn(final String columnFamily, final String columnQualifier, final byte[] buffer) {
+ family = columnFamily;
+ qualifier = columnQualifier;
+ value = buffer;
+ }
+
+ /** @return the column family passed to the constructor */
+ public String getColumnFamily() {
+ return family;
+ }
+
+ /** @return the column qualifier passed to the constructor */
+ public String getColumnQualifier() {
+ return qualifier;
+ }
+
+ /** @return the value bytes passed to the constructor (the same array, not a copy) */
+ public byte[] getBuffer() {
+ return value;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutFlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutFlowFile.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutFlowFile.java
index ed6319e..a97e3a4 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutFlowFile.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutFlowFile.java
@@ -16,8 +16,11 @@
*/
package org.apache.nifi.hbase.put;
+import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.flowfile.FlowFile;
+import java.util.Collection;
+
/**
* Wrapper to encapsulate all of the information for the Put along with the FlowFile.
*/
@@ -25,18 +28,13 @@ public class PutFlowFile {
private final String tableName;
private final String row;
- private final String columnFamily;
- private final String columnQualifier;
- private final byte[] buffer;
+ // All columns to write for this row; each column carries its own family, qualifier and value.
+ private final Collection<PutColumn> columns;
private final FlowFile flowFile;
- public PutFlowFile(String tableName, String row, String columnFamily, String columnQualifier,
- byte[] buffer, FlowFile flowFile) {
+ /**
+ * @param tableName the HBase table the row is written to
+ * @param row the row id
+ * @param columns the columns to write for the row; stored as given, not copied
+ * @param flowFile the FlowFile associated with this put
+ */
+ public PutFlowFile(String tableName, String row, Collection<PutColumn> columns, FlowFile flowFile) {
this.tableName = tableName;
this.row = row;
- this.columnFamily = columnFamily;
- this.columnQualifier = columnQualifier;
- this.buffer = buffer;
+ this.columns = columns;
this.flowFile = flowFile;
}
@@ -48,20 +46,26 @@ public class PutFlowFile {
return row;
}
- public String getColumnFamily() {
- return columnFamily;
+ /**
+ * @return the columns to write for this row, exactly as passed to the constructor (not a copy)
+ */
+ public Collection<PutColumn> getColumns() {
+ return columns;
}
- public String getColumnQualifier() {
- return columnQualifier;
+ /**
+ * @return the FlowFile associated with this put
+ */
+ public FlowFile getFlowFile() {
+ return flowFile;
}
- public byte[] getBuffer() {
- return buffer;
- }
+ /**
+ * Checks whether this put has every piece of information needed to be written.
+ *
+ * @return true when the table name and row are non-blank, a FlowFile is present, there is at
+ * least one column, and every column has a non-blank family and qualifier and a
+ * non-null buffer; false otherwise
+ */
+ public boolean isValid() {
+ if (StringUtils.isBlank(tableName) || StringUtils.isBlank(row) || flowFile == null || columns == null || columns.isEmpty()) {
+ return false;
+ }
- public FlowFile getFlowFile() {
- return flowFile;
+ // a single incomplete column makes the whole put invalid
+ for (PutColumn column : columns) {
+ if (StringUtils.isBlank(column.getColumnQualifier()) || StringUtils.isBlank(column.getColumnFamily()) || column.getBuffer() == null) {
+ return false;
+ }
+ }
+
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
index 42590c2..b207191 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
@@ -43,6 +43,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
@@ -195,9 +196,13 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
put = new Put(putFlowFile.getRow().getBytes(StandardCharsets.UTF_8));
rowPuts.put(putFlowFile.getRow(), put);
}
- put.addColumn(putFlowFile.getColumnFamily().getBytes(StandardCharsets.UTF_8),
- putFlowFile.getColumnQualifier().getBytes(StandardCharsets.UTF_8),
- putFlowFile.getBuffer());
+
+ for (final PutColumn column : putFlowFile.getColumns()) {
+ put.addColumn(
+ column.getColumnFamily().getBytes(StandardCharsets.UTF_8),
+ column.getColumnQualifier().getBytes(StandardCharsets.UTF_8),
+ column.getBuffer());
+ }
}
table.put(new ArrayList<>(rowPuts.values()));
@@ -205,6 +210,20 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
}
@Override
+ public void put(final String tableName, final String rowId, final Collection<PutColumn> columns) throws IOException {
+ // Every column is added to one Put for the row, which is then written with a single table.put call;
+ // the table handle is closed by try-with-resources.
+ try (final Table hTable = connection.getTable(TableName.valueOf(tableName))) {
+ final Put rowPut = new Put(rowId.getBytes(StandardCharsets.UTF_8));
+ for (final PutColumn putColumn : columns) {
+ final byte[] family = putColumn.getColumnFamily().getBytes(StandardCharsets.UTF_8);
+ final byte[] qualifier = putColumn.getColumnQualifier().getBytes(StandardCharsets.UTF_8);
+ rowPut.addColumn(family, qualifier, putColumn.getBuffer());
+ }
+ hTable.put(rowPut);
+ }
+ }
+
+ @Override
public void scan(final String tableName, final Collection<Column> columns, final String filterExpression, final long minTime, final ResultHandler handler)
throws IOException {