You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by MikeThomsen <gi...@git.apache.org> on 2017/09/02 12:10:48 UTC

[GitHub] nifi pull request #2125: NIFI-4346 Created a LookupService that uses HBase a...

GitHub user MikeThomsen opened a pull request:

    https://github.com/apache/nifi/pull/2125

    NIFI-4346 Created a LookupService that uses HBase as its back end.

    Thank you for submitting a contribution to Apache NiFi.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
         in the commit message?
    
    - [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [ ] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [ ] Have you written or updated unit tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
    - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/MikeThomsen/nifi NIFI-4346

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2125.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2125
    
----
commit ef896c9c48e3fe6367f5614b710c782aa4ded780
Author: Mike Thomsen <mi...@gmail.com>
Date:   2017-09-01T20:21:51Z

    NIFI-4346 Created a LookupService that uses HBase as its back end.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #2125: NIFI-4346 Created a LookupService that uses HBase a...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2125#discussion_r138076211
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_LookupService.java ---
    @@ -0,0 +1,168 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.hbase;
    +
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.Get;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.controller.ControllerServiceInitializationContext;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableMap;
    +import java.util.Optional;
    +import java.util.Set;
    +
    +public class HBase_1_1_2_LookupService extends HBase_1_1_2_ClientService implements LookupService {
    +    private static final Set<String> REQUIRED_KEYS;
    +
    +    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .name("hb-lu-table-name")
    +            .displayName("Table Name")
    +            .description("The name of the table where look ups will be run.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor RETURN_CFS = new PropertyDescriptor.Builder()
    +            .name("hb-lu-return-cfs")
    +            .displayName("Column Families")
    +            .description("The column families that will be returned.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor RETURN_QFS = new PropertyDescriptor.Builder()
    +            .name("hb-lu-return-qfs")
    +            .displayName("Column Qualifiers")
    +            .description("The column qualifies that will be returned.")
    +            .required(false)
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    static {
    +        REQUIRED_KEYS = new HashSet<String>();
    +        REQUIRED_KEYS.add("rowKey");
    +    }
    +
    +    private String tableName;
    +    private List<byte[]> families;
    +    private List<byte[]> qualifiers;
    +
    +    private List<PropertyDescriptor> lookupProperties;
    +
    +    @Override
    +    protected void init(ControllerServiceInitializationContext config) throws InitializationException {
    +        super.init(config);
    +        this.lookupProperties = new ArrayList<>();
    +        this.lookupProperties.addAll(properties);
    +        this.lookupProperties.add(TABLE_NAME);
    +        this.lookupProperties.add(RETURN_CFS);
    +        this.lookupProperties.add(RETURN_QFS);
    +    }
    +
    +    @Override
    +    public Optional lookup(Map coordinates) throws LookupFailureException {
    +        byte[] rowKey = ((String)coordinates.get("rowKey")).getBytes();
    +        try {
    +            Map<String, Object> values = new HashMap<>();
    +            try (Table table = this.connection.getTable(TableName.valueOf(tableName))) {
    +                Get get = new Get(rowKey);
    +                Result result = table.get(get);
    +
    +                for (byte[] fam : families) {
    +                    NavigableMap<byte[], byte[]>  map = result.getFamilyMap(fam);
    +                    for (Map.Entry<byte[], byte[]> entry : map.entrySet()) {
    +                        if (qualifiers.contains(entry.getKey()) || qualifiers.size() == 0) {
    +                            values.put(new String(entry.getKey()), new String(entry.getValue()));
    +                        }
    +                    }
    +                }
    +            }
    +
    +            if (values.size() == 1) {
    +                return Optional.ofNullable(values.values().iterator().next());
    +            } else if (values.size() > 1) {
    +                final List<RecordField> fields = new ArrayList<>();
    +                fields.add(new RecordField("key1", RecordFieldType.STRING.getDataType()));
    +                fields.add(new RecordField("key2", RecordFieldType.STRING.getDataType()));
    +                fields.add(new RecordField("key3", RecordFieldType.STRING.getDataType()));
    +                final RecordSchema schema = new SimpleRecordSchema(fields);
    +                return Optional.ofNullable(new MapRecord(schema, values));
    +            } else {
    +                throw new LookupFailureException(String.format("Nothing was found that matched the criteria for row key %s", coordinates.get("rowKey")));
    +            }
    +        } catch (IOException e) {
    +            getLogger().error("Error occurred loading {}", new Object[] { coordinates.get("rowKey") }, e);
    +            throw new LookupFailureException(e);
    +        }
    +    }
    +
    +    @Override
    +    public Class<?> getValueType() {
    +        return null;
    --- End diff --
    
    This should not be returning null. It needs to return the class for the type of object that will be returned -- namely, Record.class


---

[GitHub] nifi pull request #2125: NIFI-4346 Created a LookupService that uses HBase a...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2125#discussion_r138075099
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_LookupService.java ---
    @@ -0,0 +1,168 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.hbase;
    +
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.Get;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.controller.ControllerServiceInitializationContext;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableMap;
    +import java.util.Optional;
    +import java.util.Set;
    +
    +public class HBase_1_1_2_LookupService extends HBase_1_1_2_ClientService implements LookupService {
    +    private static final Set<String> REQUIRED_KEYS;
    +
    +    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .name("hb-lu-table-name")
    +            .displayName("Table Name")
    +            .description("The name of the table where look ups will be run.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor RETURN_CFS = new PropertyDescriptor.Builder()
    +            .name("hb-lu-return-cfs")
    +            .displayName("Column Families")
    +            .description("The column families that will be returned.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor RETURN_QFS = new PropertyDescriptor.Builder()
    +            .name("hb-lu-return-qfs")
    +            .displayName("Column Qualifiers")
    +            .description("The column qualifies that will be returned.")
    +            .required(false)
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    static {
    +        REQUIRED_KEYS = new HashSet<String>();
    +        REQUIRED_KEYS.add("rowKey");
    +    }
    +
    +    private String tableName;
    +    private List<byte[]> families;
    +    private List<byte[]> qualifiers;
    +
    +    private List<PropertyDescriptor> lookupProperties;
    +
    +    @Override
    +    protected void init(ControllerServiceInitializationContext config) throws InitializationException {
    +        super.init(config);
    +        this.lookupProperties = new ArrayList<>();
    +        this.lookupProperties.addAll(properties);
    +        this.lookupProperties.add(TABLE_NAME);
    +        this.lookupProperties.add(RETURN_CFS);
    +        this.lookupProperties.add(RETURN_QFS);
    +    }
    +
    +    @Override
    +    public Optional lookup(Map coordinates) throws LookupFailureException {
    +        byte[] rowKey = ((String)coordinates.get("rowKey")).getBytes();
    +        try {
    +            Map<String, Object> values = new HashMap<>();
    +            try (Table table = this.connection.getTable(TableName.valueOf(tableName))) {
    +                Get get = new Get(rowKey);
    +                Result result = table.get(get);
    +
    +                for (byte[] fam : families) {
    +                    NavigableMap<byte[], byte[]>  map = result.getFamilyMap(fam);
    +                    for (Map.Entry<byte[], byte[]> entry : map.entrySet()) {
    +                        if (qualifiers.contains(entry.getKey()) || qualifiers.size() == 0) {
    +                            values.put(new String(entry.getKey()), new String(entry.getValue()));
    --- End diff --
    
    It's best to avoid using the String(byte[]) constructor and either explicitly pass StandardCharsets.UTF_8 and document that it expects UTF-8 formatted data or better yet expose a Character Set as a configurable property to use.


---

[GitHub] nifi issue #2125: NIFI-4346 Created a LookupService that uses HBase as its b...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on the issue:

    https://github.com/apache/nifi/pull/2125
  
    @MikeThomsen thanks for the contribution! I think this will be a very useful lookup service for a lot of people. Code looks good for the most part but could use a few minor tweaks. Commented inline. Thanks again!


---

[GitHub] nifi pull request #2125: NIFI-4346 Created a LookupService that uses HBase a...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2125#discussion_r138074571
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_LookupService.java ---
    @@ -0,0 +1,168 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.hbase;
    +
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.Get;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.controller.ControllerServiceInitializationContext;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableMap;
    +import java.util.Optional;
    +import java.util.Set;
    +
    +public class HBase_1_1_2_LookupService extends HBase_1_1_2_ClientService implements LookupService {
    --- End diff --
    
    This should implement LookupService<Record>, I believe. 


---

[GitHub] nifi pull request #2125: NIFI-4346 Created a LookupService that uses HBase a...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2125#discussion_r138073549
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java ---
    @@ -88,16 +88,16 @@
     
         static final long TICKET_RENEWAL_PERIOD = 60000;
     
    -    private volatile Connection connection;
    --- End diff --
    
    I think it's best to leave these as private and expose protected getters if the values are needed.


---

[GitHub] nifi issue #2125: NIFI-4346 Created a LookupService that uses HBase as its b...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2125
  
    +1 The string lookup one is going to be interesting.


---

[GitHub] nifi pull request #2125: NIFI-4346 Created a LookupService that uses HBase a...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2125#discussion_r138076738
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_LookupService.java ---
    @@ -0,0 +1,168 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.hbase;
    +
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.Get;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.controller.ControllerServiceInitializationContext;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableMap;
    +import java.util.Optional;
    +import java.util.Set;
    +
    +public class HBase_1_1_2_LookupService extends HBase_1_1_2_ClientService implements LookupService {
    +    private static final Set<String> REQUIRED_KEYS;
    +
    +    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .name("hb-lu-table-name")
    +            .displayName("Table Name")
    +            .description("The name of the table where look ups will be run.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor RETURN_CFS = new PropertyDescriptor.Builder()
    +            .name("hb-lu-return-cfs")
    +            .displayName("Column Families")
    +            .description("The column families that will be returned.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor RETURN_QFS = new PropertyDescriptor.Builder()
    +            .name("hb-lu-return-qfs")
    +            .displayName("Column Qualifiers")
    +            .description("The column qualifies that will be returned.")
    +            .required(false)
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    static {
    +        REQUIRED_KEYS = new HashSet<String>();
    +        REQUIRED_KEYS.add("rowKey");
    +    }
    +
    +    private String tableName;
    +    private List<byte[]> families;
    +    private List<byte[]> qualifiers;
    +
    +    private List<PropertyDescriptor> lookupProperties;
    +
    +    @Override
    +    protected void init(ControllerServiceInitializationContext config) throws InitializationException {
    +        super.init(config);
    +        this.lookupProperties = new ArrayList<>();
    +        this.lookupProperties.addAll(properties);
    +        this.lookupProperties.add(TABLE_NAME);
    +        this.lookupProperties.add(RETURN_CFS);
    +        this.lookupProperties.add(RETURN_QFS);
    +    }
    +
    +    @Override
    +    public Optional lookup(Map coordinates) throws LookupFailureException {
    +        byte[] rowKey = ((String)coordinates.get("rowKey")).getBytes();
    +        try {
    +            Map<String, Object> values = new HashMap<>();
    +            try (Table table = this.connection.getTable(TableName.valueOf(tableName))) {
    +                Get get = new Get(rowKey);
    +                Result result = table.get(get);
    +
    +                for (byte[] fam : families) {
    +                    NavigableMap<byte[], byte[]>  map = result.getFamilyMap(fam);
    +                    for (Map.Entry<byte[], byte[]> entry : map.entrySet()) {
    +                        if (qualifiers.contains(entry.getKey()) || qualifiers.size() == 0) {
    +                            values.put(new String(entry.getKey()), new String(entry.getValue()));
    +                        }
    +                    }
    +                }
    +            }
    +
    +            if (values.size() == 1) {
    +                return Optional.ofNullable(values.values().iterator().next());
    +            } else if (values.size() > 1) {
    +                final List<RecordField> fields = new ArrayList<>();
    +                fields.add(new RecordField("key1", RecordFieldType.STRING.getDataType()));
    +                fields.add(new RecordField("key2", RecordFieldType.STRING.getDataType()));
    +                fields.add(new RecordField("key3", RecordFieldType.STRING.getDataType()));
    +                final RecordSchema schema = new SimpleRecordSchema(fields);
    +                return Optional.ofNullable(new MapRecord(schema, values));
    +            } else {
    +                throw new LookupFailureException(String.format("Nothing was found that matched the criteria for row key %s", coordinates.get("rowKey")));
    +            }
    +        } catch (IOException e) {
    +            getLogger().error("Error occurred loading {}", new Object[] { coordinates.get("rowKey") }, e);
    +            throw new LookupFailureException(e);
    +        }
    +    }
    +
    +    @Override
    +    public Class<?> getValueType() {
    +        return null;
    +    }
    +
    +    @Override
    +    public Set<String> getRequiredKeys() {
    +        return REQUIRED_KEYS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException {
    +        super.onEnabled(context);
    +
    +        this.tableName = context.getProperty(TABLE_NAME).getValue();
    +        String families = context.getProperty(RETURN_CFS).getValue();
    +        String[] familiesSplit = families.split(",[\\s]*");
    --- End diff --
    
    It may be better in this case to use split(",") and then trim each result as you iterate over them, simply because as-is, it will strip out any leading white space but not trailing white space (i.e., if i used a value like "value1 , value2" then it would result in "value1 " (with a space) and "value2")


---

[GitHub] nifi pull request #2125: NIFI-4346 Created a LookupService that uses HBase a...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2125#discussion_r138073948
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_LookupService.java ---
    @@ -0,0 +1,168 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.hbase;
    +
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.Get;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.controller.ControllerServiceInitializationContext;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableMap;
    +import java.util.Optional;
    +import java.util.Set;
    +
    +public class HBase_1_1_2_LookupService extends HBase_1_1_2_ClientService implements LookupService {
    +    private static final Set<String> REQUIRED_KEYS;
    +
    +    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .name("hb-lu-table-name")
    +            .displayName("Table Name")
    +            .description("The name of the table where look ups will be run.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor RETURN_CFS = new PropertyDescriptor.Builder()
    +            .name("hb-lu-return-cfs")
    +            .displayName("Column Families")
    +            .description("The column families that will be returned.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor RETURN_QFS = new PropertyDescriptor.Builder()
    +            .name("hb-lu-return-qfs")
    +            .displayName("Column Qualifiers")
    +            .description("The column qualifies that will be returned.")
    +            .required(false)
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    static {
    --- End diff --
    
    Probably better to avoid the static block all together here and just declare the variable as:
    
    `private static final Set<String> REQUIRED_KEYS = Collections.singleton("rowKey");`


---

[GitHub] nifi issue #2125: NIFI-4346 Created a LookupService that uses HBase as its b...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2125
  
    (Go ahead and commit, seems like a good approach to me)


---

[GitHub] nifi pull request #2125: NIFI-4346 Created a LookupService that uses HBase a...

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2125#discussion_r142770600
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java ---
    @@ -88,16 +88,16 @@
     
         static final long TICKET_RENEWAL_PERIOD = 60000;
     
    -    private volatile Connection connection;
    +    protected volatile Connection connection;
         private volatile UserGroupInformation ugi;
         private volatile KerberosTicketRenewer renewer;
     
    -    private List<PropertyDescriptor> properties;
    -    private KerberosProperties kerberosProperties;
    +    protected List<PropertyDescriptor> properties;
    +    protected KerberosProperties kerberosProperties;
         private volatile File kerberosConfigFile = null;
     
         // Holder of cached Configuration information so validation does not reload the same config over and over
    -    private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
    +    protected final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
    --- End diff --
    
    Can we leave as private? don't see anything referencing it.


---

[GitHub] nifi issue #2125: NIFI-4346 Created a LookupService that uses HBase as its b...

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on the issue:

    https://github.com/apache/nifi/pull/2125
  
    @MikeThomsen Thanks for the updates...
    
    While I was testing this I started thinking that maybe the lookup service should be using an HBaseClientService, rather then extending it.
    
    My thinking is that, if you wanted to have several lookup services, they shouldn't all have to create their own set of connections, they can all just use one client service.
    
    I took a shot at making these changes and put them in a branch here:
    https://github.com/bbende/nifi/commits/NIFI-4346
    
    Since the scan is going through the client service now, I had to make a slight change to the way the column families and qualifiers are specified. I made it follow the pattern from the 'Columns' property in GetHBase.
    
    Let me know what you think about these changes. If you are on-board with this approach then I can go ahead and commit all of this, keeping your original commit as the starting point of course.


---

[GitHub] nifi pull request #2125: NIFI-4346 Created a LookupService that uses HBase a...

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2125#discussion_r142768509
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_LookupService.java ---
    @@ -0,0 +1,180 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.hbase;
    +
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.Get;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.controller.ControllerServiceInitializationContext;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableMap;
    +import java.util.Optional;
    +import java.util.Set;
    +
    +@Tags({"hbase", "record", "lookup", "service"})
    +@CapabilityDescription(
    +    "A lookup service that retrieves one or more columns from HBase based on a supplied rowKey."
    +)
    +public class HBase_1_1_2_LookupService extends HBase_1_1_2_ClientService implements LookupService<Record> {
    +    private static final Set<String> REQUIRED_KEYS = Collections.singleton("rowKey");
    +
    +    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .name("hb-lu-table-name")
    +            .displayName("Table Name")
    +            .description("The name of the table where look ups will be run.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor RETURN_CFS = new PropertyDescriptor.Builder()
    +            .name("hb-lu-return-cfs")
    +            .displayName("Column Families")
    +            .description("The column families that will be returned.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor RETURN_QFS = new PropertyDescriptor.Builder()
    +            .name("hb-lu-return-qfs")
    +            .displayName("Column Qualifiers")
    +            .description("The column qualifies that will be returned.")
    +            .required(false)
    +            .addValidator(Validator.VALID)
    +            .build();
    +    protected static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
    +            .name("hb-lu-charset")
    +            .displayName("Character Set")
    +            .description("Specifies the character set of the document data.")
    +            .required(true)
    +            .defaultValue("UTF-8")
    +            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
    +            .build();
    +
    +    private String tableName;
    +    private List<byte[]> families;
    +    private List<byte[]> qualifiers;
    +    private Charset charset;
    +    private List<PropertyDescriptor> lookupProperties;
    +
    +    @Override
    +    protected void init(ControllerServiceInitializationContext config) throws InitializationException {
    +        super.init(config);
    +        this.lookupProperties = new ArrayList<>();
    +        this.lookupProperties.addAll(properties);
    +        this.lookupProperties.add(TABLE_NAME);
    +        this.lookupProperties.add(RETURN_CFS);
    +        this.lookupProperties.add(RETURN_QFS);
    +        this.lookupProperties.add(CHARSET);
    +    }
    +
    +    @Override
    +    public Optional<Record> lookup(Map<String, String> coordinates) throws LookupFailureException {
    +        byte[] rowKey = ((String)coordinates.get("rowKey")).getBytes();
    +        try {
    +            Map<String, Object> values = new HashMap<>();
    +            try (Table table = this.connection.getTable(TableName.valueOf(tableName))) {
    +                Get get = new Get(rowKey);
    +                Result result = table.get(get);
    +
    +                for (byte[] fam : families) {
    +                    NavigableMap<byte[], byte[]>  map = result.getFamilyMap(fam);
    +                    for (Map.Entry<byte[], byte[]> entry : map.entrySet()) {
    +                        if (qualifiers.contains(entry.getKey()) || qualifiers.size() == 0) {
    +                            values.put(new String(entry.getKey(), charset), new String(entry.getValue(), charset));
    +                        }
    +                    }
    +                }
    +            }
    +
    +            if (values.size() > 0) {
    +                final List<RecordField> fields = new ArrayList<>();
    +                for (String key : values.keySet()) {
    +                    fields.add(new RecordField(key, RecordFieldType.STRING.getDataType()));
    +                }
    +                final RecordSchema schema = new SimpleRecordSchema(fields);
    +                return Optional.ofNullable(new MapRecord(schema, values));
    +            } else {
    +                throw new LookupFailureException(String.format("Nothing was found that matched the criteria for row key %s", coordinates.get("rowKey")));
    --- End diff --
    
    Looks like Mark P had suggest returning Optional.empty() here instead of throwing an exception


---

[GitHub] nifi issue #2125: NIFI-4346 Created a LookupService that uses HBase as its b...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2125
  
    @markap14 @mattyb149 Can you take a look when you get a chance?


---

[GitHub] nifi pull request #2125: NIFI-4346 Created a LookupService that uses HBase a...

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2125#discussion_r142770453
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java ---
    @@ -88,16 +88,16 @@
     
         static final long TICKET_RENEWAL_PERIOD = 60000;
     
    -    private volatile Connection connection;
    +    protected volatile Connection connection;
    --- End diff --
    
    Can we leave the connection as private and add a `protected Connection getConnection()` for the lookup service to use?


---

[GitHub] nifi pull request #2125: NIFI-4346 Created a LookupService that uses HBase a...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/2125


---

[GitHub] nifi pull request #2125: NIFI-4346 Created a LookupService that uses HBase a...

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2125#discussion_r142771753
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java ---
    @@ -88,16 +88,16 @@
     
         static final long TICKET_RENEWAL_PERIOD = 60000;
     
    -    private volatile Connection connection;
    +    protected volatile Connection connection;
         private volatile UserGroupInformation ugi;
         private volatile KerberosTicketRenewer renewer;
     
    -    private List<PropertyDescriptor> properties;
    -    private KerberosProperties kerberosProperties;
    +    protected List<PropertyDescriptor> properties;
    --- End diff --
    
    Can we leave this as private and then add a protected method that sub-classes can use to add additional properties? So something like this...
    ```
    protected List<PropertyDescriptor> getAdditionalProperties() {
    }
    ```
    And then in init, it would call it like this:
    ```
    List<PropertyDescriptor> props = new ArrayList<>();
            ...
            props.addAll(getAdditionalProperties());
            this.properties = Collections.unmodifiableList(props);
    ```
    
    Then the lookup service would override getAdditionalProperties() to provide it's specific properties to be added.


---

[GitHub] nifi issue #2125: NIFI-4346 Created a LookupService that uses HBase as its b...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2125
  
    @bbende all of the changes you requested were added.


---

[GitHub] nifi pull request #2125: NIFI-4346 Created a LookupService that uses HBase a...

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2125#discussion_r142770583
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java ---
    @@ -88,16 +88,16 @@
     
         static final long TICKET_RENEWAL_PERIOD = 60000;
     
    -    private volatile Connection connection;
    +    protected volatile Connection connection;
         private volatile UserGroupInformation ugi;
         private volatile KerberosTicketRenewer renewer;
     
    -    private List<PropertyDescriptor> properties;
    -    private KerberosProperties kerberosProperties;
    +    protected List<PropertyDescriptor> properties;
    +    protected KerberosProperties kerberosProperties;
    --- End diff --
    
    Can we leave as private? don't see anything referencing it.


---

[GitHub] nifi pull request #2125: NIFI-4346 Created a LookupService that uses HBase a...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2125#discussion_r138077452
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_LookupService.java ---
    @@ -0,0 +1,168 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.hbase;
    +
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.Get;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.controller.ControllerServiceInitializationContext;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableMap;
    +import java.util.Optional;
    +import java.util.Set;
    +
    +public class HBase_1_1_2_LookupService extends HBase_1_1_2_ClientService implements LookupService {
    +    private static final Set<String> REQUIRED_KEYS;
    +
    +    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .name("hb-lu-table-name")
    +            .displayName("Table Name")
    +            .description("The name of the table where look ups will be run.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor RETURN_CFS = new PropertyDescriptor.Builder()
    +            .name("hb-lu-return-cfs")
    +            .displayName("Column Families")
    +            .description("The column families that will be returned.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor RETURN_QFS = new PropertyDescriptor.Builder()
    +            .name("hb-lu-return-qfs")
    +            .displayName("Column Qualifiers")
    +            .description("The column qualifies that will be returned.")
    +            .required(false)
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    static {
    +        REQUIRED_KEYS = new HashSet<String>();
    +        REQUIRED_KEYS.add("rowKey");
    +    }
    +
    +    private String tableName;
    +    private List<byte[]> families;
    +    private List<byte[]> qualifiers;
    +
    +    private List<PropertyDescriptor> lookupProperties;
    +
    +    @Override
    +    protected void init(ControllerServiceInitializationContext config) throws InitializationException {
    +        super.init(config);
    +        this.lookupProperties = new ArrayList<>();
    +        this.lookupProperties.addAll(properties);
    +        this.lookupProperties.add(TABLE_NAME);
    +        this.lookupProperties.add(RETURN_CFS);
    +        this.lookupProperties.add(RETURN_QFS);
    +    }
    +
    +    @Override
    +    public Optional lookup(Map coordinates) throws LookupFailureException {
    +        byte[] rowKey = ((String)coordinates.get("rowKey")).getBytes();
    +        try {
    +            Map<String, Object> values = new HashMap<>();
    +            try (Table table = this.connection.getTable(TableName.valueOf(tableName))) {
    +                Get get = new Get(rowKey);
    +                Result result = table.get(get);
    +
    +                for (byte[] fam : families) {
    +                    NavigableMap<byte[], byte[]>  map = result.getFamilyMap(fam);
    +                    for (Map.Entry<byte[], byte[]> entry : map.entrySet()) {
    +                        if (qualifiers.contains(entry.getKey()) || qualifiers.size() == 0) {
    +                            values.put(new String(entry.getKey()), new String(entry.getValue()));
    +                        }
    +                    }
    +                }
    +            }
    +
    +            if (values.size() == 1) {
    +                return Optional.ofNullable(values.values().iterator().next());
    +            } else if (values.size() > 1) {
    +                final List<RecordField> fields = new ArrayList<>();
    +                fields.add(new RecordField("key1", RecordFieldType.STRING.getDataType()));
    +                fields.add(new RecordField("key2", RecordFieldType.STRING.getDataType()));
    +                fields.add(new RecordField("key3", RecordFieldType.STRING.getDataType()));
    +                final RecordSchema schema = new SimpleRecordSchema(fields);
    --- End diff --
    
    I'm not sure what this schema is meant to represent... are the results coming back from HBase actually in a map with keys "key1", "key2", and "key3"? If so, it would be helpful to add some documentation explaining what that represents. If not, then we need to be sure that the schema built does in fact reflect the values returned.


---

[GitHub] nifi pull request #2125: NIFI-4346 Created a LookupService that uses HBase a...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2125#discussion_r138074642
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_LookupService.java ---
    @@ -0,0 +1,168 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.hbase;
    +
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.Get;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.controller.ControllerServiceInitializationContext;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableMap;
    +import java.util.Optional;
    +import java.util.Set;
    +
    +public class HBase_1_1_2_LookupService extends HBase_1_1_2_ClientService implements LookupService {
    +    private static final Set<String> REQUIRED_KEYS;
    +
    +    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .name("hb-lu-table-name")
    +            .displayName("Table Name")
    +            .description("The name of the table where look ups will be run.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor RETURN_CFS = new PropertyDescriptor.Builder()
    +            .name("hb-lu-return-cfs")
    +            .displayName("Column Families")
    +            .description("The column families that will be returned.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor RETURN_QFS = new PropertyDescriptor.Builder()
    +            .name("hb-lu-return-qfs")
    +            .displayName("Column Qualifiers")
    +            .description("The column qualifies that will be returned.")
    +            .required(false)
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    static {
    +        REQUIRED_KEYS = new HashSet<String>();
    +        REQUIRED_KEYS.add("rowKey");
    +    }
    +
    +    private String tableName;
    +    private List<byte[]> families;
    +    private List<byte[]> qualifiers;
    +
    +    private List<PropertyDescriptor> lookupProperties;
    +
    +    @Override
    +    protected void init(ControllerServiceInitializationContext config) throws InitializationException {
    +        super.init(config);
    +        this.lookupProperties = new ArrayList<>();
    +        this.lookupProperties.addAll(properties);
    +        this.lookupProperties.add(TABLE_NAME);
    +        this.lookupProperties.add(RETURN_CFS);
    +        this.lookupProperties.add(RETURN_QFS);
    +    }
    +
    +    @Override
    +    public Optional lookup(Map coordinates) throws LookupFailureException {
    --- End diff --
    
    This should be declared as returning `Optional<Record>` and take `Map<String, String>` as the argument.


---

[GitHub] nifi issue #2125: NIFI-4346 Created a LookupService that uses HBase as its b...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on the issue:

    https://github.com/apache/nifi/pull/2125
  
    Will review...


---

[GitHub] nifi pull request #2125: NIFI-4346 Created a LookupService that uses HBase a...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2125#discussion_r138076835
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_LookupService.java ---
    @@ -0,0 +1,168 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.hbase;
    +
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.Get;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.controller.ControllerServiceInitializationContext;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableMap;
    +import java.util.Optional;
    +import java.util.Set;
    +
    +public class HBase_1_1_2_LookupService extends HBase_1_1_2_ClientService implements LookupService {
    +    private static final Set<String> REQUIRED_KEYS;
    +
    +    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .name("hb-lu-table-name")
    +            .displayName("Table Name")
    +            .description("The name of the table where look ups will be run.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor RETURN_CFS = new PropertyDescriptor.Builder()
    +            .name("hb-lu-return-cfs")
    +            .displayName("Column Families")
    +            .description("The column families that will be returned.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor RETURN_QFS = new PropertyDescriptor.Builder()
    +            .name("hb-lu-return-qfs")
    +            .displayName("Column Qualifiers")
    +            .description("The column qualifies that will be returned.")
    +            .required(false)
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    static {
    +        REQUIRED_KEYS = new HashSet<String>();
    +        REQUIRED_KEYS.add("rowKey");
    +    }
    +
    +    private String tableName;
    +    private List<byte[]> families;
    +    private List<byte[]> qualifiers;
    +
    +    private List<PropertyDescriptor> lookupProperties;
    +
    +    @Override
    +    protected void init(ControllerServiceInitializationContext config) throws InitializationException {
    +        super.init(config);
    +        this.lookupProperties = new ArrayList<>();
    +        this.lookupProperties.addAll(properties);
    +        this.lookupProperties.add(TABLE_NAME);
    +        this.lookupProperties.add(RETURN_CFS);
    +        this.lookupProperties.add(RETURN_QFS);
    +    }
    +
    +    @Override
    +    public Optional lookup(Map coordinates) throws LookupFailureException {
    +        byte[] rowKey = ((String)coordinates.get("rowKey")).getBytes();
    +        try {
    +            Map<String, Object> values = new HashMap<>();
    +            try (Table table = this.connection.getTable(TableName.valueOf(tableName))) {
    +                Get get = new Get(rowKey);
    +                Result result = table.get(get);
    +
    +                for (byte[] fam : families) {
    +                    NavigableMap<byte[], byte[]>  map = result.getFamilyMap(fam);
    +                    for (Map.Entry<byte[], byte[]> entry : map.entrySet()) {
    +                        if (qualifiers.contains(entry.getKey()) || qualifiers.size() == 0) {
    +                            values.put(new String(entry.getKey()), new String(entry.getValue()));
    +                        }
    +                    }
    +                }
    +            }
    +
    +            if (values.size() == 1) {
    +                return Optional.ofNullable(values.values().iterator().next());
    +            } else if (values.size() > 1) {
    +                final List<RecordField> fields = new ArrayList<>();
    +                fields.add(new RecordField("key1", RecordFieldType.STRING.getDataType()));
    +                fields.add(new RecordField("key2", RecordFieldType.STRING.getDataType()));
    +                fields.add(new RecordField("key3", RecordFieldType.STRING.getDataType()));
    +                final RecordSchema schema = new SimpleRecordSchema(fields);
    +                return Optional.ofNullable(new MapRecord(schema, values));
    +            } else {
    +                throw new LookupFailureException(String.format("Nothing was found that matched the criteria for row key %s", coordinates.get("rowKey")));
    +            }
    +        } catch (IOException e) {
    +            getLogger().error("Error occurred loading {}", new Object[] { coordinates.get("rowKey") }, e);
    +            throw new LookupFailureException(e);
    +        }
    +    }
    +
    +    @Override
    +    public Class<?> getValueType() {
    +        return null;
    +    }
    +
    +    @Override
    +    public Set<String> getRequiredKeys() {
    +        return REQUIRED_KEYS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException {
    +        super.onEnabled(context);
    +
    +        this.tableName = context.getProperty(TABLE_NAME).getValue();
    +        String families = context.getProperty(RETURN_CFS).getValue();
    +        String[] familiesSplit = families.split(",[\\s]*");
    +        this.families = new ArrayList<>();
    +        for (String fs : familiesSplit) {
    +            this.families.add(fs.getBytes());
    +        }
    +        this.qualifiers = new ArrayList<>();
    +        String quals = context.getProperty(RETURN_QFS).getValue();
    +
    +        if (quals != null && quals.length() > 0) {
    +            String[] qualsSplit = quals.split("[\\s]*");
    --- End diff --
    
    Same comment as above re: split and then trim()


---

[GitHub] nifi pull request #2125: NIFI-4346 Created a LookupService that uses HBase a...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2125#discussion_r138158414
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_LookupService.java ---
    @@ -0,0 +1,168 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.hbase;
    +
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.Get;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.controller.ControllerServiceInitializationContext;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableMap;
    +import java.util.Optional;
    +import java.util.Set;
    +
    +public class HBase_1_1_2_LookupService extends HBase_1_1_2_ClientService implements LookupService {
    +    private static final Set<String> REQUIRED_KEYS;
    +
    +    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .name("hb-lu-table-name")
    +            .displayName("Table Name")
    +            .description("The name of the table where look ups will be run.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor RETURN_CFS = new PropertyDescriptor.Builder()
    +            .name("hb-lu-return-cfs")
    +            .displayName("Column Families")
    +            .description("The column families that will be returned.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor RETURN_QFS = new PropertyDescriptor.Builder()
    +            .name("hb-lu-return-qfs")
    +            .displayName("Column Qualifiers")
    +            .description("The column qualifies that will be returned.")
    +            .required(false)
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    static {
    +        REQUIRED_KEYS = new HashSet<String>();
    +        REQUIRED_KEYS.add("rowKey");
    +    }
    +
    +    private String tableName;
    +    private List<byte[]> families;
    +    private List<byte[]> qualifiers;
    +
    +    private List<PropertyDescriptor> lookupProperties;
    +
    +    @Override
    +    protected void init(ControllerServiceInitializationContext config) throws InitializationException {
    +        super.init(config);
    +        this.lookupProperties = new ArrayList<>();
    +        this.lookupProperties.addAll(properties);
    +        this.lookupProperties.add(TABLE_NAME);
    +        this.lookupProperties.add(RETURN_CFS);
    +        this.lookupProperties.add(RETURN_QFS);
    +    }
    +
    +    @Override
    +    public Optional lookup(Map coordinates) throws LookupFailureException {
    +        byte[] rowKey = ((String)coordinates.get("rowKey")).getBytes();
    +        try {
    +            Map<String, Object> values = new HashMap<>();
    +            try (Table table = this.connection.getTable(TableName.valueOf(tableName))) {
    +                Get get = new Get(rowKey);
    +                Result result = table.get(get);
    +
    +                for (byte[] fam : families) {
    +                    NavigableMap<byte[], byte[]>  map = result.getFamilyMap(fam);
    +                    for (Map.Entry<byte[], byte[]> entry : map.entrySet()) {
    +                        if (qualifiers.contains(entry.getKey()) || qualifiers.size() == 0) {
    +                            values.put(new String(entry.getKey()), new String(entry.getValue()));
    +                        }
    +                    }
    +                }
    +            }
    +
    +            if (values.size() == 1) {
    +                return Optional.ofNullable(values.values().iterator().next());
    +            } else if (values.size() > 1) {
    +                final List<RecordField> fields = new ArrayList<>();
    +                fields.add(new RecordField("key1", RecordFieldType.STRING.getDataType()));
    +                fields.add(new RecordField("key2", RecordFieldType.STRING.getDataType()));
    +                fields.add(new RecordField("key3", RecordFieldType.STRING.getDataType()));
    +                final RecordSchema schema = new SimpleRecordSchema(fields);
    --- End diff --
    
    That was copy pasta that I must have forgotten to update...


---

[GitHub] nifi issue #2125: NIFI-4346 Created a LookupService that uses HBase as its b...

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on the issue:

    https://github.com/apache/nifi/pull/2125
  
    Oh and I had rename to "___RecordLookupService" b/c I was attempting to make a "___StringLookupService" variant, but that turned out to have some other issues.


---

[GitHub] nifi issue #2125: NIFI-4346 Created a LookupService that uses HBase as its b...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2125
  
    @markap14 All of the changes should now be checked in.


---

[GitHub] nifi pull request #2125: NIFI-4346 Created a LookupService that uses HBase a...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2125#discussion_r138077643
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_LookupService.java ---
    @@ -0,0 +1,168 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.hbase;
    +
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.Get;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.controller.ControllerServiceInitializationContext;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableMap;
    +import java.util.Optional;
    +import java.util.Set;
    +
    +public class HBase_1_1_2_LookupService extends HBase_1_1_2_ClientService implements LookupService {
    --- End diff --
    
    We should also add a @CapabilityDescription annotation and a @Tags annotation


---

[GitHub] nifi pull request #2125: NIFI-4346 Created a LookupService that uses HBase a...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2125#discussion_r138075273
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_LookupService.java ---
    @@ -0,0 +1,168 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.hbase;
    +
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.Get;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.controller.ControllerServiceInitializationContext;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableMap;
    +import java.util.Optional;
    +import java.util.Set;
    +
    +public class HBase_1_1_2_LookupService extends HBase_1_1_2_ClientService implements LookupService {
    +    private static final Set<String> REQUIRED_KEYS;
    +
    +    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .name("hb-lu-table-name")
    +            .displayName("Table Name")
    +            .description("The name of the table where look ups will be run.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor RETURN_CFS = new PropertyDescriptor.Builder()
    +            .name("hb-lu-return-cfs")
    +            .displayName("Column Families")
    +            .description("The column families that will be returned.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor RETURN_QFS = new PropertyDescriptor.Builder()
    +            .name("hb-lu-return-qfs")
    +            .displayName("Column Qualifiers")
    +            .description("The column qualifies that will be returned.")
    +            .required(false)
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    static {
    +        REQUIRED_KEYS = new HashSet<String>();
    +        REQUIRED_KEYS.add("rowKey");
    +    }
    +
    +    private String tableName;
    +    private List<byte[]> families;
    +    private List<byte[]> qualifiers;
    +
    +    private List<PropertyDescriptor> lookupProperties;
    +
    +    @Override
    +    protected void init(ControllerServiceInitializationContext config) throws InitializationException {
    +        super.init(config);
    +        this.lookupProperties = new ArrayList<>();
    +        this.lookupProperties.addAll(properties);
    +        this.lookupProperties.add(TABLE_NAME);
    +        this.lookupProperties.add(RETURN_CFS);
    +        this.lookupProperties.add(RETURN_QFS);
    +    }
    +
    +    @Override
    +    public Optional lookup(Map coordinates) throws LookupFailureException {
    +        byte[] rowKey = ((String)coordinates.get("rowKey")).getBytes();
    +        try {
    +            Map<String, Object> values = new HashMap<>();
    +            try (Table table = this.connection.getTable(TableName.valueOf(tableName))) {
    +                Get get = new Get(rowKey);
    +                Result result = table.get(get);
    +
    +                for (byte[] fam : families) {
    +                    NavigableMap<byte[], byte[]>  map = result.getFamilyMap(fam);
    +                    for (Map.Entry<byte[], byte[]> entry : map.entrySet()) {
    +                        if (qualifiers.contains(entry.getKey()) || qualifiers.size() == 0) {
    +                            values.put(new String(entry.getKey()), new String(entry.getValue()));
    +                        }
    +                    }
    +                }
    +            }
    +
    +            if (values.size() == 1) {
    +                return Optional.ofNullable(values.values().iterator().next());
    +            } else if (values.size() > 1) {
    +                final List<RecordField> fields = new ArrayList<>();
    +                fields.add(new RecordField("key1", RecordFieldType.STRING.getDataType()));
    +                fields.add(new RecordField("key2", RecordFieldType.STRING.getDataType()));
    +                fields.add(new RecordField("key3", RecordFieldType.STRING.getDataType()));
    +                final RecordSchema schema = new SimpleRecordSchema(fields);
    +                return Optional.ofNullable(new MapRecord(schema, values));
    +            } else {
    +                throw new LookupFailureException(String.format("Nothing was found that matched the criteria for row key %s", coordinates.get("rowKey")));
    --- End diff --
    
    We should not be throwing a LookupFailure in case of no results, but rather just return Optional.empty()


---