You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2018/12/05 00:54:27 UTC

nifi git commit: NIFI-5838 - Improve the schema validation method in Kite processors

Repository: nifi
Updated Branches:
  refs/heads/master cc9f89b00 -> 986a2a484


NIFI-5838 - Improve the schema validation method in Kite processors

review

Add empty check

This closes #3182.

Signed-off-by: Koji Kawamura <ij...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/986a2a48
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/986a2a48
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/986a2a48

Branch: refs/heads/master
Commit: 986a2a484285a342e20494107abe52ff98ad2880
Parents: cc9f89b
Author: Pierre Villard <pi...@gmail.com>
Authored: Thu Nov 22 18:50:11 2018 +0100
Committer: Koji Kawamura <ij...@apache.org>
Committed: Wed Dec 5 09:53:52 2018 +0900

----------------------------------------------------------------------
 .../processors/kite/AbstractKiteProcessor.java  | 25 ++++++++++++--------
 .../processors/kite/TestCSVToAvroProcessor.java | 18 ++++++++++++++
 2 files changed, 33 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/986a2a48/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
index 65dcd5f..345c1c2 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
@@ -39,6 +39,7 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processors.hadoop.HadoopValidators;
+import org.apache.nifi.util.StringUtils;
 import org.kitesdk.data.DatasetNotFoundException;
 import org.kitesdk.data.Datasets;
 import org.kitesdk.data.SchemaNotFoundException;
@@ -101,29 +102,30 @@ abstract class AbstractKiteProcessor extends AbstractProcessor {
             return parseSchema(uriOrLiteral);
         }
 
+        if(uri.getScheme() == null) {
+            throw new SchemaNotFoundException("If the schema is not a JSON string, a scheme must be specified in the URI "
+                    + "(ex: dataset:, view:, resource:, file:, hdfs:, etc).");
+        }
+
         try {
             if ("dataset".equals(uri.getScheme()) || "view".equals(uri.getScheme())) {
                 return Datasets.load(uri).getDataset().getDescriptor().getSchema();
             } else if ("resource".equals(uri.getScheme())) {
-                try (InputStream in = Resources.getResource(uri.getSchemeSpecificPart())
-                        .openStream()) {
+                try (InputStream in = Resources.getResource(uri.getSchemeSpecificPart()).openStream()) {
                     return parseSchema(uri, in);
                 }
             } else {
                 // try to open the file
                 Path schemaPath = new Path(uri);
-                FileSystem fs = schemaPath.getFileSystem(conf);
-                try (InputStream in = fs.open(schemaPath)) {
+                try (FileSystem fs = schemaPath.getFileSystem(conf); InputStream in = fs.open(schemaPath)) {
                     return parseSchema(uri, in);
                 }
             }
 
         } catch (DatasetNotFoundException e) {
-            throw new SchemaNotFoundException(
-                    "Cannot read schema of missing dataset: " + uri, e);
+            throw new SchemaNotFoundException("Cannot read schema of missing dataset: " + uri, e);
         } catch (IOException e) {
-            throw new SchemaNotFoundException(
-                    "Failed while reading " + uri + ": " + e.getMessage(), e);
+            throw new SchemaNotFoundException("Failed while reading " + uri + ": " + e.getMessage(), e);
         }
     }
 
@@ -131,8 +133,7 @@ abstract class AbstractKiteProcessor extends AbstractProcessor {
         try {
             return new Schema.Parser().parse(literal);
         } catch (RuntimeException e) {
-            throw new SchemaNotFoundException(
-                    "Failed to parse schema: " + literal, e);
+            throw new SchemaNotFoundException("Failed to parse schema: " + literal, e);
         }
     }
 
@@ -150,6 +151,10 @@ abstract class AbstractKiteProcessor extends AbstractProcessor {
             Configuration conf = getConfiguration(context.getProperty(CONF_XML_FILES).evaluateAttributeExpressions().getValue());
             String error = null;
 
+            if(StringUtils.isBlank(uri)) {
+                return new ValidationResult.Builder().subject(subject).input(uri).explanation("Schema cannot be null.").valid(false).build();
+            }
+
             final boolean elPresent = context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(uri);
             if (!elPresent) {
                 try {

http://git-wip-us.apache.org/repos/asf/nifi/blob/986a2a48/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
index 8bad01c..50f5599 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
@@ -19,6 +19,7 @@
 package org.apache.nifi.processors.kite;
 
 import java.io.ByteArrayInputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.Charset;
@@ -67,6 +68,23 @@ public class TestCSVToAvroProcessor {
     public static final String FAILURE_SUMMARY = "" +
             "Field id: cannot make \"long\" value: '': Field id type:LONG pos:0 not set and has no default value";
 
+
+    /**
+     * Tests schema values that are not JSON yet parse as a URI without throwing an exception: they must specify a scheme to be valid
+     */
+    @Test
+    public void testSchemeValidation() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
+        runner.setProperty(ConvertCSVToAvro.SCHEMA, "column1;column2");
+        runner.assertNotValid();
+        runner.setProperty(ConvertCSVToAvro.SCHEMA, "src/test/resources/Shapes_header.csv.avro");
+        runner.assertNotValid();
+        runner.setProperty(ConvertCSVToAvro.SCHEMA, "file:" + new File("src/test/resources/Shapes_header.csv.avro").getAbsolutePath());
+        runner.assertValid();
+        runner.setProperty(ConvertCSVToAvro.SCHEMA, "");
+        runner.assertNotValid();
+    }
+
     /**
      * Basic test for tab separated files, similar to #test
      */