You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by bbende <gi...@git.apache.org> on 2017/01/04 21:11:32 UTC

[GitHub] nifi pull request #1349: NIFI-1784 Initial commit for FetchHBaseRow processo...

Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1349#discussion_r94665770
  
    --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/FetchHBaseRow.java ---
    @@ -0,0 +1,407 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.hbase;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer;
    +import org.apache.nifi.hbase.io.JsonRowSerializer;
    +import org.apache.nifi.hbase.io.RowSerializer;
    +import org.apache.nifi.hbase.scan.Column;
    +import org.apache.nifi.hbase.scan.ResultCell;
    +import org.apache.nifi.hbase.scan.ResultHandler;
    +import org.apache.nifi.hbase.util.ResultCellUtil;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.regex.Pattern;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@Tags({"hbase", "scan", "fetch", "get", "enrich"})
    +@CapabilityDescription("Fetches a row from an HBase table. The Destination property controls whether the cells are added as flow file attributes, " +
     +        "or the row is written to the flow file content as JSON. This processor may be used to fetch a fixed row on an interval by specifying the " +
    +        "table and row id directly in the processor, or it may be used to dynamically fetch rows by referencing the table and row id from " +
    +        "incoming flow files.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"),
    +        @WritesAttribute(attribute = "hbase.row", description = "The row that was fetched from the HBase table"),
    +        @WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise")
    +})
    +public class FetchHBaseRow extends AbstractProcessor {
    +
    +    static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:\\w+)?(?:,\\w+(:\\w+)?)*");
    +
    +    static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +            .name("HBase Client Service")
    +            .description("Specifies the Controller Service to use for accessing HBase.")
    +            .required(true)
    +            .identifiesControllerService(HBaseClientService.class)
    +            .build();
    +
    +    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .name("Table Name")
    +            .description("The name of the HBase Table to fetch from.")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor ROW_ID = new PropertyDescriptor.Builder()
    +            .name("Row Identifier")
    +            .description("The identifier of the row to fetch.")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor COLUMNS = new PropertyDescriptor.Builder()
    +            .name("Columns")
    +            .description("An optional comma-separated list of \"<colFamily>:<colQualifier>\" pairs to fetch. To return all columns " +
    +                    "for a given family, leave off the qualifier such as \"<colFamily1>,<colFamily2>\".")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.createRegexMatchingValidator(COLUMNS_PATTERN))
    +            .build();
    +
    +    static final AllowableValue DESTINATION_ATTRIBUTES = new AllowableValue("flowfile-attributes", "flowfile-attributes",
     +            "Adds each cell as a FlowFile attribute where the key is col-family:col-qualifier and the value is the cell's value.");
    +    static final AllowableValue DESTINATION_CONTENT = new AllowableValue("flowfile-content", "flowfile-content",
    +            "Overwrites the FlowFile content with a JSON document representing the row that was fetched. " +
    +                    "The format of the JSON document is determined by the JSON Format property.");
    +
    +    static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
    +            .name("Destination")
    +            .description("Indicates whether the row fetched from HBase is written to FlowFile content or FlowFile Attributes.")
    +            .required(true)
    +            .allowableValues(DESTINATION_ATTRIBUTES, DESTINATION_CONTENT)
    +            .defaultValue(DESTINATION_ATTRIBUTES.getValue())
    +            .build();
    +
    +    static final AllowableValue VALUE_ENCODING_STRING = new AllowableValue("string", "string", "Creates a String using the bytes of the cell value and the given Character Set.");
    +    static final AllowableValue VALUE_ENCODING_BASE64 = new AllowableValue("base64", "base64", "Creates a Base64 encoded String of the cell value.");
    +
    +    static final PropertyDescriptor VALUE_ENCODING = new PropertyDescriptor.Builder()
    +            .name("Value Encoding")
    +            .description("Specifies how to represent the values of cells when stored in FlowFile attributes, or written to JSON.")
    +            .required(true)
    +            .allowableValues(VALUE_ENCODING_STRING, VALUE_ENCODING_BASE64)
    +            .defaultValue(VALUE_ENCODING_STRING.getValue())
    +            .build();
    +
    +    static final AllowableValue JSON_FORMAT_FULL_ROW = new AllowableValue("full-row", "full-row", "Creates a JSON document with the format: " +
    +            "{\"row\": \"<row key>\", \"cells\": { \"<cell 1 family>:<cell 1 qualifier>\": \"<cell 1 value>\", \"<cell 2 family>:<cell 2 qualifier>\": \"<cell 2 value>\", ... }}.");
    --- End diff --
    
    Good point that we aren't doing anything with the timestamp. Based on that and the other comment about base64 encoding below, I'm now re-thinking the JSON format.
    
    Currently the full-row option produces something like:
```
{
      "row" : "row1",
      "cells": {
          "fam1:qual1" : "val1",
          "fam1:qual2" : "val2"
      }
    }
    ```
    And the qualifier-and-value option would produce:
    ```
    {
        "qual1" : "val1",
        "qual2" : "val2"
    }
    ```
    
    The reason I added the qualifier-and-value option is because I expect someone will want to take the row and insert it into another system that already supports JSON, like MongoDB or Solr, and it might be a lot easier to have a flat JSON where its just field:value pairs.
    
    I'm now thinking maybe the full row should be more like this:
    ```
    {
      "row" : "row1",
      "cells": [
          {
            "family" : "fam1",
        "qualifier" : "qual1",
        "value" : "val1",
            "timestamp" : 123456789
          },
          {
            "family" : "fam1",
        "qualifier" : "qual2",
        "value" : "val2",
            "timestamp" : 123456789
          }
      ]
    }
    ```
    This way we include the timestamp, and also when we base64 encode it will seem more clear which piece is being looked at. 
    
    We could pull the family up so that it isn't repeated, but I'm also trying to think about what people are going to do with this data after they get it from HBase, and I'm expecting someone may want to use SplitJSON to get each cell, and then they would potentially lose the family it came from if it wasn't repeated each time.
    
    We could even go as far as to flatten the whole thing like:
    ```
    [
      {
        "row" : "row1",
        "family" : "fam1",
    "qualifier" : "qual1",
    "value" : "val1",
        "timestamp" : 123456789
      },
      {
        "row" : "row1",
        "family" : "fam1",
    "qualifier" : "qual2",
    "value" : "val2",
        "timestamp" : 123456789
      }
    ]
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---