You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@nifi.apache.org by mc...@apache.org on 2018/08/03 14:38:01 UTC

nifi git commit: NIFI-5484: Fixed PutHive3Streaming to use the Hive Metastore URI property (to include multiple URIs); NIFI-5484: Incorporated review comments, added unit test for new validator

Repository: nifi
Updated Branches:
  refs/heads/master 5e6c43f83 -> 3d546b8d8


NIFI-5484: Fixed PutHive3Streaming to use the Hive Metastore URI property (to include multiple URIs)
NIFI-5484: Incorporated review comments, added unit test for new validator

This closes #2934


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3d546b8d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3d546b8d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3d546b8d

Branch: refs/heads/master
Commit: 3d546b8d87178ad84a76734938aa1eecccb8c38a
Parents: 5e6c43f
Author: Matthew Burgess <ma...@apache.org>
Authored: Thu Aug 2 21:39:48 2018 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Fri Aug 3 10:36:56 2018 -0400

----------------------------------------------------------------------
 .../nifi/processor/util/StandardValidators.java | 19 +++++++++++
 .../util/validator/TestStandardValidators.java  | 20 +++++++++++
 .../nifi/processors/hive/PutHive3Streaming.java | 29 +++++++++++-----
 .../processors/hive/TestPutHive3Streaming.java  | 36 ++++++++++++++++++++
 4 files changed, 96 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/3d546b8d/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
index 0ea7c17..51e41db 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
@@ -36,6 +36,7 @@ import java.text.ParseException;
 import java.time.Instant;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
@@ -390,6 +391,24 @@ public class StandardValidators {
         }
     };
 
+    public static final Validator URI_LIST_VALIDATOR = (subject, input, context) -> {
+        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+            return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
+        }
+        // Split on commas, trim, and drop blank entries; a value such as "," or "   " therefore holds no URIs and is rejected
+        final String[] uris = (input == null) ? new String[0] : Arrays.stream(input.split(",")).map(String::trim).filter(uri -> !uri.isEmpty()).toArray(String[]::new);
+        if (uris.length == 0) {
+            return new ValidationResult.Builder().subject(subject).input(input).explanation("Not a valid URI, value is missing or empty").valid(false).build();
+        }
+        // Every entry must pass URI_VALIDATOR on its own; report the first entry that does not
+        final Optional<ValidationResult> invalidUri = Arrays.stream(uris)
+                .map(uri -> StandardValidators.URI_VALIDATOR.validate(subject, uri, context))
+                .filter(result -> !result.isValid())
+                .findFirst();
+
+        return invalidUri.orElseGet(() -> new ValidationResult.Builder().subject(subject).input(input).explanation("Valid URI(s)").valid(true).build());
+    };
+
     public static final Validator REGULAR_EXPRESSION_VALIDATOR = createRegexValidator(0, Integer.MAX_VALUE, false);
 
     public static final Validator ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR = new Validator() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/3d546b8d/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/validator/TestStandardValidators.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/validator/TestStandardValidators.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/validator/TestStandardValidators.java
index ffebb9d..f02946b 100644
--- a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/validator/TestStandardValidators.java
+++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/validator/TestStandardValidators.java
@@ -288,4 +288,24 @@ public class TestStandardValidators {
         vr = val.validate("foo", "2016-01-01T01:01:01.000Z", vc);
         assertTrue(vr.isValid());
     }
+
+    /** Exercises URI_LIST_VALIDATOR with missing, scheme-less, malformed, and padded comma-separated values. */
+    @Test
+    public void testURIListValidator() {
+        final Validator validator = StandardValidators.URI_LIST_VALIDATOR;
+        final ValidationContext context = mock(ValidationContext.class);
+
+        // A missing or empty value is never a valid URI list
+        assertFalse(validator.validate("foo", null, context).isValid());
+        assertFalse(validator.validate("foo", "", context).isValid());
+
+        // A path without a scheme is still accepted
+        assertTrue(validator.validate("foo", "/no_scheme", context).isValid());
+
+        // One malformed entry (embedded space) invalidates the whole list
+        assertFalse(validator.validate("foo", "http://localhost 8080, https://host2:8080 ", context).isValid());
+
+        // Whitespace around entries is trimmed before each entry is validated
+        assertTrue(validator.validate("foo", "http://localhost , https://host2:8080 ", context).isValid());
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/3d546b8d/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
index 664915c..6e87771 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.hive;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.common.util.ShutdownHookManager;
 import org.apache.hive.streaming.ConnectionError;
@@ -81,7 +82,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES;
@@ -115,12 +115,12 @@ public class PutHive3Streaming extends AbstractProcessor {
     static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
             .name("hive3-stream-metastore-uri")
             .displayName("Hive Metastore URI")
-            .description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the "
-                    + "Hive metastore is 9043.")
-            .required(true)
+            .description("The URI location(s) for the Hive metastore. This is a comma-separated list of Hive metastore URIs; note that this is not the location of the Hive Server. "
+                    + "The default port for the Hive metastore is 9083. If this field is not set, then the 'hive.metastore.uris' property from any provided configuration resources "
+                    + "will be used, and if none are provided, then the default value from a default hive-site.xml will be used (usually thrift://localhost:9083).")
+            .required(false)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(StandardValidators.URI_VALIDATOR)
-            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
+            .addValidator(StandardValidators.URI_LIST_VALIDATOR)
             .build();
 
     static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
@@ -354,13 +354,27 @@ public class PutHive3Streaming extends AbstractProcessor {
         final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();

         final ComponentLog log = getLogger();
-        final String metastoreUri = context.getProperty(METASTORE_URI).evaluateAttributeExpressions(flowFile).getValue();
+        String metastoreURIs = null;
+        if (context.getProperty(METASTORE_URI).isSet()) {
+            metastoreURIs = context.getProperty(METASTORE_URI).evaluateAttributeExpressions(flowFile).getValue();
+            if (StringUtils.isEmpty(metastoreURIs)) {
+                // Set but evaluated to null/empty for this flow file: log, penalize, route to failure, and stop processing it
+                log.error("The '" + METASTORE_URI.getDisplayName() + "' property evaluated to null or empty, penalizing flow file, routing to failure");
+                session.transfer(session.penalize(flowFile), REL_FAILURE);
+                return;
+            }
+        }

         final String partitionValuesString = context.getProperty(PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue();
         final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
         final boolean disableStreamingOptimizations = context.getProperty(DISABLE_STREAMING_OPTIMIZATIONS).asBoolean();

-        HiveOptions o = new HiveOptions(metastoreUri, dbName, tableName)
+        // Override the Hive Metastore URIs in the config if set by the user
+        if (metastoreURIs != null) {
+            hiveConfig.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreURIs);
+        }
+
+        HiveOptions o = new HiveOptions(metastoreURIs, dbName, tableName)
                 .withHiveConf(hiveConfig)
                 .withAutoCreatePartitions(autoCreatePartitions)
                 .withCallTimeout(callTimeout)

http://git-wip-us.apache.org/repos/asf/nifi/blob/3d546b8d/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
index 6a65783..cfc6017 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.serde.serdeConstants;
@@ -289,6 +290,35 @@ public class TestPutHive3Streaming {
     }
 
     @Test
+    public void onTriggerMultipleURIs() throws Exception {
+        // Two comma-separated metastore URIs must pass validation and let the flow file reach success
+        configure(processor, 1);
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://host1:9083,thrift://host2:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1);
+        final MockFlowFile successFlowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0);
+        assertEquals("1", successFlowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+        assertEquals("default.users", successFlowFile.getAttribute(ATTR_OUTPUT_TABLES));
+    }
+
+    @Test
+    public void onTriggerURIFromConfigFile() throws Exception {
+        // METASTORE_URI is not set here, so no override is applied (assumes configure() leaves it unset — confirm)
+        configure(processor, 1);
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1);
+        final MockFlowFile successFlowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0);
+        assertEquals("1", successFlowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+        assertEquals("default.users", successFlowFile.getAttribute(ATTR_OUTPUT_TABLES));
+    }
+
+    @Test
     public void onTriggerComplex() throws Exception {
         configureComplex(processor, 10, -1, null);
         runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
@@ -662,6 +692,12 @@ public class TestPutHive3Streaming {
         @Override
         StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader) throws StreamingException {
 
+            // Test here to ensure the 'hive.metastore.uris' property matches the options.getMetastoreUri() value (if it is set)
+            String userDefinedMetastoreURI = options.getMetaStoreURI();
+            if (null != userDefinedMetastoreURI) {
+                assertEquals(userDefinedMetastoreURI, options.getHiveConf().get(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName()));
+            }
+
             if (generateConnectFailure) {
                 throw new StubConnectionError("Unit Test - Connection Error");
             }