Posted to issues@nifi.apache.org by bdesert <gi...@git.apache.org> on 2018/01/31 23:22:54 UTC

[GitHub] nifi pull request #2446: NIFI-4833 Add ScanHBase processor

GitHub user bdesert opened a pull request:

    https://github.com/apache/nifi/pull/2446

    NIFI-4833 Add ScanHBase processor

    ### Description:
    
    Add a new processor, ScanHBase, and a test package.
    
    ------------------------------------------------------
    Thank you for submitting a contribution to Apache NiFi.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [v] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
    
    - [v] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [v] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [v] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [v] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [v] Have you written or updated unit tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
    - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [v] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bdesert/nifi master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2446.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2446
    
----
commit 7a9dc565f43284a8535de052a812a252c6950613
Author: Ed <ed...@...>
Date:   2018-01-31T21:20:35Z

    NIFI-4833 Add ScanHBase processor

----


---


Posted by bdesert <gi...@git.apache.org>.
Github user bdesert closed the pull request at:

    https://github.com/apache/nifi/pull/2446


---


Posted by bdesert <gi...@git.apache.org>.
Github user bdesert commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2446#discussion_r165447440
  
    --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---
    @@ -0,0 +1,564 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.hbase;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.hbase.io.JsonFullRowSerializer;
    +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer;
    +import org.apache.nifi.hbase.io.RowSerializer;
    +import org.apache.nifi.hbase.scan.Column;
    +import org.apache.nifi.hbase.scan.ResultCell;
    +import org.apache.nifi.hbase.scan.ResultHandler;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.util.Tuple;
    +
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@Tags({"hbase", "scan", "fetch", "get"})
    +@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from an HBase table by specifying a range of rowkey values (start and/or end), "
    +        + "by time range, by filter expression, or any combination of them. \n"
    +        + "Order of records can be controlled by the <code>Reversed order</code> property. "
    +        + "Number of rows retrieved by the processor can be limited.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"),
    +        @WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."),
    +        @WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"),
    +        @WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"),
    +        @WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in the given HBase table with the provided conditions. <br/>Could be null (not present) if transferred to FAILURE")
    +})
    +public class ScanHBase extends AbstractProcessor {
    +    // Enhanced regex for columns to allow "-" in column qualifier names
    +    static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*");
    +    static final byte[] nl = System.lineSeparator().getBytes();
    +
    +    static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +            .displayName("HBase Client Service")
    +            .name("scanhbase-client-service")
    +            .description("Specifies the Controller Service to use for accessing HBase.")
    +            .required(true)
    +            .identifiesControllerService(HBaseClientService.class)
    +            .build();
    +
    +    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .displayName("Table Name")
    +            .name("scanhbase-table-name")
    +            .description("The name of the HBase Table to fetch from.")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor START_ROW = new PropertyDescriptor.Builder()
    +            .displayName("Start rowkey")
    +            .name("scanhbase-start-rowkey")
    +            .description("The rowkey to start scan from.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    
    +    static final PropertyDescriptor END_ROW = new PropertyDescriptor.Builder()
    +            .displayName("End rowkey")
    +            .name("scanhbase-end-rowkey")
    +            .description("The row key to end scan by.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    
    +    static final PropertyDescriptor TIME_RANGE_MIN = new PropertyDescriptor.Builder()
    +            .displayName("Time range min")
    +            .name("scanhbase-time-range-min")
    +            .description("Time range min value. Both min and max values for time range should be either blank or provided.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.LONG_VALIDATOR)
    +            .build();
    +    
    +    static final PropertyDescriptor TIME_RANGE_MAX = new PropertyDescriptor.Builder()
    +            .displayName("Time range max")
    +            .name("scanhbase-time-range-max")
    +            .description("Time range max value. Both min and max values for time range should be either blank or provided.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.LONG_VALIDATOR)
    +            .build();
    +    
    +    static final PropertyDescriptor LIMIT_ROWS = new PropertyDescriptor.Builder()
    +            .displayName("Limit rows")
    +            .name("scanhbase-limit")
    +            .description("Limit number of rows retrieved by scan.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +            .build();
    +    
    +    static final PropertyDescriptor BULK_SIZE = new PropertyDescriptor.Builder()
    +            .displayName("Max rows per flow file")
    +            .name("scanhbase-bulk-size")
    +            .description("Limits number of rows in single flow file content. Set to 0 to avoid multiple flow files.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +            .build();
    +    
    +    
    +    static final PropertyDescriptor REVERSED_SCAN = new PropertyDescriptor.Builder()
    +            .displayName("Reversed order")
    +            .name("scanhbase-reversed-order")
    +            .description("Sets whether this scan is reversed. This is false by default, which means a forward (normal) scan.")
    +            .expressionLanguageSupported(false)
    +            .allowableValues("true", "false")
    +            .required(false)
    +            .defaultValue("false")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    --- End diff --
    
    Agreed. It doesn't make any functional difference (the allowable values already restrict the input), but it looks nicer with the boolean validator.
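    To make the equivalence concrete, here is a minimal plain-Java sketch. It does not use NiFi's classes: `allowable`, `nonEmpty` and `isBoolean` are hypothetical stand-ins that approximate the `allowableValues("true", "false")` constraint, `NON_EMPTY_VALIDATOR` and `BOOLEAN_VALIDATOR`, and the case handling of each is an assumption. Since a property value must pass every check attached to its descriptor, both configurations accept exactly the values in the allowable list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;

public class ReversedScanValidation {

    // Stand-in for allowableValues("true", "false"); assumed to be an exact, case-sensitive match.
    static final Set<String> ALLOWED = Set.of("true", "false");

    static boolean allowable(String value) {
        // Set.of(...).contains(null) throws, so reject null first.
        return value != null && ALLOWED.contains(value);
    }

    // Stand-in for NON_EMPTY_VALIDATOR, the validator used in the PR as submitted.
    static boolean nonEmpty(String value) {
        return value != null && !value.isEmpty();
    }

    // Stand-in for BOOLEAN_VALIDATOR, the reviewer's suggestion; assumed to ignore case.
    static boolean isBoolean(String value) {
        return "true".equalsIgnoreCase(value) || "false".equalsIgnoreCase(value);
    }

    // A property value is valid only if it passes every check on the descriptor.
    static boolean validAsSubmitted(String value) {
        return allowable(value) && nonEmpty(value);
    }

    static boolean validAsSuggested(String value) {
        return allowable(value) && isBoolean(value);
    }

    public static void main(String[] args) {
        List<String> candidates = Arrays.asList("true", "false", "TRUE", "yes", "", null);
        for (String v : candidates) {
            System.out.println(v + " -> submitted=" + validAsSubmitted(v)
                    + ", suggested=" + validAsSuggested(v));
        }
    }
}
```

    The boolean validator therefore only documents intent; it would start to matter if the allowable values were ever removed from the descriptor.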


---


Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2446#discussion_r165338502
  
    --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---
    @@ -0,0 +1,564 @@
    +            .build();
    +    
    +    static final PropertyDescriptor FILTER_EXPRESSION = new PropertyDescriptor.Builder()
    +            .displayName("Filter expression")
    +            .name("scanhbase-filter-expression")
    +            .description("An HBase filter expression that will be applied to the scan. This property can not be used when also using the Columns property.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor COLUMNS = new PropertyDescriptor.Builder()
    +            .displayName("Columns")
    +            .name("scanhbase-columns")
    +            .description("An optional comma-separated list of \"<colFamily>:<colQualifier>\" pairs to fetch. To return all columns " +
    +                    "for a given family, leave off the qualifier such as \"<colFamily1>,<colFamily2>\".")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.createRegexMatchingValidator(COLUMNS_PATTERN))
    +            .build();
    +
    +    static final AllowableValue JSON_FORMAT_FULL_ROW = new AllowableValue("full-row", "full-row",
    +            "Creates a JSON document with the format: {\"row\":<row-id>, \"cells\":[{\"fam\":<col-fam>, \"qual\":<col-qual>, \"val\":<value>, \"ts\":<timestamp>}]}.");
    +    static final AllowableValue JSON_FORMAT_QUALIFIER_AND_VALUE = new AllowableValue("col-qual-and-val", "col-qual-and-val",
    +            "Creates a JSON document with the format: {\"<col-qual>\":\"<value>\", \"<col-qual>\":\"<value>\"}.");
    +
    +    static final PropertyDescriptor JSON_FORMAT = new PropertyDescriptor.Builder()
    +            .displayName("JSON Format")
    +            .name("scanhbase-json-format")
    +            .description("Specifies how to represent the HBase row as a JSON document.")
    +            .required(true)
    +            .allowableValues(JSON_FORMAT_FULL_ROW, JSON_FORMAT_QUALIFIER_AND_VALUE)
    +            .defaultValue(JSON_FORMAT_FULL_ROW.getValue())
    +            .build();
    +
    +    static final PropertyDescriptor DECODE_CHARSET = new PropertyDescriptor.Builder()
    +            .displayName("Decode Character Set")
    +            .name("scanhbase-decode-charset")
    +            .description("The character set used to decode data from HBase.")
    +            .required(true)
    +            .defaultValue("UTF-8")
    +            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor ENCODE_CHARSET = new PropertyDescriptor.Builder()
    +            .displayName("Encode Character Set")
    +            .name("scanhbase-encode-charset")
    +            .description("The character set used to encode the JSON representation of the row.")
    +            .required(true)
    +            .defaultValue("UTF-8")
    +            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
    +            .name("original")
    +            .description("The original input file will be routed to this destination, even if no rows are retrieved based on provided conditions.")
    +            .build();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("All successful fetches are routed to this relationship.")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("All failed fetches are routed to this relationship.")
    +            .build();
    +
    +    static final String HBASE_TABLE_ATTR = "hbase.table";
    +    static final String HBASE_ROWS_COUNT_ATTR = "hbase.rows.count";
    +
    +    static final List<PropertyDescriptor> properties;
    +    static {
    +        List<PropertyDescriptor> props = new ArrayList<>();
    +        props.add(HBASE_CLIENT_SERVICE);
    +        props.add(TABLE_NAME);
    +        props.add(START_ROW);
    +        props.add(END_ROW);
    +        props.add(TIME_RANGE_MIN);
    +        props.add(TIME_RANGE_MAX);
    +        props.add(LIMIT_ROWS);
    +        props.add(REVERSED_SCAN);
    +        props.add(BULK_SIZE);
    +        props.add(FILTER_EXPRESSION);
    +        props.add(COLUMNS);
    +        props.add(JSON_FORMAT);
    +        props.add(ENCODE_CHARSET);
    +        props.add(DECODE_CHARSET);
    +        properties = Collections.unmodifiableList(props);
    +    }
    +
    +    static final Set<Relationship> relationships;
    +    static {
    +        Set<Relationship> rels = new HashSet<>();
    +        rels.add(REL_SUCCESS);
    +        rels.add(REL_ORIGINAL);
    +        rels.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(rels);
    +    }
    +
    +    private volatile Charset decodeCharset;
    +    private volatile Charset encodeCharset;
    +    private RowSerializer serializer = null;
    +    
    +
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        this.decodeCharset = Charset.forName(context.getProperty(DECODE_CHARSET).getValue());
    +        this.encodeCharset = Charset.forName(context.getProperty(ENCODE_CHARSET).getValue());
    +        
    +        final String jsonFormat = context.getProperty(JSON_FORMAT).getValue();
    +        if (jsonFormat.equals(JSON_FORMAT_FULL_ROW.getValue())) {
    +            this.serializer = new JsonFullRowSerializer(decodeCharset, encodeCharset);
    +        } else {
    +            this.serializer = new JsonQualifierAndValueRowSerializer(decodeCharset, encodeCharset);
    +        }
    +    }
    +    
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    +
    +        final List<ValidationResult> problems = new ArrayList<>();
    +
    +        final String columns = validationContext.getProperty(COLUMNS).getValue();
    +        final String filter = validationContext.getProperty(FILTER_EXPRESSION).getValue();
    +
    +        if (!StringUtils.isBlank(columns) && !StringUtils.isBlank(filter)) {
    +            problems.add(new ValidationResult.Builder()
    +                    .subject(FILTER_EXPRESSION.getDisplayName())
    +                    .input(filter).valid(false)
    +                    .explanation("a filter expression can not be used in conjunction with the Columns property")
    --- End diff --
    
    > a filter expression can not be used in conjunction with the Columns property
    
    That should read:
    
    > A filter expression can not be used in conjunction with the Columns property.
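    The rules behind this message can be sketched in plain Java. `ScanHBaseChecks` and its `validate` method are hypothetical, not part of the PR: the sketch reuses `COLUMNS_PATTERN` from the diff (assuming the regex validator requires a whole-string match), applies the Columns/Filter exclusivity with the corrected wording, and enforces the "both or neither" rule stated in the `TIME_RANGE_MIN`/`TIME_RANGE_MAX` descriptions. Whether the PR's `customValidate` actually checks the time range is not visible in the quoted hunk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class ScanHBaseChecks {

    // Same expression as COLUMNS_PATTERN in the diff: "-" is allowed in qualifiers, not in families.
    static final Pattern COLUMNS_PATTERN =
            Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*");

    // Approximates commons-lang3 StringUtils.isBlank for this sketch.
    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    // Returns human-readable problems; an empty list means the configuration passes.
    static List<String> validate(String columns, String filter, String timeMin, String timeMax) {
        List<String> problems = new ArrayList<>();

        // Assumption: the regex validator requires the whole value to match.
        if (!isBlank(columns) && !COLUMNS_PATTERN.matcher(columns).matches()) {
            problems.add("Columns must be a comma-separated list of <colFamily> or <colFamily>:<colQualifier> entries.");
        }
        // Columns and Filter expression are mutually exclusive (wording from the review comment).
        if (!isBlank(columns) && !isBlank(filter)) {
            problems.add("A filter expression can not be used in conjunction with the Columns property.");
        }
        // Time range bounds must be provided together or both left blank (hypothetical message).
        if (isBlank(timeMin) != isBlank(timeMax)) {
            problems.add("Time range min and max must both be provided or both be left blank.");
        }
        return problems;
    }

    public static void main(String[] args) {
        System.out.println(validate("cf1:col-a,cf2", null, null, null));
        System.out.println(validate("cf1", "PrefixFilter('r1')", "0", null));
    }
}
```

    For example, `validate("cf-1", null, null, null)` reports one problem, because the pattern allows `-` only in qualifiers, not in column family names.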


---


Posted by bdesert <gi...@git.apache.org>.
Github user bdesert commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2446#discussion_r165449601
  
    --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---
    @@ -0,0 +1,564 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.hbase;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.hbase.io.JsonFullRowSerializer;
    +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer;
    +import org.apache.nifi.hbase.io.RowSerializer;
    +import org.apache.nifi.hbase.scan.Column;
    +import org.apache.nifi.hbase.scan.ResultCell;
    +import org.apache.nifi.hbase.scan.ResultHandler;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.util.Tuple;
    +
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@Tags({"hbase", "scan", "fetch", "get"})
    +@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from an HBase table by specifying a range of rowkey values (start and/or end), "
    +        + "by time range, by filter expression, or any combination of them. \n"
    +        + "The order of records can be controlled by the <code>Reversed order</code> property. "
    +        + "The number of rows retrieved by the processor can be limited.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"),
    +        @WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."),
    +        @WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"),
    +        @WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"),
    +        @WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in the given HBase table with the provided conditions. <br/>Could be null (not present) if transferred to FAILURE")
    +})
    +public class ScanHBase extends AbstractProcessor {
    +    // Enhanced regex for columns to allow "-" in column qualifier names
    +    static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*");
    +    static final byte[] nl = System.lineSeparator().getBytes();
    +
    +    static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +            .displayName("HBase Client Service")
    +            .name("scanhbase-client-service")
    +            .description("Specifies the Controller Service to use for accessing HBase.")
    +            .required(true)
    +            .identifiesControllerService(HBaseClientService.class)
    +            .build();
    +
    +    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .displayName("Table Name")
    +            .name("scanhbase-table-name")
    +            .description("The name of the HBase Table to fetch from.")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor START_ROW = new PropertyDescriptor.Builder()
    +            .displayName("Start rowkey")
    +            .name("scanhbase-start-rowkey")
    +            .description("The rowkey to start the scan from.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    
    +    static final PropertyDescriptor END_ROW = new PropertyDescriptor.Builder()
    +            .displayName("End rowkey")
    +            .name("scanhbase-end-rowkey")
    +            .description("The rowkey to end the scan at.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    
    +    static final PropertyDescriptor TIME_RANGE_MIN = new PropertyDescriptor.Builder()
    +            .displayName("Time range min")
    +            .name("scanhbase-time-range-min")
    +            .description("Time range min value. Both min and max values for time range should be either blank or provided.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.LONG_VALIDATOR)
    +            .build();
    +    
    +    static final PropertyDescriptor TIME_RANGE_MAX = new PropertyDescriptor.Builder()
    +            .displayName("Time range max")
    +            .name("scanhbase-time-range-max")
    +            .description("Time range max value. Both min and max values for time range should be either blank or provided.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.LONG_VALIDATOR)
    +            .build();
    +    
    +    static final PropertyDescriptor LIMIT_ROWS = new PropertyDescriptor.Builder()
    +            .displayName("Limit rows")
    +            .name("scanhbase-limit")
    +            .description("Limits the number of rows retrieved by the scan.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +            .build();
    +    
    +    static final PropertyDescriptor BULK_SIZE = new PropertyDescriptor.Builder()
    +            .displayName("Max rows per flow file")
    +            .name("scanhbase-bulk-size")
    +            .description("Limits the number of rows in a single flow file's content. Set to 0 to avoid multiple flow files.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +            .build();
    +    
    +    
    +    static final PropertyDescriptor REVERSED_SCAN = new PropertyDescriptor.Builder()
    +            .displayName("Reversed order")
    +            .name("scanhbase-reversed-order")
    +            .description("Sets whether this scan is reversed. Defaults to false, which means a forward (normal) scan.")
    +            .expressionLanguageSupported(false)
    +            .allowableValues("true", "false")
    +            .required(false)
    +            .defaultValue("false")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    
    +    static final PropertyDescriptor FILTER_EXPRESSION = new PropertyDescriptor.Builder()
    +            .displayName("Filter expression")
    +            .name("scanhbase-filter-expression")
    +            .description("An HBase filter expression that will be applied to the scan. This property can not be used when also using the Columns property.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor COLUMNS = new PropertyDescriptor.Builder()
    +            .displayName("Columns")
    +            .name("scanhbase-columns")
    +            .description("An optional comma-separated list of \"<colFamily>:<colQualifier>\" pairs to fetch. To return all columns " +
    +                    "for a given family, leave off the qualifier such as \"<colFamily1>,<colFamily2>\".")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.createRegexMatchingValidator(COLUMNS_PATTERN))
    +            .build();
    +
    +    static final AllowableValue JSON_FORMAT_FULL_ROW = new AllowableValue("full-row", "full-row",
    +            "Creates a JSON document with the format: {\"row\":<row-id>, \"cells\":[{\"fam\":<col-fam>, \"qual\":<col-qual>, \"val\":<value>, \"ts\":<timestamp>}]}.");
    +    static final AllowableValue JSON_FORMAT_QUALIFIER_AND_VALUE = new AllowableValue("col-qual-and-val", "col-qual-and-val",
    +            "Creates a JSON document with the format: {\"<col-qual>\":\"<value>\", \"<col-qual>\":\"<value>\"}.");
    +
    +    static final PropertyDescriptor JSON_FORMAT = new PropertyDescriptor.Builder()
    +            .displayName("JSON Format")
    +            .name("scanhbase-json-format")
    +            .description("Specifies how to represent the HBase row as a JSON document.")
    +            .required(true)
    +            .allowableValues(JSON_FORMAT_FULL_ROW, JSON_FORMAT_QUALIFIER_AND_VALUE)
    +            .defaultValue(JSON_FORMAT_FULL_ROW.getValue())
    +            .build();
    +
    +    static final PropertyDescriptor DECODE_CHARSET = new PropertyDescriptor.Builder()
    +            .displayName("Decode Character Set")
    +            .name("scanhbase-decode-charset")
    +            .description("The character set used to decode data from HBase.")
    +            .required(true)
    +            .defaultValue("UTF-8")
    +            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor ENCODE_CHARSET = new PropertyDescriptor.Builder()
    +            .displayName("Encode Character Set")
    +            .name("scanhbase-encode-charset")
    +            .description("The character set used to encode the JSON representation of the row.")
    +            .required(true)
    +            .defaultValue("UTF-8")
    +            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
    +            .name("original")
    +            .description("The original input file will be routed to this destination, even if no rows are retrieved based on provided conditions.")
    +            .build();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("All successful fetches are routed to this relationship.")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("All failed fetches are routed to this relationship.")
    +            .build();
    +
    +    static final String HBASE_TABLE_ATTR = "hbase.table";
    +    static final String HBASE_ROWS_COUNT_ATTR = "hbase.rows.count";
    +
    +    static final List<PropertyDescriptor> properties;
    +    static {
    +        List<PropertyDescriptor> props = new ArrayList<>();
    +        props.add(HBASE_CLIENT_SERVICE);
    +        props.add(TABLE_NAME);
    +        props.add(START_ROW);
    +        props.add(END_ROW);
    +        props.add(TIME_RANGE_MIN);
    +        props.add(TIME_RANGE_MAX);
    +        props.add(LIMIT_ROWS);
    +        props.add(REVERSED_SCAN);
    +        props.add(BULK_SIZE);
    +        props.add(FILTER_EXPRESSION);
    +        props.add(COLUMNS);
    +        props.add(JSON_FORMAT);
    +        props.add(ENCODE_CHARSET);
    +        props.add(DECODE_CHARSET);
    +        properties = Collections.unmodifiableList(props);
    +    }
    +
    +    static final Set<Relationship> relationships;
    +    static {
    +        Set<Relationship> rels = new HashSet<>();
    +        rels.add(REL_SUCCESS);
    +        rels.add(REL_ORIGINAL);
    +        rels.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(rels);
    +    }
    +
    +    private volatile Charset decodeCharset;
    +    private volatile Charset encodeCharset;
    +    private RowSerializer serializer = null;
    +    
    +
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        this.decodeCharset = Charset.forName(context.getProperty(DECODE_CHARSET).getValue());
    +        this.encodeCharset = Charset.forName(context.getProperty(ENCODE_CHARSET).getValue());
    +        
    +        final String jsonFormat = context.getProperty(JSON_FORMAT).getValue();
    +        if (jsonFormat.equals(JSON_FORMAT_FULL_ROW.getValue())) {
    +            this.serializer = new JsonFullRowSerializer(decodeCharset, encodeCharset);
    +        } else {
    +            this.serializer = new JsonQualifierAndValueRowSerializer(decodeCharset, encodeCharset);
    +        }
    +    }
    +    
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    +
    +        final List<ValidationResult> problems = new ArrayList<>();
    +
    +        final String columns = validationContext.getProperty(COLUMNS).getValue();
    +        final String filter = validationContext.getProperty(FILTER_EXPRESSION).getValue();
    +
    +        if (!StringUtils.isBlank(columns) && !StringUtils.isBlank(filter)) {
    +            problems.add(new ValidationResult.Builder()
    +                    .subject(FILTER_EXPRESSION.getDisplayName())
    +                    .input(filter).valid(false)
    +                    .explanation("a filter expression can not be used in conjunction with the Columns property")
    --- End diff --
    
    👍 
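Below is a minimal standalone sketch of the validation rules this hunk implies. It is written in plain Java so it runs without NiFi. The rules come from three places:

- The Columns/Filter exclusion mirrors the visible part of customValidate.
- The Columns check reuses COLUMNS_PATTERN as a full match. Whether NiFi's regex validator uses a full match is not shown here.
- The time-range rule (min and max either both set or both blank) comes from the property descriptions. Whether customValidate enforces it lies beyond the quoted diff.

The class name and the validate() helper are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class ScanHBaseValidationSketch {
    // Same pattern as COLUMNS_PATTERN in the diff: "family" or "family:qualifier",
    // comma-separated; qualifiers may also contain "-".
    static final Pattern COLUMNS_PATTERN =
            Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*");

    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    // Hypothetical helper: returns one message per violated rule (empty list = valid).
    static List<String> validate(String columns, String filter, String timeMin, String timeMax) {
        List<String> problems = new ArrayList<>();
        if (!isBlank(columns) && !COLUMNS_PATTERN.matcher(columns).matches()) {
            problems.add("Columns must be a comma-separated list of <family> or <family>:<qualifier>");
        }
        if (!isBlank(columns) && !isBlank(filter)) {
            problems.add("A filter expression cannot be used in conjunction with the Columns property");
        }
        // Assumed rule from the descriptions: both bounds set, or neither.
        if (isBlank(timeMin) != isBlank(timeMax)) {
            problems.add("Time range min and max must both be provided or both be blank");
        }
        return problems;
    }

    public static void main(String[] args) {
        System.out.println(validate("cf1:col-a,cf2", null, null, null));
        System.out.println(validate("cf1", "PrefixFilter ('r')", null, null));
        System.out.println(validate(null, null, "1000", null));
    }
}
```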


---

[GitHub] nifi issue #2446: NIFI-4833 Add ScanHBase processor

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2446
  
    I have a patch in progress that adds support for HBase visibility labels to the existing processors. You might want to think about how to integrate that into this processor.


---

[GitHub] nifi pull request #2446: NIFI-4833 Add ScanHBase processor

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2446#discussion_r165304550
  
    --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---
    @@ -0,0 +1,564 @@
    +    static final PropertyDescriptor REVERSED_SCAN = new PropertyDescriptor.Builder()
    +            .displayName("Reversed order")
    +            .name("scanhbase-reversed-order")
    +            .description("Sets whether this scan is reversed. Defaults to false, which means a forward (normal) scan.")
    +            .expressionLanguageSupported(false)
    +            .allowableValues("true", "false")
    --- End diff --
    
    Using the AllowableValue type here is generally better than strings.
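An AllowableValue pairs the stored value with a display name and a description. The processor can then compare against the constant's getValue() instead of repeating "true"/"false" literals, as JSON_FORMAT already does with JSON_FORMAT_FULL_ROW.

NiFi's class is not usable standalone, so the sketch below uses a hypothetical stand-in, Choice, that mimics only that value/name/description triple. The display names and descriptions are placeholders.

```java
import java.util.Arrays;
import java.util.List;

public class ReversedScanChoiceSketch {
    // Hypothetical stand-in for org.apache.nifi.components.AllowableValue:
    // a stored value plus a human-readable name and description.
    static final class Choice {
        private final String value;
        private final String displayName;
        private final String description;

        Choice(String value, String displayName, String description) {
            this.value = value;
            this.displayName = displayName;
            this.description = description;
        }

        String getValue() { return value; }
        String getDisplayName() { return displayName; }
        String getDescription() { return description; }
    }

    // Placeholder names/descriptions; the stored values match the diff ("true"/"false").
    static final Choice REVERSED_FALSE = new Choice("false", "Forward", "Scan rows in ascending rowkey order (default).");
    static final Choice REVERSED_TRUE = new Choice("true", "Reversed", "Scan rows in descending rowkey order.");
    static final List<Choice> ALLOWED = Arrays.asList(REVERSED_FALSE, REVERSED_TRUE);

    // Resolve the configured string by comparing against the constants' values,
    // so each literal appears in exactly one place.
    static boolean isReversed(String configured) {
        for (Choice c : ALLOWED) {
            if (c.getValue().equals(configured)) {
                return c == REVERSED_TRUE;
            }
        }
        throw new IllegalArgumentException("Unsupported value: " + configured);
    }

    public static void main(String[] args) {
        System.out.println(isReversed(REVERSED_FALSE.getValue())); // false
        System.out.println(isReversed(REVERSED_TRUE.getValue()));  // true
    }
}
```

In the processor itself this would presumably become .allowableValues(REVERSED_TRUE, REVERSED_FALSE) with .defaultValue(REVERSED_FALSE.getValue()), mirroring the existing JSON_FORMAT descriptor.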


---

[GitHub] nifi pull request #2446: NIFI-4833 Add ScanHBase processor

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2446#discussion_r165304841
  
    --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---
    @@ -0,0 +1,564 @@
    +    static final PropertyDescriptor FILTER_EXPRESSION = new PropertyDescriptor.Builder()
    +            .displayName("Filter expression")
    +            .name("scanhbase-filter-expression")
    +            .description("An HBase filter expression that will be applied to the scan. This property can not be used when also using the Columns property.")
    --- End diff --
    
    It would be a good idea to include a simple example of a filter expression in the description, for new users.
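A minimal sketch of what such a description could look like, assuming the examples are simply appended to the existing description text. The class and constant names are hypothetical. The example strings follow the HBase filter language (the syntax the HBase shell accepts), using `PrefixFilter` and `SingleColumnValueFilter` with a `binary:` comparator as illustrations.

```java
public class FilterExpressionDescription {

    // Hypothetical example expressions written in HBase filter-language syntax.
    static final String PREFIX_EXAMPLE = "PrefixFilter ('user_')";
    static final String COMBINED_EXAMPLE =
            "PrefixFilter ('user_') AND SingleColumnValueFilter ('cf', 'status', =, 'binary:active')";

    // Builds the property description: the original text followed by the examples.
    static String description() {
        return "An HBase filter expression that will be applied to the scan. "
                + "This property can not be used when also using the Columns property. "
                + "Examples: " + PREFIX_EXAMPLE + " or " + COMBINED_EXAMPLE;
    }

    public static void main(String[] args) {
        System.out.println(description());
    }
}
```

Keeping the examples in named constants means only two strings need updating if the filter syntax ever changes.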


---

[GitHub] nifi issue #2446: NIFI-4833 Add ScanHBase processor

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2446
  
    I'll take a stab at reviewing, but you should cherry-pick this onto a separate git branch rather than submitting it from your master branch.


---

[GitHub] nifi pull request #2446: NIFI-4833 Add ScanHBase processor

Posted by bdesert <gi...@git.apache.org>.
Github user bdesert commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2446#discussion_r165448494
  
    --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---
    @@ -0,0 +1,564 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.hbase;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.hbase.io.JsonFullRowSerializer;
    +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer;
    +import org.apache.nifi.hbase.io.RowSerializer;
    +import org.apache.nifi.hbase.scan.Column;
    +import org.apache.nifi.hbase.scan.ResultCell;
    +import org.apache.nifi.hbase.scan.ResultHandler;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.util.Tuple;
    +
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@Tags({"hbase", "scan", "fetch", "get"})
    +@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from hbase table by specifying a range of rowkey values (start and/or end ),"
    +        + "by time range, by filter expression, or any combination of them. \n"
    +        + "Order of records can be controlled by a property <code>Reversed</code>"
    +        + "Number of rows retrieved by the processor can be limited.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"),
    +        @WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."),
    +        @WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"),
    +        @WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"),
    +        @WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions. <br/>Could be null (not present) if transfered to FAILURE")
    +})
    +public class ScanHBase extends AbstractProcessor {
    +	//enhanced regex for columns to allow "-" in column qualifier names
    +    static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*");
    +    static final byte[] nl = System.lineSeparator().getBytes();
    +
    +    static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +            .displayName("HBase Client Service")
    +            .name("scanhbase-client-service")
    +            .description("Specifies the Controller Service to use for accessing HBase.")
    +            .required(true)
    +            .identifiesControllerService(HBaseClientService.class)
    +            .build();
    +
    +    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .displayName("Table Name")
    +            .name("scanhbase-table-name")
    +            .description("The name of the HBase Table to fetch from.")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor START_ROW = new PropertyDescriptor.Builder()
    +            .displayName("Start rowkey")
    +            .name("scanhbase-start-rowkey")
    +            .description("The rowkey to start scan from.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    
    +    static final PropertyDescriptor END_ROW = new PropertyDescriptor.Builder()
    +            .displayName("End rowkey")
    +            .name("scanhbase-end-rowkey")
    +            .description("The row key to end scan by.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    
    +    static final PropertyDescriptor TIME_RANGE_MIN = new PropertyDescriptor.Builder()
    +            .displayName("Time range min")
    +            .name("scanhbase-time-range-min")
    +            .description("Time range min value. Both min and max values for time range should be either blank or provided.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.LONG_VALIDATOR)
    +            .build();
    +    
    +    static final PropertyDescriptor TIME_RANGE_MAX = new PropertyDescriptor.Builder()
    +            .displayName("Time range max")
    +            .name("scanhbase-time-range-max")
    +            .description("Time range max value. Both min and max values for time range should be either blank or provided.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.LONG_VALIDATOR)
    +            .build();
    +    
    +    static final PropertyDescriptor LIMIT_ROWS = new PropertyDescriptor.Builder()
    +            .displayName("Limit rows")
    +            .name("scanhbase-limit")
    +            .description("Limit number of rows retrieved by scan.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +            .build();
    +    
    +    static final PropertyDescriptor BULK_SIZE = new PropertyDescriptor.Builder()
    +            .displayName("Max rows per flow file")
    +            .name("scanhbase-bulk-size")
    +            .description("Limits number of rows in single flow file content. Set to 0 to avoid multiple flow files.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +            .build();
    +    
    +    
    +    static final PropertyDescriptor REVERSED_SCAN = new PropertyDescriptor.Builder()
    +            .displayName("Reversed order")
    +            .name("scanhbase-reversed-order")
    +            .description("Set whether this scan is a reversed one. This is false by default which means forward(normal) scan.")
    +            .expressionLanguageSupported(false)
    +            .allowableValues("true", "false")
    +            .required(false)
    +            .defaultValue("false")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    
    +    static final PropertyDescriptor FILTER_EXPRESSION = new PropertyDescriptor.Builder()
    +            .displayName("Filter expression")
    +            .name("scanhbase-filter-expression")
    +            .description("An HBase filter expression that will be applied to the scan. This property can not be used when also using the Columns property.")
    --- End diff --
    
    It's basically the regular HBase shell filter syntax. I don't parse the expression; I pass it to the HBase client as is. So if the filter syntax changes in a future version, any example I provide would become invalid, while the processor itself would still work. That was my concern. If you still think it would be better to have an example, I'll add it. Let me know.
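Because the expression is handed to the HBase client unparsed, the processor-side checks can stay structural. Below is a minimal, stdlib-only sketch of such checks, assuming they would live in a custom validation step: the Columns value must match the `COLUMNS_PATTERN` regex quoted in the diff, Filter expression and Columns must not both be set, and the time range bounds must be provided together (both constraints come from the property descriptions). The class and method names are hypothetical, not the PR's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class ScanSettingsCheck {

    // Same regex as COLUMNS_PATTERN in the diff: "family" or "family:qualifier",
    // comma-separated, with '-' allowed in qualifier names.
    static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*");

    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    // Returns a list of problems; an empty list means the combination is acceptable.
    static List<String> validate(String columns, String filter, String timeMin, String timeMax) {
        List<String> problems = new ArrayList<>();
        if (!isBlank(columns) && !COLUMNS_PATTERN.matcher(columns).matches()) {
            problems.add("Columns must look like family[:qualifier][,family[:qualifier]...]");
        }
        if (!isBlank(columns) && !isBlank(filter)) {
            problems.add("Columns and Filter expression can not be used together");
        }
        if (isBlank(timeMin) != isBlank(timeMax)) {
            problems.add("Time range min and max must both be blank or both be provided");
        }
        return problems;
    }

    public static void main(String[] args) {
        System.out.println(validate("cf:col-1,cf2", null, null, null));             // prints []
        System.out.println(validate("cf:col-1", "PrefixFilter ('a')", "0", null));  // two problems
    }
}
```

The filter string itself is only checked for presence, matching the pass-through design described above.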


---

[GitHub] nifi pull request #2446: NIFI-4833 Add ScanHBase processor

Posted by bdesert <gi...@git.apache.org>.
Github user bdesert commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2446#discussion_r165449486
  
    --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---
    @@ -0,0 +1,564 @@
    +    static final PropertyDescriptor REVERSED_SCAN = new PropertyDescriptor.Builder()
    +            .displayName("Reversed order")
    +            .name("scanhbase-reversed-order")
    +            .description("Set whether this scan is a reversed one. This is false by default which means forward(normal) scan.")
    +            .expressionLanguageSupported(false)
    +            .allowableValues("true", "false")
    --- End diff --
    
    If there were a constant defined for boolean types, I would agree. In this case, though, I would keep it this way; it's just cleaner and takes fewer lines.


---

[GitHub] nifi pull request #2446: NIFI-4833 Add ScanHBase processor

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2446#discussion_r165304629
  
    --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---
    @@ -0,0 +1,564 @@
    +    static final PropertyDescriptor REVERSED_SCAN = new PropertyDescriptor.Builder()
    +            .displayName("Reversed order")
    +            .name("scanhbase-reversed-order")
    +            .description("Set whether this scan is a reversed one. This is false by default which means forward(normal) scan.")
    +            .expressionLanguageSupported(false)
    +            .allowableValues("true", "false")
    +            .required(false)
    +            .defaultValue("false")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    --- End diff --
    
    This should be StandardValidators.BOOLEAN_VALIDATOR.
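For context, here is a stdlib-only approximation of the check a boolean validator performs. It assumes the validator accepts `true` or `false` case-insensitively; treat that as an assumption about `StandardValidators.BOOLEAN_VALIDATOR`, not a copy of it. Because `allowableValues("true", "false")` already limits the accepted values, swapping in a boolean validator likely serves mostly to make the intent explicit. Class and method names are hypothetical.

```java
public class BooleanValueCheck {

    // Approximates a boolean validator: only "true" or "false" (ignoring case) pass; null fails.
    static boolean isValidBoolean(String value) {
        return "true".equalsIgnoreCase(value) || "false".equalsIgnoreCase(value);
    }

    // Hypothetical explanation text, similar to what a validation result might report.
    static String explain(String value) {
        return isValidBoolean(value) ? "valid" : "Value must be 'true' or 'false', got '" + value + "'";
    }

    public static void main(String[] args) {
        for (String v : new String[] {"true", "FALSE", "yes", ""}) {
            System.out.println(v + " -> " + explain(v));
        }
    }
}
```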


---