You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/11/23 21:46:25 UTC

[14/50] [abbrv] nifi git commit: NIFI-1175 Exposing minimum properties required to create an HBase connection on the HBaseClientService as an optional alternative to the conf files

NIFI-1175 Exposing minimum properties required to create an HBase connection on the HBaseClientService as an optional alternative to the conf files


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2b9b5e00
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2b9b5e00
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2b9b5e00

Branch: refs/heads/NIFI-655
Commit: 2b9b5e008f307ff5f31c36840ec503cacae5ff30
Parents: 453b140
Author: Bryan Bende <bb...@apache.org>
Authored: Mon Nov 16 14:01:23 2015 -0500
Committer: Bryan Bende <bb...@apache.org>
Committed: Tue Nov 17 12:01:46 2015 -0500

----------------------------------------------------------------------
 .../apache/nifi/hbase/HBaseClientService.java   | 30 ++++++-
 .../nifi-hbase_1_1_2-client-service/pom.xml     |  6 ++
 .../nifi/hbase/HBase_1_1_2_ClientService.java   | 93 +++++++++++++++++++-
 .../hbase/TestHBase_1_1_2_ClientService.java    | 67 ++++++++++++++
 4 files changed, 190 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/2b9b5e00/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
index a3f2040..9ff2c46 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
@@ -24,6 +24,7 @@ import org.apache.nifi.hbase.put.PutFlowFile;
 import org.apache.nifi.hbase.scan.Column;
 import org.apache.nifi.hbase.scan.ResultHandler;
 import org.apache.nifi.hbase.validate.ConfigFilesValidator;
+import org.apache.nifi.processor.util.StandardValidators;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -34,12 +35,35 @@ public interface HBaseClientService extends ControllerService {
 
     PropertyDescriptor HADOOP_CONF_FILES = new PropertyDescriptor.Builder()
             .name("Hadoop Configuration Files")
-            .description("Comma-separated list of Hadoop Configuration files, such as hbase-site.xml")
-            .required(true)
-            .defaultValue("./conf/hbase-site.xml")
+            .description("Comma-separated list of Hadoop Configuration files, such as hbase-site.xml, including full paths to the files.")
             .addValidator(new ConfigFilesValidator())
             .build();
 
+    PropertyDescriptor ZOOKEEPER_QUORUM = new PropertyDescriptor.Builder()
+            .name("ZooKeeper Quorum")
+            .description("Comma-separated list of ZooKeeper hosts for HBase. Required if Hadoop Configuration Files are not provided.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    PropertyDescriptor ZOOKEEPER_CLIENT_PORT = new PropertyDescriptor.Builder()
+            .name("ZooKeeper Client Port")
+            .description("The port on which ZooKeeper is accepting client connections. Required if Hadoop Configuration Files are not provided.")
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+
+    PropertyDescriptor ZOOKEEPER_ZNODE_PARENT = new PropertyDescriptor.Builder()
+            .name("ZooKeeper ZNode Parent")
+            .description("The ZooKeeper ZNode Parent value for HBase (example: /hbase). Required if Hadoop Configuration Files are not provided.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    PropertyDescriptor HBASE_CLIENT_RETRIES = new PropertyDescriptor.Builder()
+            .name("HBase Client Retries")
+            .description("The number of times the HBase client will retry connecting. Required if Hadoop Configuration Files are not provided.")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("1")
+            .build();
+
     /**
      * Puts a batch of mutations to the given table.
      *

http://git-wip-us.apache.org/repos/asf/nifi/blob/2b9b5e00/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml
index a90e0e3..68ea55d 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml
@@ -57,6 +57,12 @@
             <artifactId>commons-lang3</artifactId>
             <version>3.4</version>
         </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>log4j-over-slf4j</artifactId>
+            <version>${org.slf4j.version}</version>
+        </dependency>
+
 
         <!-- test dependencies -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/2b9b5e00/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
index 9c300db..42590c2 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
@@ -31,11 +32,14 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.ParseFilter;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
@@ -43,6 +47,7 @@ import org.apache.nifi.hbase.put.PutFlowFile;
 import org.apache.nifi.hbase.scan.Column;
 import org.apache.nifi.hbase.scan.ResultCell;
 import org.apache.nifi.hbase.scan.ResultHandler;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.reporting.InitializationException;
 
 import java.io.IOException;
@@ -55,9 +60,20 @@ import java.util.List;
 import java.util.Map;
 
 @Tags({ "hbase", "client"})
-@CapabilityDescription("Implementation of HBaseClientService for HBase 1.1.2.")
+@CapabilityDescription("Implementation of HBaseClientService for HBase 1.1.2. This service can be configured by providing " +
+        "a comma-separated list of configuration files, or by specifying values for the other properties. If configuration files " +
+        "are provided, they will be loaded first, and the values of the additional properties will override the values from " +
+        "the configuration files. In addition, any user defined properties on the processor will also be passed to the HBase " +
+        "configuration.")
+@DynamicProperty(name="The name of an HBase configuration property.", value="The value of the given HBase configuration property.",
+        description="These properties will be set on the HBase configuration after loading any provided configuration files.")
 public class HBase_1_1_2_ClientService extends AbstractControllerService implements HBaseClientService {
 
+    static final String HBASE_CONF_ZK_QUORUM = "hbase.zookeeper.quorum";
+    static final String HBASE_CONF_ZK_PORT = "hbase.zookeeper.property.clientPort";
+    static final String HBASE_CONF_ZNODE_PARENT = "zookeeper.znode.parent";
+    static final String HBASE_CONF_CLIENT_RETRIES = "hbase.client.retries.number";
+
     private volatile Connection connection;
     private List<PropertyDescriptor> properties;
 
@@ -65,6 +81,10 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
     protected void init(ControllerServiceInitializationContext config) throws InitializationException {
         List<PropertyDescriptor> props = new ArrayList<>();
         props.add(HADOOP_CONF_FILES);
+        props.add(ZOOKEEPER_QUORUM);
+        props.add(ZOOKEEPER_CLIENT_PORT);
+        props.add(ZOOKEEPER_ZNODE_PARENT);
+        props.add(HBASE_CLIENT_RETRIES);
         this.properties = Collections.unmodifiableList(props);
     }
 
@@ -73,16 +93,83 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
         return properties;
     }
 
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .description("Specifies the value for '" + propertyDescriptorName + "' in the HBase configuration.")
+                .name(propertyDescriptorName)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                .dynamic(true)
+                .build();
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        boolean confFileProvided = validationContext.getProperty(HADOOP_CONF_FILES).isSet();
+        boolean zkQuorumProvided = validationContext.getProperty(ZOOKEEPER_QUORUM).isSet();
+        boolean zkPortProvided = validationContext.getProperty(ZOOKEEPER_CLIENT_PORT).isSet();
+        boolean znodeParentProvided = validationContext.getProperty(ZOOKEEPER_ZNODE_PARENT).isSet();
+        boolean retriesProvided = validationContext.getProperty(HBASE_CLIENT_RETRIES).isSet();
+
+        final List<ValidationResult> problems = new ArrayList<>();
+
+        if (!confFileProvided && (!zkQuorumProvided || !zkPortProvided || !znodeParentProvided || !retriesProvided)) {
+            problems.add(new ValidationResult.Builder()
+                    .valid(false)
+                    .subject(this.getClass().getSimpleName())
+                    .explanation("ZooKeeper Quorum, ZooKeeper Client Port, ZooKeeper ZNode Parent, and HBase Client Retries are required " +
+                            "when Hadoop Configuration Files are not provided.")
+                    .build());
+        }
+
+        return problems;
+    }
+
     @OnEnabled
     public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException {
         this.connection = createConnection(context);
+
+        // connection check
+        if (this.connection != null) {
+            final Admin admin = this.connection.getAdmin();
+            if (admin != null) {
+                admin.listTableNames();
+            }
+        }
     }
 
     protected Connection createConnection(final ConfigurationContext context) throws IOException {
         final Configuration hbaseConfig = HBaseConfiguration.create();
-        for (final String configFile : context.getProperty(HADOOP_CONF_FILES).getValue().split(",")) {
-            hbaseConfig.addResource(new Path(configFile.trim()));
+
+        // if conf files are provided, start with those
+        if (context.getProperty(HADOOP_CONF_FILES).isSet()) {
+            for (final String configFile : context.getProperty(HADOOP_CONF_FILES).getValue().split(",")) {
+                hbaseConfig.addResource(new Path(configFile.trim()));
+            }
+        }
+
+        // override with any properties that are provided
+        if (context.getProperty(ZOOKEEPER_QUORUM).isSet()) {
+            hbaseConfig.set(HBASE_CONF_ZK_QUORUM, context.getProperty(ZOOKEEPER_QUORUM).getValue());
+        }
+        if (context.getProperty(ZOOKEEPER_CLIENT_PORT).isSet()) {
+            hbaseConfig.set(HBASE_CONF_ZK_PORT, context.getProperty(ZOOKEEPER_CLIENT_PORT).getValue());
         }
+        if (context.getProperty(ZOOKEEPER_ZNODE_PARENT).isSet()) {
+            hbaseConfig.set(HBASE_CONF_ZNODE_PARENT, context.getProperty(ZOOKEEPER_ZNODE_PARENT).getValue());
+        }
+        if (context.getProperty(HBASE_CLIENT_RETRIES).isSet()) {
+            hbaseConfig.set(HBASE_CONF_CLIENT_RETRIES, context.getProperty(HBASE_CLIENT_RETRIES).getValue());
+        }
+
+        // add any dynamic properties to the HBase configuration
+        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+            final PropertyDescriptor descriptor = entry.getKey();
+            if (descriptor.isDynamic()) {
+                hbaseConfig.set(descriptor.getName(), entry.getValue());
+            }
+        }
+
         return ConnectionFactory.createConnection(hbaseConfig);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/2b9b5e00/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java
index 71dd51b..1575f3c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java
@@ -56,6 +56,73 @@ import static org.mockito.Mockito.when;
 public class TestHBase_1_1_2_ClientService {
 
     @Test
+    public void testCustomValidate() throws InitializationException {
+        final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
+
+        final String tableName = "nifi";
+        final Table table = Mockito.mock(Table.class);
+        when(table.getName()).thenReturn(TableName.valueOf(tableName));
+
+        // no conf file or zk properties so should be invalid
+        MockHBaseClientService service = new MockHBaseClientService(table);
+        runner.addControllerService("hbaseClientService", service);
+        runner.enableControllerService(service);
+
+        runner.assertNotValid(service);
+        runner.removeControllerService(service);
+
+        // conf file with no zk properties should be valid
+        service = new MockHBaseClientService(table);
+        runner.addControllerService("hbaseClientService", service);
+        runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/core-site.xml");
+        runner.enableControllerService(service);
+
+        runner.assertValid(service);
+        runner.removeControllerService(service);
+
+        // only quorum and no conf file should be invalid
+        service = new MockHBaseClientService(table);
+        runner.addControllerService("hbaseClientService", service);
+        runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost");
+        runner.enableControllerService(service);
+
+        runner.assertNotValid(service);
+        runner.removeControllerService(service);
+
+        // quorum and port, no znode, no conf file, should be invalid
+        service = new MockHBaseClientService(table);
+        runner.addControllerService("hbaseClientService", service);
+        runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost");
+        runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_CLIENT_PORT, "2181");
+        runner.enableControllerService(service);
+
+        runner.assertNotValid(service);
+        runner.removeControllerService(service);
+
+        // quorum, port, and znode, no conf file, should be valid
+        service = new MockHBaseClientService(table);
+        runner.addControllerService("hbaseClientService", service);
+        runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost");
+        runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_CLIENT_PORT, "2181");
+        runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_ZNODE_PARENT, "/hbase");
+        runner.enableControllerService(service);
+
+        runner.assertValid(service);
+        runner.removeControllerService(service);
+
+        // quorum and port with conf file should be valid
+        service = new MockHBaseClientService(table);
+        runner.addControllerService("hbaseClientService", service);
+        runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/core-site.xml");
+        runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost");
+        runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_CLIENT_PORT, "2181");
+        runner.enableControllerService(service);
+
+        runner.assertValid(service);
+        runner.removeControllerService(service);
+    }
+
+    @Test
     public void testSinglePut() throws InitializationException, IOException {
         final String tableName = "nifi";
         final String row = "row1";