You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@nifi.apache.org by jo...@apache.org on 2022/01/19 16:24:44 UTC
[nifi] branch main updated: NIFI-9591 This closes #5674. Removed nifi-kite-bundle
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 8edb5fa NIFI-9591 This closes #5674. Removed nifi-kite-bundle
8edb5fa is described below
commit 8edb5faac9ff469e4c810891c30892962ed83b29
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Wed Jan 19 09:15:56 2022 -0600
NIFI-9591 This closes #5674. Removed nifi-kite-bundle
Signed-off-by: Joe Witt <jo...@apache.org>
---
nifi-assembly/NOTICE | 11 -
.../nifi-kite-bundle/nifi-kite-nar/pom.xml | 130 ------
.../src/main/resources/META-INF/NOTICE | 73 ---
.../nifi-kite-bundle/nifi-kite-processors/pom.xml | 365 ---------------
.../kite/AbstractKiteConvertProcessor.java | 62 ---
.../processors/kite/AbstractKiteProcessor.java | 213 ---------
.../nifi/processors/kite/AvroRecordConverter.java | 341 --------------
.../org/apache/nifi/processors/kite/AvroUtil.java | 38 --
.../nifi/processors/kite/ConvertAvroSchema.java | 396 ----------------
.../nifi/processors/kite/ConvertCSVToAvro.java | 311 -------------
.../nifi/processors/kite/ConvertJSONToAvro.java | 215 ---------
.../nifi/processors/kite/FailureTracker.java | 83 ----
.../nifi/processors/kite/InferAvroSchema.java | 499 ---------------------
.../nifi/processors/kite/StoreInKiteDataset.java | 178 --------
.../services/org.apache.nifi.processor.Processor | 19 -
.../additionalDetails.html | 142 ------
.../processors/kite/TestAvroRecordConverter.java | 202 ---------
.../processors/kite/TestCSVToAvroProcessor.java | 325 --------------
.../processors/kite/TestConfigurationProperty.java | 120 -----
.../processors/kite/TestConvertAvroSchema.java | 372 ---------------
.../apache/nifi/processors/kite/TestGetSchema.java | 100 -----
.../nifi/processors/kite/TestInferAvroSchema.java | 340 --------------
.../processors/kite/TestJSONToAvroProcessor.java | 185 --------
.../processors/kite/TestKiteProcessorsCluster.java | 131 ------
.../processors/kite/TestKiteStorageProcessor.java | 170 -------
.../org/apache/nifi/processors/kite/TestUtil.java | 103 -----
.../src/test/resources/Shapes.json | 10 -
.../src/test/resources/Shapes.json.avro | 34 --
.../src/test/resources/ShapesHeader.csv | 1 -
.../src/test/resources/Shapes_Header.csv | 352 ---------------
.../test/resources/Shapes_Header_TabDelimited.csv | 352 ---------------
.../src/test/resources/Shapes_NoHeader.csv | 351 ---------------
.../src/test/resources/Shapes_header.csv.avro | 23 -
nifi-nar-bundles/nifi-kite-bundle/pom.xml | 89 ----
nifi-nar-bundles/pom.xml | 1 -
35 files changed, 6337 deletions(-)
diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE
index 04fce9b..e5cf091 100644
--- a/nifi-assembly/NOTICE
+++ b/nifi-assembly/NOTICE
@@ -936,17 +936,6 @@ The following binary components are provided under the Apache Software License v
The following NOTICE information applies:
Copyright 2011 JsonPath authors
- (ASLv2) Kite SDK
- The following NOTICE information applies:
- This product includes software developed by Cloudera, Inc.
- (http://www.cloudera.com/).
-
- This product includes software developed at
- The Apache Software Foundation (http://www.apache.org/).
-
- This product includes software developed by
- Saxonica (http://www.saxonica.com/).
-
(ASLv2) MongoDB Java Driver
The following NOTICE information applies:
Copyright (C) 2008-2013 10gen, Inc.
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-nar/pom.xml b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-nar/pom.xml
deleted file mode 100644
index 3d99f3a..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-nar/pom.xml
+++ /dev/null
@@ -1,130 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-kite-bundle</artifactId>
- <version>1.16.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>nifi-kite-nar</artifactId>
- <packaging>nar</packaging>
- <properties>
- <maven.javadoc.skip>true</maven.javadoc.skip>
- <source.skip>true</source.skip>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-hadoop-libraries-nar</artifactId>
- <version>1.16.0-SNAPSHOT</version>
- <type>nar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-kite-processors</artifactId>
- <!-- The following are inherited from nifi-hadoop-libraries-nar -->
- <exclusions>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.xerial.snappy</groupId>
- <artifactId>snappy-java</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.thoughtworks.paranamer</groupId>
- <artifactId>paranamer</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-core-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-compress</groupId>
- <artifactId>commons-compress</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-compress</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-cli</groupId>
- <artifactId>commons-cli</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-collections</groupId>
- <artifactId>commons-collections</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-client</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.log4j</groupId>
- <artifactId>apache-log4j-extras</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
-</project>
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-nar/src/main/resources/META-INF/NOTICE
deleted file mode 100644
index 88b5dae..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-nar/src/main/resources/META-INF/NOTICE
+++ /dev/null
@@ -1,73 +0,0 @@
-nifi-kite-nar
-Copyright 2014-2022 The Apache Software Foundation
-
-This product includes software developed at
-The Apache Software Foundation (http://www.apache.org/).
-
-******************
-Apache Software License v2
-******************
-
-The following binary components are provided under the Apache Software License v2
-
- (ASLv2) Apache Avro
- The following NOTICE information applies:
- Apache Avro
- Copyright 2009-2017 The Apache Software Foundation
-
- (ASLv2) Apache Commons Lang
- The following NOTICE information applies:
- Apache Commons Lang
- Copyright 2001-2015 The Apache Software Foundation
-
- This product includes software from the Spring Framework,
- under the Apache License 2.0 (see: StringUtils.containsWhitespace())
-
- (ASLv2) Apache Commons JEXL
- The following NOTICE information applies:
- Apache Commons JEXL
- Copyright 2001-2011 The Apache Software Foundation
-
- (ASLv2) Kite SDK
- The following NOTICE information applies:
- This product includes software developed by Cloudera, Inc.
- (http://www.cloudera.com/).
-
- This product includes software developed at
- The Apache Software Foundation (http://www.apache.org/).
-
- This product includes software developed by
- Saxonica (http://www.saxonica.com/).
-
- (ASLv2) Parquet MR
- The following NOTICE information applies:
- Parquet MR
- Copyright 2012 Twitter, Inc.
-
- This project includes code from https://github.com/lemire/JavaFastPFOR
- parquet-column/src/main/java/parquet/column/values/bitpacking/LemireBitPacking.java
- Apache License Version 2.0 http://www.apache.org/licenses/.
- (c) Daniel Lemire, http://lemire.me/en/
-
- (ASLv2) Jackson JSON processor
- The following NOTICE information applies:
- # Jackson JSON processor
-
- Jackson is a high-performance, Free/Open Source JSON processing library.
- It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
- been in development since 2007.
- It is currently developed by a community of developers, as well as supported
- commercially by FasterXML.com.
-
- ## Licensing
-
- Jackson core and extension components may licensed under different licenses.
- To find the details that apply to this artifact see the accompanying LICENSE file.
- For more information, including possible other licensing options, contact
- FasterXML.com (http://fasterxml.com).
-
- ## Credits
-
- A list of contributors may be found from CREDITS file, which is included
- in some artifacts (usually source distributions); but is always available
- from the source code management (SCM) system project uses.
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml
deleted file mode 100644
index 8f6578b..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml
+++ /dev/null
@@ -1,365 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-kite-bundle</artifactId>
- <version>1.16.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>nifi-kite-processors</artifactId>
- <packaging>jar</packaging>
-
- <properties>
- <hive.version>1.2.1</hive.version>
- <kite.version>1.1.0</kite.version>
- <findbugs-annotations.version>1.3.9-1</findbugs-annotations.version>
- </properties>
-
- <dependencies>
- <!-- NiFi -->
-
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-utils</artifactId>
- <version>1.16.0-SNAPSHOT</version>
- </dependency>
-
- <!-- Kite -->
-
- <dependency>
- <groupId>org.kitesdk</groupId>
- <artifactId>kite-data-core</artifactId>
- <version>${kite.version}</version>
- <exclusions>
- <exclusion>
- <!-- Use findbugs-annotations instead -->
- <groupId>com.google.code.findbugs</groupId>
- <artifactId>jsr305</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- <version>3.9</version>
- </dependency>
-
- <dependency>
- <groupId>org.kitesdk</groupId>
- <artifactId>kite-data-hive</artifactId>
- <version>${kite.version}</version>
- <exclusions>
- <exclusion>
- <!-- Use findbugs-annotations instead -->
- <groupId>com.google.code.findbugs</groupId>
- <artifactId>jsr305</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.twitter</groupId>
- <artifactId>parquet-hive-bundle</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.kitesdk</groupId>
- <artifactId>kite-hadoop-dependencies</artifactId>
- <type>pom</type>
- <scope>provided</scope>
- <version>${kite.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-app</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <!-- Hive dependencies to connect to the MetaStore -->
- <dependency>
- <groupId>org.apache.hive.hcatalog</groupId>
- <artifactId>hive-hcatalog-core</artifactId>
- <version>${hive.version}</version>
- <exclusions>
- <exclusion>
- <groupId>com.google.code.findbugs</groupId>
- <artifactId>jsr305</artifactId>
- </exclusion>
- <exclusion>
- <artifactId>servlet-api</artifactId>
- <groupId>javax.servlet</groupId>
- </exclusion>
- <exclusion>
- <artifactId>jetty-all</artifactId>
- <groupId>org.eclipse.jetty.aggregate</groupId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-exec</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-cli</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-service</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.twitter</groupId>
- <artifactId>parquet-hadoop-bundle</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.antlr</groupId>
- <artifactId>antlr-runtime</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.ant</groupId>
- <artifactId>ant</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.json</groupId>
- <artifactId>json</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-pool</groupId>
- <artifactId>commons-pool</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-dbcp</groupId>
- <artifactId>commons-dbcp</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.jolbox</groupId>
- <artifactId>bonecp</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.derby</groupId>
- <artifactId>derby</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpcore</artifactId>
- </exclusion>
- <exclusion>
- <groupId>jline</groupId>
- <artifactId>jline</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>apache-log4j-extras</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>28.0-jre</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>log4j-over-slf4j</artifactId>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>jcl-over-slf4j</artifactId>
- </dependency>
-
- <dependency>
- <!-- avoid warnings by bundling annotations -->
- <groupId>com.github.stephenc.findbugs</groupId>
- <artifactId>findbugs-annotations</artifactId>
- <scope>compile</scope>
- <version>${findbugs-annotations.version}</version>
- </dependency>
-
- <!-- Test dependencies -->
-
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-mock</artifactId>
- <version>1.16.0-SNAPSHOT</version>
- <scope>test</scope>
- </dependency>
- <!-- must override implicitly dependency on avro to get more recent codec factory options-->
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- <version>1.8.1</version>
- </dependency>
-
- <dependency>
- <groupId>org.kitesdk</groupId>
- <artifactId>kite-minicluster</artifactId>
- <version>${kite.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.kitesdk</groupId>
- <artifactId>kite-hadoop-cdh5-dependencies</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.kitesdk</groupId>
- <artifactId>kite-hbase-cdh5-dependencies</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.kitesdk</groupId>
- <artifactId>kite-hadoop-cdh5-test-dependencies</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.kitesdk</groupId>
- <artifactId>kite-hbase-cdh5-test-dependencies</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-serde</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-exec</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-service</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-node</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-servlet</artifactId>
- <version>1.19</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.kitesdk</groupId>
- <artifactId>kite-data-core</artifactId>
- <version>${kite.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.kitesdk</groupId>
- <artifactId>kite-hadoop-test-dependencies</artifactId>
- <type>pom</type>
- <scope>test</scope>
- <version>${kite.version}</version>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>apache-log4j-extras</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-hadoop-utils</artifactId>
- <version>1.16.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-text</artifactId>
- <version>1.8</version>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.rat</groupId>
- <artifactId>apache-rat-plugin</artifactId>
- <configuration>
- <excludes combine.children="append">
- <exclude>src/test/resources/*.csv</exclude>
- <exclude>src/test/resources/*.json</exclude>
- <exclude>src/test/resources/*.avro</exclude>
- </excludes>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
-</project>
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteConvertProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteConvertProcessor.java
deleted file mode 100644
index 561bf46..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteConvertProcessor.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.kite;
-
-import org.apache.avro.file.CodecFactory;
-import org.apache.nifi.components.PropertyDescriptor;
-
-import com.google.common.annotations.VisibleForTesting;
-
-abstract class AbstractKiteConvertProcessor extends AbstractKiteProcessor {
-
- @VisibleForTesting
- static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
- .name("kite-compression-type")
- .displayName("Compression type")
- .description("Compression type to use when writting Avro files. Default is Snappy.")
- .allowableValues(CodecType.values())
- .defaultValue(CodecType.SNAPPY.toString())
- .build();
-
- public enum CodecType {
- BZIP2,
- DEFLATE,
- NONE,
- SNAPPY,
- LZO
- }
-
- protected CodecFactory getCodecFactory(String property) {
- CodecType type = CodecType.valueOf(property);
- switch (type) {
- case BZIP2:
- return CodecFactory.bzip2Codec();
- case DEFLATE:
- return CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL);
- case NONE:
- return CodecFactory.nullCodec();
- case LZO:
- return CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL);
- case SNAPPY:
- default:
- return CodecFactory.snappyCodec();
- }
- }
-
-}
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
deleted file mode 100644
index a2afa57..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.kite;
-
-import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableList;
-import com.google.common.io.Resources;
-import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.components.resource.ResourceCardinality;
-import org.apache.nifi.components.resource.ResourceType;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.util.StringUtils;
-import org.kitesdk.data.DatasetNotFoundException;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.SchemaNotFoundException;
-import org.kitesdk.data.URIBuilder;
-import org.kitesdk.data.spi.DefaultConfiguration;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.List;
-
-abstract class AbstractKiteProcessor extends AbstractProcessor {
-
- private static final Splitter COMMA = Splitter.on(',').trimResults();
-
- protected static final PropertyDescriptor CONF_XML_FILES = new PropertyDescriptor.Builder()
- .name("Hadoop configuration files")
- .displayName("Hadoop configuration Resources")
- .description("A file or comma separated list of files which contains the Hadoop file system configuration. Without this, Hadoop "
- + "will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration.")
- .required(false)
- .identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .build();
-
- protected static final Validator RECOGNIZED_URI = new Validator() {
- @Override
- public ValidationResult validate(String subject, String uri, ValidationContext context) {
- String message = "not set";
- boolean isValid = true;
-
- if (uri.trim().isEmpty()) {
- isValid = false;
- } else {
- final boolean elPresent = context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(uri);
- if (!elPresent) {
- try {
- new URIBuilder(URI.create(uri)).build();
- } catch (RuntimeException e) {
- message = e.getMessage();
- isValid = false;
- }
- }
- }
-
- return new ValidationResult.Builder()
- .subject(subject)
- .input(uri)
- .explanation("Dataset URI is invalid: " + message)
- .valid(isValid)
- .build();
- }
- };
-
- /**
- * Resolves a {@link Schema} for the given string, either a URI or a JSON literal.
- */
- protected static Schema getSchema(String uriOrLiteral, Configuration conf) {
- URI uri;
- try {
- uri = new URI(uriOrLiteral);
- } catch (URISyntaxException e) {
- // try to parse the schema as a literal
- return parseSchema(uriOrLiteral);
- }
-
- if(uri.getScheme() == null) {
- throw new SchemaNotFoundException("If the schema is not a JSON string, a scheme must be specified in the URI "
- + "(ex: dataset:, view:, resource:, file:, hdfs:, etc).");
- }
-
- try {
- if ("dataset".equals(uri.getScheme()) || "view".equals(uri.getScheme())) {
- return Datasets.load(uri).getDataset().getDescriptor().getSchema();
- } else if ("resource".equals(uri.getScheme())) {
- try (InputStream in = Resources.getResource(uri.getSchemeSpecificPart()).openStream()) {
- return parseSchema(uri, in);
- }
- } else {
- // try to open the file
- Path schemaPath = new Path(uri);
- try (FileSystem fs = schemaPath.getFileSystem(conf); InputStream in = fs.open(schemaPath)) {
- return parseSchema(uri, in);
- }
- }
-
- } catch (DatasetNotFoundException e) {
- throw new SchemaNotFoundException("Cannot read schema of missing dataset: " + uri, e);
- } catch (IOException e) {
- throw new SchemaNotFoundException("Failed while reading " + uri + ": " + e.getMessage(), e);
- }
- }
-
- private static Schema parseSchema(String literal) {
- try {
- return new Schema.Parser().parse(literal);
- } catch (RuntimeException e) {
- throw new SchemaNotFoundException("Failed to parse schema: " + literal, e);
- }
- }
-
- private static Schema parseSchema(URI uri, InputStream in) throws IOException {
- try {
- return new Schema.Parser().parse(in);
- } catch (RuntimeException e) {
- throw new SchemaNotFoundException("Failed to parse schema at " + uri, e);
- }
- }
-
- protected static final Validator SCHEMA_VALIDATOR = new Validator() {
- @Override
- public ValidationResult validate(String subject, String uri, ValidationContext context) {
- Configuration conf = getConfiguration(context.getProperty(CONF_XML_FILES).evaluateAttributeExpressions().getValue());
- String error = null;
-
- if(StringUtils.isBlank(uri)) {
- return new ValidationResult.Builder().subject(subject).input(uri).explanation("Schema cannot be null.").valid(false).build();
- }
-
- final boolean elPresent = context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(uri);
- if (!elPresent) {
- try {
- getSchema(uri, conf);
- } catch (SchemaNotFoundException e) {
- error = e.getMessage();
- }
- }
- return new ValidationResult.Builder()
- .subject(subject)
- .input(uri)
- .explanation(error)
- .valid(error == null)
- .build();
- }
- };
-
- protected static final List<PropertyDescriptor> ABSTRACT_KITE_PROPS = ImmutableList.<PropertyDescriptor>builder()
- .add(CONF_XML_FILES)
- .build();
-
- static List<PropertyDescriptor> getProperties() {
- return ABSTRACT_KITE_PROPS;
- }
-
- @OnScheduled
- protected void setDefaultConfiguration(ProcessContext context)
- throws IOException {
- DefaultConfiguration.set(getConfiguration(
- context.getProperty(CONF_XML_FILES).evaluateAttributeExpressions().getValue()));
- }
-
- protected static Configuration getConfiguration(String configFiles) {
- Configuration conf = DefaultConfiguration.get();
-
- if (configFiles == null || configFiles.isEmpty()) {
- return conf;
- }
-
- for (String file : COMMA.split(configFiles)) {
- // process each resource only once
- if (conf.getResource(file) == null) {
- // use Path instead of String to get the file from the FS
- conf.addResource(new Path(file));
- }
- }
-
- return conf;
- }
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return ABSTRACT_KITE_PROPS;
- }
-}
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java
deleted file mode 100644
index f66e9ed..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.kite;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Scanner;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.generic.GenericData.Record;
-import org.apache.avro.generic.IndexedRecord;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * Responsible for converting records of one Avro type to another. Supports
- * syntax like "record.field" to unpack fields and will try to do simple type
- * conversion.
- */
-public class AvroRecordConverter {
- private final Schema inputSchema;
- private final Schema outputSchema;
- // Store this from output field to input field so we can look up by output.
- private final Map<String, String> fieldMapping;
- private final Locale locale;
- private static final Locale DEFAULT_LOCALE = Locale.getDefault();
-
- /**
- * @param inputSchema
- * Schema of input record objects
- * @param outputSchema
- * Schema of output record objects
- * @param fieldMapping
- * Map from field name in input record to field name in output
- * record.
- */
- public AvroRecordConverter(Schema inputSchema, Schema outputSchema,
- Map<String, String> fieldMapping) {
- this(inputSchema, outputSchema, fieldMapping, DEFAULT_LOCALE);
- }
-
- /**
- * @param inputSchema
- * Schema of input record objects
- * @param outputSchema
- * Schema of output record objects
- * @param fieldMapping
- * Map from field name in input record to field name in output
- * record.
- * @param locale
- * Locale to use
- */
- public AvroRecordConverter(Schema inputSchema, Schema outputSchema,
- Map<String, String> fieldMapping, Locale locale) {
- this.inputSchema = inputSchema;
- this.outputSchema = outputSchema;
- // Need to reverse this map.
- this.fieldMapping = Maps
- .newHashMapWithExpectedSize(fieldMapping.size());
- for (Map.Entry<String, String> entry : fieldMapping.entrySet()) {
- this.fieldMapping.put(entry.getValue(), entry.getKey());
- }
- this.locale = locale;
- }
-
- /**
- * @return Any fields in the output schema that are not mapped or are mapped
- * by a non-existent input field.
- */
- public Collection<String> getUnmappedFields() {
- List<String> result = Lists.newArrayList();
- for (Field f : outputSchema.getFields()) {
- String fieldName = f.name();
- if (fieldMapping.containsKey(fieldName)) {
- fieldName = fieldMapping.get(fieldName);
- }
-
- Schema currentSchema = inputSchema;
- while (fieldName.contains(".")) {
- // Recurse down the schema to find the right field.
- int dotIndex = fieldName.indexOf('.');
- String entityName = fieldName.substring(0, dotIndex);
- // Get the schema. In case we had an optional record, choose
- // just the record.
- currentSchema = getNonNullSchema(currentSchema);
- if (currentSchema.getField(entityName) == null) {
- // Tried to step into a schema that doesn't exist. Break out
- // of the loop
- break;
- }
- currentSchema = currentSchema.getField(entityName).schema();
- fieldName = fieldName.substring(dotIndex + 1);
- }
- if (currentSchema == null
- || getNonNullSchema(currentSchema).getField(fieldName) == null) {
- result.add(f.name());
- }
- }
- return result;
- }
-
- /**
- * Converts one record to another given a input and output schema plus
- * explicit mappings for certain target fields.
- *
- * @param input
- * Input record to convert conforming to the inputSchema this
- * converter was created with.
- * @return Record converted to the outputSchema this converter was created
- * with.
- * @throws AvroConversionException
- * When schemas do not match or illegal conversions are
- * attempted, such as when numeric data fails to parse.
- */
- public Record convert(Record input) throws AvroConversionException {
- Record result = new Record(outputSchema);
- for (Field outputField : outputSchema.getFields()) {
- // Default to matching by name
- String inputFieldName = outputField.name();
- if (fieldMapping.containsKey(outputField.name())) {
- inputFieldName = fieldMapping.get(outputField.name());
- }
-
- IndexedRecord currentRecord = input;
- Schema currentSchema = getNonNullSchema(inputSchema);
- while (inputFieldName.contains(".")) {
- // Recurse down the schema to find the right field.
- int dotIndex = inputFieldName.indexOf('.');
- String entityName = inputFieldName.substring(0, dotIndex);
- // Get the record object
- Object innerRecord = currentRecord.get(currentSchema.getField(
- entityName).pos());
- if (innerRecord == null) {
- // Probably hit a null record here. Just break out of the
- // loop so that null object will be passed to convertData
- // below.
- currentRecord = null;
- break;
- }
- if (innerRecord != null
- && !(innerRecord instanceof IndexedRecord)) {
- throw new AvroConversionException(inputFieldName
- + " stepped through a non-record");
- }
- currentRecord = (IndexedRecord) innerRecord;
-
- // Get the schema. In case we had an optional record, choose
- // just the record.
- currentSchema = currentSchema.getField(entityName).schema();
- currentSchema = getNonNullSchema(currentSchema);
- inputFieldName = inputFieldName.substring(dotIndex + 1);
- }
-
- // Current should now be in the right place to read the record.
- Field f = currentSchema.getField(inputFieldName);
- if (currentRecord == null) {
- // We may have stepped into a null union type and gotten a null
- // result.
- Schema s = null;
- if (f != null) {
- s = f.schema();
- }
- result.put(outputField.name(),
- convertData(null, s, outputField.schema()));
- } else {
- result.put(
- outputField.name(),
- convertData(currentRecord.get(f.pos()), f.schema(),
- outputField.schema()));
- }
- }
- return result;
- }
-
- public Schema getInputSchema() {
- return inputSchema;
- }
-
- public Schema getOutputSchema() {
- return outputSchema;
- }
-
- /**
- * Converts the data from one schema to another. If the types are the same,
- * no change will be made, but simple conversions will be attempted for
- * other types.
- *
- * @param content
- * The data to convert, generally taken from a field in an input
- * Record.
- * @param inputSchema
- * The schema of the content object
- * @param outputSchema
- * The schema to convert to.
- * @return The content object, converted to the output schema.
- * @throws AvroConversionException
- * When conversion is impossible, either because the output type
- * is not supported or because numeric data failed to parse.
- */
- private Object convertData(Object content, Schema inputSchema,
- Schema outputSchema) throws AvroConversionException {
- if (content == null) {
- // No conversion can happen here.
- if (supportsNull(outputSchema)) {
- return null;
- }
- throw new AvroConversionException("Output schema " + outputSchema
- + " does not support null");
- }
-
- Schema nonNillInput = getNonNullSchema(inputSchema);
- Schema nonNillOutput = getNonNullSchema(outputSchema);
- if (nonNillInput.getType().equals(nonNillOutput.getType())) {
- return content;
- } else {
- if (nonNillOutput.getType() == Schema.Type.STRING) {
- return content.toString();
- }
-
- // For the non-string cases of these, we will try to convert through
- // string using Scanner to validate types. This means we could
- // return questionable results when a String starts with a number
- // but then contains other content
- Scanner scanner = new Scanner(content.toString());
- scanner.useLocale(locale);
- switch (nonNillOutput.getType()) {
- case LONG:
- if (scanner.hasNextLong()) {
- return scanner.nextLong();
- } else {
- throw new AvroConversionException("Cannot convert "
- + content + " to long");
- }
- case INT:
- if (scanner.hasNextInt()) {
- return scanner.nextInt();
- } else {
- throw new AvroConversionException("Cannot convert "
- + content + " to int");
- }
- case DOUBLE:
- if (scanner.hasNextDouble()) {
- return scanner.nextDouble();
- } else {
- throw new AvroConversionException("Cannot convert "
- + content + " to double");
- }
- case FLOAT:
- if (scanner.hasNextFloat()) {
- return scanner.nextFloat();
- } else {
- throw new AvroConversionException("Cannot convert "
- + content + " to float");
- }
- default:
- throw new AvroConversionException("Cannot convert to type "
- + nonNillOutput.getType());
- }
- }
- }
-
- /**
- * If s is a union schema of some type with null, returns that type.
- * Otherwise just return schema itself.
- *
- * Does not handle unions of schemas with anything except null and one type.
- *
- * @param s
- * Schema to remove nillable from.
- * @return The Schema of the non-null part of a the union, if the input was
- * a union type. Otherwise returns the input schema.
- */
- protected static Schema getNonNullSchema(Schema s) {
- // Handle the case where s is a union type. Assert that this must be a
- // union that only includes one non-null type.
- if (s.getType() == Schema.Type.UNION) {
- List<Schema> types = s.getTypes();
- boolean foundOne = false;
- Schema result = s;
- for (Schema type : types) {
- if (!type.getType().equals(Schema.Type.NULL)) {
- Preconditions.checkArgument(foundOne == false,
- "Cannot handle union of two non-null types");
- foundOne = true;
- result = type;
- }
- }
- return result;
- } else {
- return s;
- }
- }
-
- protected static boolean supportsNull(Schema s) {
- if (s.getType() == Schema.Type.NULL) {
- return true;
- } else if (s.getType() == Schema.Type.UNION) {
- for (Schema type : s.getTypes()) {
- if (type.getType() == Schema.Type.NULL) {
- return true;
- }
- }
- }
- return false;
- }
-
- /**
- * Exception thrown when Avro conversion fails.
- */
- public class AvroConversionException extends Exception {
- public AvroConversionException(String string, IOException e) {
- super(string, e);
- }
-
- public AvroConversionException(String string) {
- super(string);
- }
- }
-}
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroUtil.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroUtil.java
deleted file mode 100644
index 53075c7..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroUtil.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.kite;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DatumWriter;
-
-class AvroUtil {
-
- @SuppressWarnings("unchecked")
- public static <D> DatumWriter<D> newDatumWriter(Schema schema, Class<D> dClass) {
- return (DatumWriter<D>) GenericData.get().createDatumWriter(schema);
- }
-
- @SuppressWarnings("unchecked")
- public static <D> DatumReader<D> newDatumReader(Schema schema, Class<D> dClass) {
- return (DatumReader<D>) GenericData.get().createDatumReader(schema);
- }
-
-}
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
deleted file mode 100644
index d5d016f..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
+++ /dev/null
@@ -1,396 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.kite;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData.Record;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.commons.lang.LocaleUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.nifi.annotation.behavior.DynamicProperty;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.StreamCallback;
-import org.apache.nifi.processors.kite.AvroRecordConverter.AvroConversionException;
-import org.kitesdk.data.DatasetException;
-import org.kitesdk.data.DatasetIOException;
-import org.kitesdk.data.SchemaNotFoundException;
-import org.kitesdk.data.spi.DefaultConfiguration;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Pattern;
-
-@Tags({ "avro", "convert", "kite" })
-@CapabilityDescription("Convert records from one Avro schema to another, including support for flattening and simple type conversions")
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@DynamicProperty(name = "Field name from input schema",
-value = "Field name for output schema",
-description = "Explicit mappings from input schema to output schema, which supports renaming fields and stepping into nested records on the input schema using notation like parent.id")
-public class ConvertAvroSchema extends AbstractKiteConvertProcessor {
-
- private static final Relationship SUCCESS = new Relationship.Builder()
- .name("success")
- .description("Avro content that converted successfully").build();
-
- private static final Relationship FAILURE = new Relationship.Builder()
- .name("failure").description("Avro content that failed to convert")
- .build();
-
- /**
- * Makes sure the output schema is a valid output schema and that all its
- * fields can be mapped either automatically or are explicitly mapped.
- */
- protected static final Validator MAPPED_SCHEMA_VALIDATOR = new Validator() {
- @Override
- public ValidationResult validate(String subject, String uri,
- ValidationContext context) {
- Configuration conf = getConfiguration(context.getProperty(
- CONF_XML_FILES).getValue());
- String inputUri = context.getProperty(INPUT_SCHEMA).getValue();
- String error = null;
-
- final boolean elPresent = context
- .isExpressionLanguageSupported(subject)
- && context.isExpressionLanguagePresent(uri);
- if (!elPresent) {
- try {
- Schema outputSchema = getSchema(uri, conf);
- Schema inputSchema = getSchema(inputUri, conf);
- // Get the explicitly mapped fields. This is identical to
- // logic in onTrigger, but ValidationContext and
- // ProcessContext share no ancestor, so we cannot generalize
- // the code.
- Map<String, String> fieldMapping = new HashMap<>();
- for (final Map.Entry<PropertyDescriptor, String> entry : context
- .getProperties().entrySet()) {
- if (entry.getKey().isDynamic()) {
- fieldMapping.put(entry.getKey().getName(),
- entry.getValue());
- }
- }
- AvroRecordConverter converter = new AvroRecordConverter(
- inputSchema, outputSchema, fieldMapping);
- Collection<String> unmappedFields = converter
- .getUnmappedFields();
- if (unmappedFields.size() > 0) {
- error = "The following fields are unmapped: "
- + unmappedFields;
- }
-
- } catch (SchemaNotFoundException e) {
- error = e.getMessage();
- }
- }
- return new ValidationResult.Builder().subject(subject).input(uri)
- .explanation(error).valid(error == null).build();
- }
- };
-
- public static final String DEFAULT_LOCALE_VALUE = "default";
- public static final Validator LOCALE_VALIDATOR = new Validator() {
- @Override
- public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
- String reason = null;
- if (!value.equals(DEFAULT_LOCALE_VALUE)) {
- try {
- final Locale locale = LocaleUtils.toLocale(value);
- if (locale == null) {
- reason = "null locale returned";
- } else if (!LocaleUtils.isAvailableLocale(locale)) {
- reason = "locale not available";
- }
- } catch (final IllegalArgumentException e) {
- reason = "invalid format for locale";
- }
- }
- return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
- }
- };
-
- @VisibleForTesting
- static final PropertyDescriptor INPUT_SCHEMA = new PropertyDescriptor.Builder()
- .name("Input Schema")
- .description("Avro Schema of Input Flowfiles. This can be a URI (dataset, view, or resource) or literal JSON schema.")
- .addValidator(SCHEMA_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .required(true)
- .build();
-
- @VisibleForTesting
- static final PropertyDescriptor OUTPUT_SCHEMA = new PropertyDescriptor.Builder()
- .name("Output Schema")
- .description("Avro Schema of Output Flowfiles. This can be a URI (dataset, view, or resource) or literal JSON schema.")
- .addValidator(MAPPED_SCHEMA_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .required(true).build();
-
- @VisibleForTesting
- static final PropertyDescriptor LOCALE = new PropertyDescriptor.Builder()
- .name("Locale")
- .description("Locale to use for scanning data (see https://docs.oracle.com/javase/7/docs/api/java/util/Locale.html)" +
- "or \" " + DEFAULT_LOCALE_VALUE + "\" for JVM default")
- .addValidator(LOCALE_VALIDATOR)
- .defaultValue(DEFAULT_LOCALE_VALUE).build();
-
- private static final List<PropertyDescriptor> PROPERTIES = ImmutableList
- .<PropertyDescriptor> builder()
- .add(INPUT_SCHEMA)
- .add(OUTPUT_SCHEMA)
- .add(LOCALE)
- .add(COMPRESSION_TYPE)
- .add(CONF_XML_FILES)
- .build();
-
- private static final Set<Relationship> RELATIONSHIPS = ImmutableSet
- .<Relationship> builder().add(SUCCESS).add(FAILURE).build();
-
- private static final Pattern AVRO_FIELDNAME_PATTERN = Pattern
- .compile("[A-Za-z_][A-Za-z0-9_\\.]*");
-
- /**
- * Validates that the input and output fields (from dynamic properties) are
- * all valid avro field names including "." to step into records.
- */
- protected static final Validator AVRO_FIELDNAME_VALIDATOR = new Validator() {
- @Override
- public ValidationResult validate(final String subject,
- final String value, final ValidationContext context) {
- if (context.isExpressionLanguageSupported(subject)
- && context.isExpressionLanguagePresent(value)) {
- return new ValidationResult.Builder().subject(subject)
- .input(value)
- .explanation("Expression Language Present").valid(true)
- .build();
- }
-
- String reason = "";
- if (!AVRO_FIELDNAME_PATTERN.matcher(subject).matches()) {
- reason = subject + " is not a valid Avro fieldname";
- }
- if (!AVRO_FIELDNAME_PATTERN.matcher(value).matches()) {
- reason = reason + value + " is not a valid Avro fieldname";
- }
-
- return new ValidationResult.Builder().subject(subject).input(value)
- .explanation(reason).valid(reason.equals("")).build();
- }
- };
-
- @Override
- protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(
- final String propertyDescriptorName) {
- return new PropertyDescriptor.Builder()
- .name(propertyDescriptorName)
- .description(
- "Field mapping between schemas. The property name is the field name for the input "
- + "schema, and the property value is the field name for the output schema. For fields "
- + "not listed, the processor tries to match names from the input to the output record.")
- .dynamic(true).addValidator(AVRO_FIELDNAME_VALIDATOR).build();
- }
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return PROPERTIES;
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- return RELATIONSHIPS;
- }
-
- @Override
- public void onTrigger(ProcessContext context, final ProcessSession session)
- throws ProcessException {
- FlowFile incomingAvro = session.get();
- if (incomingAvro == null) {
- return;
- }
-
- String inputSchemaProperty = context.getProperty(INPUT_SCHEMA)
- .evaluateAttributeExpressions(incomingAvro).getValue();
- final Schema inputSchema;
- try {
- inputSchema = getSchema(inputSchemaProperty,
- DefaultConfiguration.get());
- } catch (SchemaNotFoundException e) {
- getLogger().error("Cannot find schema: " + inputSchemaProperty);
- session.transfer(incomingAvro, FAILURE);
- return;
- }
- String outputSchemaProperty = context.getProperty(OUTPUT_SCHEMA)
- .evaluateAttributeExpressions(incomingAvro).getValue();
- final Schema outputSchema;
- try {
- outputSchema = getSchema(outputSchemaProperty,
- DefaultConfiguration.get());
- } catch (SchemaNotFoundException e) {
- getLogger().error("Cannot find schema: " + outputSchemaProperty);
- session.transfer(incomingAvro, FAILURE);
- return;
- }
- final Map<String, String> fieldMapping = new HashMap<>();
- for (final Map.Entry<PropertyDescriptor, String> entry : context
- .getProperties().entrySet()) {
- if (entry.getKey().isDynamic()) {
- fieldMapping.put(entry.getKey().getName(), entry.getValue());
- }
- }
- // Set locale
- final String localeProperty = context.getProperty(LOCALE).getValue();
- final Locale locale = localeProperty.equals(DEFAULT_LOCALE_VALUE) ? Locale.getDefault() : LocaleUtils.toLocale(localeProperty);
- final AvroRecordConverter converter = new AvroRecordConverter(
- inputSchema, outputSchema, fieldMapping, locale);
-
- final DataFileWriter<Record> writer = new DataFileWriter<>(
- AvroUtil.newDatumWriter(outputSchema, Record.class));
- writer.setCodec(getCodecFactory(context.getProperty(COMPRESSION_TYPE).getValue()));
-
- final DataFileWriter<Record> failureWriter = new DataFileWriter<>(
- AvroUtil.newDatumWriter(outputSchema, Record.class));
- failureWriter.setCodec(getCodecFactory(context.getProperty(COMPRESSION_TYPE).getValue()));
-
- try {
- final AtomicLong written = new AtomicLong(0L);
- final FailureTracker failures = new FailureTracker();
-
- final List<Record> badRecords = Lists.newLinkedList();
- FlowFile incomingAvroCopy = session.clone(incomingAvro);
- FlowFile outgoingAvro = session.write(incomingAvro,
- new StreamCallback() {
- @Override
- public void process(InputStream in, OutputStream out)
- throws IOException {
- try (DataFileStream<Record> stream = new DataFileStream<Record>(
- in, new GenericDatumReader<Record>(
- converter.getInputSchema()))) {
- try (DataFileWriter<Record> w = writer.create(
- outputSchema, out)) {
- for (Record record : stream) {
- try {
- Record converted = converter
- .convert(record);
- w.append(converted);
- written.incrementAndGet();
- } catch (AvroConversionException e) {
- failures.add(e);
- getLogger().error(
- "Error converting data: "
- + e.getMessage());
- badRecords.add(record);
- }
- }
- }
- }
- }
- });
-
- FlowFile badOutput = session.write(incomingAvroCopy,
- new StreamCallback() {
- @Override
- public void process(InputStream in, OutputStream out)
- throws IOException {
-
- try (DataFileWriter<Record> w = failureWriter
- .create(inputSchema, out)) {
- for (Record record : badRecords) {
- w.append(record);
- }
- }
-
- }
- });
-
- long errors = failures.count();
-
- // update only if file transfer is successful
- session.adjustCounter("Converted records", written.get(), false);
- // update only if file transfer is successful
- session.adjustCounter("Conversion errors", errors, false);
-
- if (written.get() > 0L) {
- outgoingAvro = session.putAttribute(outgoingAvro, CoreAttributes.MIME_TYPE.key(), InferAvroSchema.AVRO_MIME_TYPE);
- session.transfer(outgoingAvro, SUCCESS);
- } else {
- session.remove(outgoingAvro);
-
- if (errors == 0L) {
- badOutput = session.putAttribute(badOutput, "errors",
- "No incoming records");
- session.transfer(badOutput, FAILURE);
- }
- }
-
- if (errors > 0L) {
- getLogger().warn(
- "Failed to convert {}/{} records between Avro Schemas",
- new Object[] { errors, errors + written.get() });
- badOutput = session.putAttribute(badOutput, "errors",
- failures.summary());
- session.transfer(badOutput, FAILURE);
- } else {
- session.remove(badOutput);
- }
- } catch (ProcessException | DatasetIOException e) {
- getLogger().error("Failed reading or writing", e);
- session.transfer(incomingAvro, FAILURE);
- } catch (DatasetException e) {
- getLogger().error("Failed to read FlowFile", e);
- session.transfer(incomingAvro, FAILURE);
- } finally {
- try {
- writer.close();
- } catch (IOException e) {
- getLogger().warn("Unable to close writer ressource", e);
- }
- try {
- failureWriter.close();
- } catch (IOException e) {
- getLogger().warn("Unable to close writer ressource", e);
- }
- }
- }
-}
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
deleted file mode 100644
index bacef3b..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.kite;
-
-import static org.apache.nifi.processor.util.StandardValidators.createLongValidator;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData.Record;
-import org.apache.commons.text.StringEscapeUtils;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.StreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.kitesdk.data.DatasetException;
-import org.kitesdk.data.DatasetIOException;
-import org.kitesdk.data.DatasetRecordException;
-import org.kitesdk.data.SchemaNotFoundException;
-import org.kitesdk.data.spi.DefaultConfiguration;
-import org.kitesdk.data.spi.filesystem.CSVFileReader;
-import org.kitesdk.data.spi.filesystem.CSVProperties;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import java.util.concurrent.atomic.AtomicLong;
-
-@Tags({"kite", "csv", "avro"})
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@CapabilityDescription("Converts CSV files to Avro according to an Avro Schema")
-public class ConvertCSVToAvro extends AbstractKiteConvertProcessor {
-
- private static final CSVProperties DEFAULTS = new CSVProperties.Builder().build();
-
- private static final Validator CHAR_VALIDATOR = new Validator() {
- @Override
- public ValidationResult validate(String subject, String input, ValidationContext context) {
- // Allows special, escaped characters as input, which is then unescaped and converted to a single character.
- // Examples for special characters: \t (or \u0009), \f.
- input = unescapeString(input);
-
- return new ValidationResult.Builder()
- .subject(subject)
- .input(input)
- .explanation("Only non-null single characters are supported")
- .valid((input.length() == 1 && input.charAt(0) != 0) || context.isExpressionLanguagePresent(input))
- .build();
- }
- };
-
- private static final Relationship SUCCESS = new Relationship.Builder()
- .name("success")
- .description("Avro content that was converted successfully from CSV")
- .build();
-
- private static final Relationship FAILURE = new Relationship.Builder()
- .name("failure")
- .description("CSV content that could not be processed")
- .build();
-
- private static final Relationship INCOMPATIBLE = new Relationship.Builder()
- .name("incompatible")
- .description("CSV content that could not be converted")
- .build();
-
- @VisibleForTesting
- static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
- .name("Record schema")
- .description("Outgoing Avro schema for each record created from a CSV row")
- .addValidator(SCHEMA_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .required(true)
- .build();
-
- @VisibleForTesting
- static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
- .name("CSV charset")
- .description("Character set for CSV files")
- .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .defaultValue(DEFAULTS.charset)
- .build();
-
- @VisibleForTesting
- static final PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder()
- .name("CSV delimiter")
- .description("Delimiter character for CSV records")
- .addValidator(CHAR_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .defaultValue(DEFAULTS.delimiter)
- .build();
-
- @VisibleForTesting
- static final PropertyDescriptor QUOTE = new PropertyDescriptor.Builder()
- .name("CSV quote character")
- .description("Quote character for CSV values")
- .addValidator(CHAR_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .defaultValue(DEFAULTS.quote)
- .build();
-
- @VisibleForTesting
- static final PropertyDescriptor ESCAPE = new PropertyDescriptor.Builder()
- .name("CSV escape character")
- .description("Escape character for CSV values")
- .addValidator(CHAR_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .defaultValue(DEFAULTS.escape)
- .build();
-
- @VisibleForTesting
- static final PropertyDescriptor HAS_HEADER = new PropertyDescriptor.Builder()
- .name("Use CSV header line")
- .description("Whether to use the first line as a header")
- .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .defaultValue(String.valueOf(DEFAULTS.useHeader))
- .build();
-
- @VisibleForTesting
- static final PropertyDescriptor LINES_TO_SKIP = new PropertyDescriptor.Builder()
- .name("Lines to skip")
- .description("Number of lines to skip before reading header or data")
- .addValidator(createLongValidator(0L, Integer.MAX_VALUE, true))
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .defaultValue(String.valueOf(DEFAULTS.linesToSkip))
- .build();
-
- private static final List<PropertyDescriptor> PROPERTIES = ImmutableList.<PropertyDescriptor> builder()
- .addAll(AbstractKiteProcessor.getProperties())
- .add(SCHEMA)
- .add(CHARSET)
- .add(DELIMITER)
- .add(QUOTE)
- .add(ESCAPE)
- .add(HAS_HEADER)
- .add(LINES_TO_SKIP)
- .add(COMPRESSION_TYPE)
- .build();
-
- private static final Set<Relationship> RELATIONSHIPS = ImmutableSet.<Relationship> builder()
- .add(SUCCESS)
- .add(FAILURE)
- .add(INCOMPATIBLE)
- .build();
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return PROPERTIES;
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- return RELATIONSHIPS;
- }
-
- @OnScheduled
- public void createCSVProperties(ProcessContext context) throws IOException {
- super.setDefaultConfiguration(context);
- }
-
- @Override
- public void onTrigger(ProcessContext context, final ProcessSession session)
- throws ProcessException {
- FlowFile incomingCSV = session.get();
- if (incomingCSV == null) {
- return;
- }
-
- CSVProperties props = new CSVProperties.Builder()
- .charset(context.getProperty(CHARSET).evaluateAttributeExpressions(incomingCSV).getValue())
- .delimiter(context.getProperty(DELIMITER).evaluateAttributeExpressions(incomingCSV).getValue())
- .quote(context.getProperty(QUOTE).evaluateAttributeExpressions(incomingCSV).getValue())
- .escape(context.getProperty(ESCAPE).evaluateAttributeExpressions(incomingCSV).getValue())
- .hasHeader(context.getProperty(HAS_HEADER).evaluateAttributeExpressions(incomingCSV).asBoolean())
- .linesToSkip(context.getProperty(LINES_TO_SKIP).evaluateAttributeExpressions(incomingCSV).asInteger())
- .build();
-
- String schemaProperty = context.getProperty(SCHEMA)
- .evaluateAttributeExpressions(incomingCSV)
- .getValue();
- final Schema schema;
- try {
- schema = getSchema(schemaProperty, DefaultConfiguration.get());
- } catch (SchemaNotFoundException e) {
- getLogger().error("Cannot find schema: " + schemaProperty);
- session.transfer(incomingCSV, FAILURE);
- return;
- }
-
- try (final DataFileWriter<Record> writer = new DataFileWriter<>(AvroUtil.newDatumWriter(schema, Record.class))) {
- writer.setCodec(getCodecFactory(context.getProperty(COMPRESSION_TYPE).getValue()));
-
- try {
- final AtomicLong written = new AtomicLong(0L);
- final FailureTracker failures = new FailureTracker();
-
- FlowFile badRecords = session.clone(incomingCSV);
- FlowFile outgoingAvro = session.write(incomingCSV, new StreamCallback() {
- @Override
- public void process(InputStream in, OutputStream out) throws IOException {
- try (CSVFileReader<Record> reader = new CSVFileReader<>(
- in, props, schema, Record.class)) {
- reader.initialize();
- try (DataFileWriter<Record> w = writer.create(schema, out)) {
- while (reader.hasNext()) {
- try {
- Record record = reader.next();
- w.append(record);
- written.incrementAndGet();
- } catch (DatasetRecordException e) {
- failures.add(e);
- }
- }
- }
- }
- }
- });
-
- long errors = failures.count();
-
- session.adjustCounter("Converted records", written.get(),
- false /* update only if file transfer is successful */);
- session.adjustCounter("Conversion errors", errors,
- false /* update only if file transfer is successful */);
-
- if (written.get() > 0L) {
- outgoingAvro = session.putAttribute(outgoingAvro, CoreAttributes.MIME_TYPE.key(), InferAvroSchema.AVRO_MIME_TYPE);
- session.transfer(outgoingAvro, SUCCESS);
-
- if (errors > 0L) {
- getLogger().warn("Failed to convert {}/{} records from CSV to Avro",
- new Object[] {errors, errors + written.get()});
- badRecords = session.putAttribute(
- badRecords, "errors", failures.summary());
- session.transfer(badRecords, INCOMPATIBLE);
- } else {
- session.remove(badRecords);
- }
-
- } else {
- session.remove(outgoingAvro);
-
- if (errors > 0L) {
- getLogger().warn("Failed to convert {}/{} records from CSV to Avro",
- new Object[] {errors, errors});
- badRecords = session.putAttribute(
- badRecords, "errors", failures.summary());
- } else {
- badRecords = session.putAttribute(
- badRecords, "errors", "No incoming records");
- }
-
- session.transfer(badRecords, FAILURE);
- }
-
- } catch (ProcessException | DatasetIOException e) {
- getLogger().error("Failed reading or writing", e);
- session.transfer(incomingCSV, FAILURE);
- } catch (DatasetException e) {
- getLogger().error("Failed to read FlowFile", e);
- session.transfer(incomingCSV, FAILURE);
- }
- } catch (final IOException ioe) {
- throw new RuntimeException("Unable to close Avro Writer", ioe);
- }
- }
-
- private static String unescapeString(String input) {
- if (input.length() > 1) {
- input = StringEscapeUtils.unescapeJava(input);
- }
- return input;
- }
-}
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
deleted file mode 100644
index c8b8d51..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.kite;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData.Record;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.StreamCallback;
-import org.kitesdk.data.DatasetException;
-import org.kitesdk.data.DatasetIOException;
-import org.kitesdk.data.DatasetRecordException;
-import org.kitesdk.data.SchemaNotFoundException;
-import org.kitesdk.data.spi.DefaultConfiguration;
-import org.kitesdk.data.spi.filesystem.JSONFileReader;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-
-@Tags({"kite", "json", "avro"})
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@CapabilityDescription("Converts JSON files to Avro according to an Avro Schema")
-public class ConvertJSONToAvro extends AbstractKiteConvertProcessor {
-
- private static final Relationship SUCCESS = new Relationship.Builder()
- .name("success")
- .description("Avro content that was converted successfully from JSON")
- .build();
-
- private static final Relationship FAILURE = new Relationship.Builder()
- .name("failure")
- .description("JSON content that could not be processed")
- .build();
-
- private static final Relationship INCOMPATIBLE = new Relationship.Builder()
- .name("incompatible")
- .description("JSON content that could not be converted")
- .build();
-
- @VisibleForTesting
- static final PropertyDescriptor SCHEMA
- = new PropertyDescriptor.Builder()
- .name("Record schema")
- .description("Outgoing Avro schema for each record created from a JSON object")
- .addValidator(SCHEMA_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .required(true)
- .build();
-
- private static final List<PropertyDescriptor> PROPERTIES
- = ImmutableList.<PropertyDescriptor>builder()
- .addAll(AbstractKiteProcessor.getProperties())
- .add(SCHEMA)
- .add(COMPRESSION_TYPE)
- .build();
-
- private static final Set<Relationship> RELATIONSHIPS
- = ImmutableSet.<Relationship>builder()
- .add(SUCCESS)
- .add(FAILURE)
- .add(INCOMPATIBLE)
- .build();
-
- public ConvertJSONToAvro() {
- }
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return PROPERTIES;
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- return RELATIONSHIPS;
- }
-
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession session)
- throws ProcessException {
- FlowFile incomingJSON = session.get();
- if (incomingJSON == null) {
- return;
- }
-
- String schemaProperty = context.getProperty(SCHEMA)
- .evaluateAttributeExpressions(incomingJSON)
- .getValue();
- final Schema schema;
- try {
- schema = getSchema(schemaProperty, DefaultConfiguration.get());
- } catch (SchemaNotFoundException e) {
- getLogger().error("Cannot find schema: " + schemaProperty);
- session.transfer(incomingJSON, FAILURE);
- return;
- }
-
- final DataFileWriter<Record> writer = new DataFileWriter<>(
- AvroUtil.newDatumWriter(schema, Record.class));
- writer.setCodec(getCodecFactory(context.getProperty(COMPRESSION_TYPE).getValue()));
-
- try {
- final AtomicLong written = new AtomicLong(0L);
- final FailureTracker failures = new FailureTracker();
-
- FlowFile badRecords = session.clone(incomingJSON);
- FlowFile outgoingAvro = session.write(incomingJSON, new StreamCallback() {
- @Override
- public void process(InputStream in, OutputStream out) throws IOException {
- try (JSONFileReader<Record> reader = new JSONFileReader<>(
- in, schema, Record.class)) {
- reader.initialize();
- try (DataFileWriter<Record> w = writer.create(schema, out)) {
- while (reader.hasNext()) {
- try {
- Record record = reader.next();
- w.append(record);
- written.incrementAndGet();
-
- } catch (final DatasetRecordException e) {
- failures.add(e);
- }
- }
- }
- }
- }
- });
-
- long errors = failures.count();
-
- session.adjustCounter("Converted records", written.get(),
- false /* update only if file transfer is successful */);
- session.adjustCounter("Conversion errors", errors,
- false /* update only if file transfer is successful */);
-
- if (written.get() > 0L) {
- outgoingAvro = session.putAttribute(outgoingAvro, CoreAttributes.MIME_TYPE.key(), InferAvroSchema.AVRO_MIME_TYPE);
- session.transfer(outgoingAvro, SUCCESS);
-
- if (errors > 0L) {
- getLogger().warn("Failed to convert {}/{} records from JSON to Avro",
- new Object[] { errors, errors + written.get() });
- badRecords = session.putAttribute(
- badRecords, "errors", failures.summary());
- session.transfer(badRecords, INCOMPATIBLE);
- } else {
- session.remove(badRecords);
- }
-
- } else {
- session.remove(outgoingAvro);
-
- if (errors > 0L) {
- getLogger().warn("Failed to convert {}/{} records from JSON to Avro",
- new Object[] { errors, errors });
- badRecords = session.putAttribute(
- badRecords, "errors", failures.summary());
- } else {
- badRecords = session.putAttribute(
- badRecords, "errors", "No incoming records");
- }
-
- session.transfer(badRecords, FAILURE);
- }
-
- } catch (ProcessException | DatasetIOException e) {
- getLogger().error("Failed reading or writing", e);
- session.transfer(incomingJSON, FAILURE);
- } catch (DatasetException e) {
- getLogger().error("Failed to read FlowFile", e);
- session.transfer(incomingJSON, FAILURE);
- } finally {
- try {
- writer.close();
- } catch (IOException e) {
- getLogger().warn("Unable to close writer ressource", e);
- }
- }
- }
-
-}
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/FailureTracker.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/FailureTracker.java
deleted file mode 100644
index de86761..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/FailureTracker.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.nifi.processors.kite;
-
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Maps;
-import java.util.Map;
-
-class FailureTracker {
- private static final Splitter REASON_SEPARATOR = Splitter.on(':').limit(2);
-
- private final Map<String, String> examples = Maps.newLinkedHashMap();
- private final Map<String, Integer> occurrences = Maps.newLinkedHashMap();
- long count = 0L;
-
- public void add(Throwable throwable) {
- add(reason(throwable));
- }
-
- public void add(String reason) {
- count += 1;
- String problem = Iterators.getNext(REASON_SEPARATOR.split(reason).iterator(), "Unknown");
- if (examples.containsKey(problem)) {
- occurrences.put(problem, occurrences.get(problem) + 1);
- } else {
- examples.put(problem, reason);
- occurrences.put(problem, 1);
- }
- }
-
- public long count() {
- return count;
- }
-
- public String summary() {
- boolean first = true;
- StringBuilder sb = new StringBuilder();
- for (String problem : examples.keySet()) {
- if (first) {
- first = false;
- } else {
- sb.append(", ");
- }
- sb.append(examples.get(problem));
- int similar = occurrences.get(problem) - 1;
- if (similar == 1) {
- sb.append(" (1 similar failure)");
- } else if (similar > 1) {
- sb.append(" (").append(similar).append(" similar failures)");
- }
- }
- return sb.toString();
- }
-
- private static String reason(Throwable t) {
- StringBuilder sb = new StringBuilder();
- for (Throwable current = t; current != null; current = current.getCause()) {
- if (current != t) {
- sb.append(": ");
- }
- sb.append(current.getMessage());
- }
- return sb.toString();
- }
-}
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchema.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchema.java
deleted file mode 100644
index 69545dd..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchema.java
+++ /dev/null
@@ -1,499 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.kite;
-
-import org.apache.avro.Schema;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.text.StringEscapeUtils;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.ReadsAttribute;
-import org.apache.nifi.annotation.behavior.ReadsAttributes;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.kitesdk.data.spi.JsonUtil;
-import org.kitesdk.data.spi.filesystem.CSVProperties;
-import org.kitesdk.data.spi.filesystem.CSVUtil;
-
-import java.io.InputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.BufferedReader;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.Collections;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Pattern;
-
-@Tags({"kite", "avro", "infer", "schema", "csv", "json"})
-@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-@CapabilityDescription("Examines the contents of the incoming FlowFile to infer an Avro schema. The processor will" +
- " use the Kite SDK to make an attempt to automatically generate an Avro schema from the incoming content." +
- " When inferring the schema from JSON data the key names will be used in the resulting Avro schema" +
- " definition. When inferring from CSV data a \"header definition\" must be present either as the first line of the incoming data" +
- " or the \"header definition\" must be explicitly set in the property \"CSV Header Definition\". A \"header definition\"" +
- " is simply a single comma separated line defining the names of each column. The \"header definition\" is" +
- " required in order to determine the names that should be given to each field in the resulting Avro definition." +
- " When inferring data types the higher order data type is always used if there is ambiguity." +
- " For example when examining numerical values the type may be set to \"long\" instead of \"integer\" since a long can" +
- " safely hold the value of any \"integer\". Only CSV and JSON content is currently supported for automatically inferring an" +
- " Avro schema. The type of content present in the incoming FlowFile is set by using the property \"Input Content Type\"." +
- " The property can either be explicitly set to CSV, JSON, or \"use mime.type value\" which will examine the" +
- " value of the mime.type attribute on the incoming FlowFile to determine the type of content present.")
-@ReadsAttributes({
- @ReadsAttribute(attribute = "mime.type", description = "If configured by property \"Input Content Type\" will" +
- " use this value to determine what sort of content should be inferred from the incoming FlowFile content."),
-})
-@WritesAttributes({
- @WritesAttribute(attribute = "inferred.avro.schema", description = "If configured by \"Schema output destination\" to" +
- " write to an attribute this will hold the resulting Avro schema from inferring the incoming FlowFile content."),
-})
-public class InferAvroSchema
- extends AbstractKiteProcessor {
-
- private static final Validator CHAR_VALIDATOR = new Validator() {
- @Override
- public ValidationResult validate(String subject, String input, ValidationContext context) {
- // Allows special, escaped characters as input, which is then unescaped and converted to a single character.
- // Examples for special characters: \t (or \u0009), \f.
- input = unescapeString(input);
-
- return new ValidationResult.Builder()
- .subject(subject)
- .input(input)
- .explanation("Only non-null single characters are supported")
- .valid(input.length() == 1 && input.charAt(0) != 0 || context.isExpressionLanguagePresent(input))
- .build();
- }
- };
-
- public static final String USE_MIME_TYPE = "use mime.type value";
- public static final String JSON_CONTENT = "json";
- public static final String CSV_CONTENT = "csv";
-
- public static final String AVRO_SCHEMA_ATTRIBUTE_NAME = "inferred.avro.schema";
- public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute";
- public static final String DESTINATION_CONTENT = "flowfile-content";
- public static final String JSON_MIME_TYPE = "application/json";
- public static final String CSV_MIME_TYPE = "text/csv";
- public static final String AVRO_MIME_TYPE = "application/avro-binary";
- public static final String AVRO_FILE_EXTENSION = ".avro";
- public static final Pattern AVRO_RECORD_NAME_PATTERN = Pattern.compile("[A-Za-z_]+[A-Za-z0-9_.]*[^.]");
-
- public static final PropertyDescriptor SCHEMA_DESTINATION = new PropertyDescriptor.Builder()
- .name("Schema Output Destination")
- .description("Control if Avro schema is written as a new flowfile attribute '" + AVRO_SCHEMA_ATTRIBUTE_NAME + "' " +
- "or written in the flowfile content. Writing to flowfile content will overwrite any " +
- "existing flowfile content.")
- .required(true)
- .allowableValues(DESTINATION_ATTRIBUTE, DESTINATION_CONTENT)
- .defaultValue(DESTINATION_CONTENT)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor INPUT_CONTENT_TYPE = new PropertyDescriptor.Builder()
- .name("Input Content Type")
- .description("Content Type of data present in the incoming FlowFile's content. Only \"" +
- JSON_CONTENT + "\" or \"" + CSV_CONTENT + "\" are supported." +
- " If this value is set to \"" + USE_MIME_TYPE + "\" the incoming Flowfile's attribute \"" + CoreAttributes.MIME_TYPE + "\"" +
- " will be used to determine the Content Type.")
- .allowableValues(USE_MIME_TYPE, JSON_CONTENT, CSV_CONTENT)
- .defaultValue(USE_MIME_TYPE)
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor GET_CSV_HEADER_DEFINITION_FROM_INPUT = new PropertyDescriptor.Builder()
- .name("Get CSV Header Definition From Data")
- .description("This property only applies to CSV content type. If \"true\" the processor will attempt to read the CSV header definition from the" +
- " first line of the input data.")
- .required(true)
- .allowableValues("true", "false")
- .defaultValue("true")
- .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor CSV_HEADER_DEFINITION = new PropertyDescriptor.Builder()
- .name("CSV Header Definition")
- .description("This property only applies to CSV content type. Comma separated string defining the column names expected in the CSV data." +
- " EX: \"fname,lname,zip,address\". The elements present in this string should be in the same order" +
- " as the underlying data. Setting this property will cause the value of" +
- " \"" + GET_CSV_HEADER_DEFINITION_FROM_INPUT.getName() + "\" to be ignored instead using this value.")
- .required(false)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .defaultValue(null)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
-
- public static final PropertyDescriptor HEADER_LINE_SKIP_COUNT = new PropertyDescriptor.Builder()
- .name("CSV Header Line Skip Count")
- .description("This property only applies to CSV content type. Specifies the number of lines that should be skipped when reading the CSV data." +
- " Setting this value to 0 is equivalent to saying \"the entire contents of the file should be read\". If the" +
- " property \"" + GET_CSV_HEADER_DEFINITION_FROM_INPUT.getName() + "\" is set then the first line of the CSV " +
- " file will be read in and treated as the CSV header definition. Since this will remove the header line from the data" +
- " care should be taken to make sure the value of \"CSV header Line Skip Count\" is set to 0 to ensure" +
- " no data is skipped.")
- .required(true)
- .defaultValue("0")
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder()
- .name("CSV delimiter")
- .description("Delimiter character for CSV records")
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(CHAR_VALIDATOR)
- .defaultValue(",")
- .build();
-
- public static final PropertyDescriptor ESCAPE_STRING = new PropertyDescriptor.Builder()
- .name("CSV Escape String")
- .description("This property only applies to CSV content type. String that represents an escape sequence" +
- " in the CSV FlowFile content data.")
- .required(true)
- .defaultValue("\\")
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor QUOTE_STRING = new PropertyDescriptor.Builder()
- .name("CSV Quote String")
- .description("This property only applies to CSV content type. String that represents a literal quote" +
- " character in the CSV FlowFile content data.")
- .required(true)
- .defaultValue("'")
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor RECORD_NAME = new PropertyDescriptor.Builder()
- .name("Avro Record Name")
- .description("Value to be placed in the Avro record schema \"name\" field. The value must adhere to the Avro naming "
- + "rules for fullname. If Expression Language is present then the evaluated value must adhere to the Avro naming rules.")
- .required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.createRegexMatchingValidator(AVRO_RECORD_NAME_PATTERN))
- .build();
-
- public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
- .name("Charset")
- .description("Character encoding of CSV data.")
- .required(true)
- .defaultValue("UTF-8")
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor PRETTY_AVRO_OUTPUT = new PropertyDescriptor.Builder()
- .name("Pretty Avro Output")
- .description("If true the Avro output will be formatted.")
- .required(true)
- .defaultValue("true")
- .allowableValues("true", "false")
- .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor NUM_RECORDS_TO_ANALYZE = new PropertyDescriptor.Builder()
- .name("Number Of Records To Analyze")
- .description("This property only applies to JSON content type. The number of JSON records that should be" +
- " examined to determine the Avro schema. The higher the value the better chance kite has of detecting" +
- " the appropriate type. However the default value of 10 is almost always enough.")
- .required(true)
- .defaultValue("10")
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
- .build();
-
-
- public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
- .description("Successfully created Avro schema from data.").build();
-
- public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
- .description("Original incoming FlowFile data").build();
-
- public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
- .description("Failed to create Avro schema from data.").build();
-
- public static final Relationship REL_UNSUPPORTED_CONTENT = new Relationship.Builder().name("unsupported content")
- .description("The content found in the flowfile content is not of the required format.").build();
-
- private List<PropertyDescriptor> properties;
- private Set<Relationship> relationships;
-
- @Override
- protected void init(final ProcessorInitializationContext context) {
- final List<PropertyDescriptor> properties = new ArrayList<>();
- properties.add(SCHEMA_DESTINATION);
- properties.add(INPUT_CONTENT_TYPE);
- properties.add(CSV_HEADER_DEFINITION);
- properties.add(GET_CSV_HEADER_DEFINITION_FROM_INPUT);
- properties.add(HEADER_LINE_SKIP_COUNT);
- properties.add(DELIMITER);
- properties.add(ESCAPE_STRING);
- properties.add(QUOTE_STRING);
- properties.add(PRETTY_AVRO_OUTPUT);
- properties.add(RECORD_NAME);
- properties.add(NUM_RECORDS_TO_ANALYZE);
- properties.add(CHARSET);
- this.properties = Collections.unmodifiableList(properties);
-
- final Set<Relationship> relationships = new HashSet<>();
- relationships.add(REL_SUCCESS);
- relationships.add(REL_FAILURE);
- relationships.add(REL_ORIGINAL);
- relationships.add(REL_UNSUPPORTED_CONTENT);
- this.relationships = Collections.unmodifiableSet(relationships);
- }
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return properties;
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- return relationships;
- }
-
-
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- final FlowFile original = session.get();
- if (original == null) {
- return;
- }
-
- try {
-
- final AtomicReference<String> avroSchema = new AtomicReference<>();
- switch (context.getProperty(INPUT_CONTENT_TYPE).getValue()) {
- case USE_MIME_TYPE:
- avroSchema.set(inferAvroSchemaFromMimeType(original, context, session));
- break;
- case JSON_CONTENT:
- avroSchema.set(inferAvroSchemaFromJSON(original, context, session));
- break;
- case CSV_CONTENT:
- avroSchema.set(inferAvroSchemaFromCSV(original, context, session));
- break;
- default:
- //Shouldn't be possible but just in case
- session.transfer(original, REL_UNSUPPORTED_CONTENT);
- break;
- }
-
-
- if (StringUtils.isNotEmpty(avroSchema.get())) {
-
- String destination = context.getProperty(SCHEMA_DESTINATION).getValue();
- FlowFile avroSchemaFF = null;
-
- switch (destination) {
- case DESTINATION_ATTRIBUTE:
- avroSchemaFF = session.putAttribute(session.clone(original), AVRO_SCHEMA_ATTRIBUTE_NAME, avroSchema.get());
- //Leaves the original CoreAttributes.MIME_TYPE in place.
- break;
- case DESTINATION_CONTENT:
- avroSchemaFF = session.write(session.create(), new OutputStreamCallback() {
- @Override
- public void process(OutputStream out) throws IOException {
- out.write(avroSchema.get().getBytes());
- }
- });
- avroSchemaFF = session.putAttribute(avroSchemaFF, CoreAttributes.MIME_TYPE.key(), AVRO_MIME_TYPE);
- break;
- default:
- break;
- }
-
- //Transfer the sessions.
- avroSchemaFF = session.putAttribute(avroSchemaFF, CoreAttributes.FILENAME.key(), (original.getAttribute(CoreAttributes.FILENAME.key()) + AVRO_FILE_EXTENSION));
- session.transfer(avroSchemaFF, REL_SUCCESS);
- session.transfer(original, REL_ORIGINAL);
- } else {
- //If the avroSchema is null then the content type is unknown and therefore unsupported
- session.transfer(original, REL_UNSUPPORTED_CONTENT);
- }
-
- } catch (Exception ex) {
- getLogger().error("Failed to infer Avro schema for {} due to {}", new Object[] {original, ex});
- session.transfer(original, REL_FAILURE);
- }
- }
-
-
- /**
- * Infers the Avro schema from the input Flowfile content. To infer an Avro schema for CSV content a header line is
- * required. You can configure the processor to pull that header line from the first line of the CSV data if it is
- * present OR you can manually supply the desired header line as a property value.
- *
- * @param inputFlowFile
- * The original input FlowFile containing the CSV content as it entered this processor.
- *
- * @param context
- * ProcessContext to pull processor configurations.
- *
- * @param session
- * ProcessSession to transfer FlowFiles
- */
- private String inferAvroSchemaFromCSV(final FlowFile inputFlowFile, final ProcessContext context, final ProcessSession session) {
-
- //Determines the header line either from the property input or the first line of the delimited file.
- final AtomicReference<String> header = new AtomicReference<>();
- final AtomicReference<Boolean> hasHeader = new AtomicReference<>();
-
- if (context.getProperty(GET_CSV_HEADER_DEFINITION_FROM_INPUT).asBoolean() == Boolean.TRUE) {
- //Read the first line of the file to get the header value.
- session.read(inputFlowFile, new InputStreamCallback() {
- @Override
- public void process(InputStream in) throws IOException {
- BufferedReader br = new BufferedReader(new InputStreamReader(in));
- header.set(br.readLine());
- hasHeader.set(Boolean.TRUE);
- br.close();
- }
- });
- hasHeader.set(Boolean.TRUE);
- } else {
- header.set(context.getProperty(CSV_HEADER_DEFINITION).evaluateAttributeExpressions(inputFlowFile).getValue());
- hasHeader.set(Boolean.FALSE);
- }
-
- //Prepares the CSVProperties for kite
- CSVProperties props = new CSVProperties.Builder()
- .charset(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue())
- .delimiter(context.getProperty(DELIMITER).evaluateAttributeExpressions(inputFlowFile).getValue())
- .quote(context.getProperty(QUOTE_STRING).evaluateAttributeExpressions(inputFlowFile).getValue())
- .escape(context.getProperty(ESCAPE_STRING).evaluateAttributeExpressions(inputFlowFile).getValue())
- .linesToSkip(context.getProperty(HEADER_LINE_SKIP_COUNT).evaluateAttributeExpressions(inputFlowFile).asInteger())
- .header(header.get())
- .hasHeader(hasHeader.get())
- .build();
-
- final AtomicReference<String> avroSchema = new AtomicReference<>();
-
- session.read(inputFlowFile, new InputStreamCallback() {
- @Override
- public void process(InputStream in) throws IOException {
- avroSchema.set(CSVUtil
- .inferSchema(
- context.getProperty(RECORD_NAME).evaluateAttributeExpressions(inputFlowFile).getValue(), in, props)
- .toString(context.getProperty(PRETTY_AVRO_OUTPUT).asBoolean()));
- }
- });
-
- return avroSchema.get();
- }
-
- /**
- * Infers the Avro schema from the input Flowfile content.
- *
- * @param inputFlowFile
- * The original input FlowFile containing the JSON content as it entered this processor.
- *
- * @param context
- * ProcessContext to pull processor configurations.
- *
- * @param session
- * ProcessSession to transfer FlowFiles
- */
- private String inferAvroSchemaFromJSON(final FlowFile inputFlowFile, final ProcessContext context, final ProcessSession session) {
-
- final AtomicReference<String> avroSchema = new AtomicReference<>();
- session.read(inputFlowFile, new InputStreamCallback() {
- @Override
- public void process(InputStream in) throws IOException {
- Schema as = JsonUtil.inferSchema(
- in, context.getProperty(RECORD_NAME).evaluateAttributeExpressions(inputFlowFile).getValue(),
- context.getProperty(NUM_RECORDS_TO_ANALYZE).evaluateAttributeExpressions(inputFlowFile).asInteger());
- avroSchema.set(as.toString(context.getProperty(PRETTY_AVRO_OUTPUT).asBoolean()));
-
- }
- });
-
- return avroSchema.get();
- }
-
- /**
- * Examines the incoming FlowFiles mime.type attribute to determine if the schema should be inferred for CSV or JSON data.
- *
- * @param inputFlowFile
- * The original input FlowFile containing the content.
- *
- * @param context
- * ProcessContext to pull processor configurations.
- *
- * @param session
- * ProcessSession to transfer FlowFiles
- */
- private String inferAvroSchemaFromMimeType(final FlowFile inputFlowFile, final ProcessContext context, final ProcessSession session) {
-
- String mimeType = inputFlowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
- String avroSchema = "";
-
- if (mimeType!= null) {
- switch (mimeType) {
- case JSON_MIME_TYPE:
- getLogger().debug("Inferred content type as JSON from \"{}\" value of \"{}\"", new Object[]{CoreAttributes.MIME_TYPE.key(),
- inputFlowFile.getAttribute(CoreAttributes.MIME_TYPE.key())});
- avroSchema = inferAvroSchemaFromJSON(inputFlowFile, context, session);
- break;
- case CSV_MIME_TYPE:
- getLogger().debug("Inferred content type as CSV from \"{}\" value of \"{}\"", new Object[]{CoreAttributes.MIME_TYPE.key(),
- inputFlowFile.getAttribute(CoreAttributes.MIME_TYPE.key())});
- avroSchema = inferAvroSchemaFromCSV(inputFlowFile, context, session);
- break;
- default:
- getLogger().warn("Unable to infer Avro Schema from {} because its mime type is {}, " +
- " which is not supported by this Processor", new Object[] {inputFlowFile, mimeType} );
- break;
- }
- }
-
- return avroSchema;
- }
-
- private static String unescapeString(String input) {
- if (input.length() > 1) {
- input = StringEscapeUtils.unescapeJava(input);
- }
- return input;
- }
-}
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
deleted file mode 100644
index 4aa359a..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.kite;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.generic.GenericData.Record;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.resource.ResourceCardinality;
-import org.apache.nifi.components.resource.ResourceType;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.util.StopWatch;
-import org.kitesdk.data.DatasetIOException;
-import org.kitesdk.data.DatasetWriter;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.IncompatibleSchemaException;
-import org.kitesdk.data.ValidationException;
-import org.kitesdk.data.View;
-import org.kitesdk.data.spi.SchemaValidationUtil;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"kite", "avro", "parquet", "hadoop", "hive", "hdfs", "hbase"})
-@CapabilityDescription("Stores Avro records in a Kite dataset")
-public class StoreInKiteDataset extends AbstractKiteProcessor {
-
- private static final Relationship SUCCESS = new Relationship.Builder()
- .name("success")
- .description("FlowFile content has been successfully saved")
- .build();
-
- private static final Relationship INCOMPATIBLE = new Relationship.Builder()
- .name("incompatible")
- .description("FlowFile content is not compatible with the target dataset")
- .build();
-
- private static final Relationship FAILURE = new Relationship.Builder()
- .name("failure")
- .description("FlowFile content could not be processed")
- .build();
-
- public static final PropertyDescriptor KITE_DATASET_URI
- = new PropertyDescriptor.Builder()
- .name("Target dataset URI")
- .description("URI that identifies a Kite dataset where data will be stored")
- .addValidator(RECOGNIZED_URI)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .required(true)
- .build();
-
- public static final PropertyDescriptor ADDITIONAL_CLASSPATH_RESOURCES = new PropertyDescriptor.Builder()
- .name("additional-classpath-resources")
- .displayName("Additional Classpath Resources")
- .description("A comma-separated list of paths to files and/or directories that will be added to the classpath. When specifying a " +
- "directory, all files with in the directory will be added to the classpath, but further sub-directories will not be included.")
- .required(false)
- .identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.DIRECTORY)
- .dynamicallyModifiesClasspath(true)
- .build();
-
- private static final List<PropertyDescriptor> PROPERTIES = ImmutableList.<PropertyDescriptor>builder()
- .addAll(AbstractKiteProcessor.getProperties())
- .add(KITE_DATASET_URI)
- .add(ADDITIONAL_CLASSPATH_RESOURCES)
- .build();
-
- private static final Set<Relationship> RELATIONSHIPS = ImmutableSet.<Relationship>builder()
- .add(SUCCESS)
- .add(INCOMPATIBLE)
- .add(FAILURE)
- .build();
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return PROPERTIES;
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- return RELATIONSHIPS;
- }
-
- @Override
- public void onTrigger(ProcessContext context, final ProcessSession session)
- throws ProcessException {
- FlowFile flowFile = session.get();
- if (flowFile == null) {
- return;
- }
-
- final View<Record> target = load(context, flowFile);
- final Schema schema = target.getDataset().getDescriptor().getSchema();
-
- try {
- StopWatch timer = new StopWatch(true);
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(InputStream in) throws IOException {
- try (DataFileStream<Record> stream = new DataFileStream<>(
- in, AvroUtil.newDatumReader(schema, Record.class))) {
- IncompatibleSchemaException.check(
- SchemaValidationUtil.canRead(stream.getSchema(), schema),
- "Incompatible file schema %s, expected %s",
- stream.getSchema(), schema);
-
- long written = 0L;
- try (DatasetWriter<Record> writer = target.newWriter()) {
- for (Record record : stream) {
- writer.write(record);
- written += 1;
- }
- } finally {
- session.adjustCounter("Stored records", written,
- true /* cannot roll back the write */);
- }
- }
- }
- });
- timer.stop();
-
- session.getProvenanceReporter().send(flowFile,
- target.getUri().toString(),
- timer.getDuration(TimeUnit.MILLISECONDS),
- true /* cannot roll back the write */);
-
- session.transfer(flowFile, SUCCESS);
-
- } catch (ProcessException | DatasetIOException e) {
- getLogger().error("Failed to read FlowFile", e);
- session.transfer(flowFile, FAILURE);
-
- } catch (ValidationException e) {
- getLogger().error(e.getMessage());
- getLogger().debug("Incompatible schema error", e);
- session.transfer(flowFile, INCOMPATIBLE);
- }
- }
-
- private View<Record> load(ProcessContext context, FlowFile file) {
- String uri = context.getProperty(KITE_DATASET_URI)
- .evaluateAttributeExpressions(file)
- .getValue();
- return Datasets.load(uri, Record.class);
- }
-}
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
deleted file mode 100644
index 59fbe2d..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ /dev/null
@@ -1,19 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-org.apache.nifi.processors.kite.StoreInKiteDataset
-org.apache.nifi.processors.kite.ConvertCSVToAvro
-org.apache.nifi.processors.kite.ConvertJSONToAvro
-org.apache.nifi.processors.kite.ConvertAvroSchema
-org.apache.nifi.processors.kite.InferAvroSchema
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/docs/org.apache.nifi.processors.kite.ConvertAvroSchema/additionalDetails.html b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/docs/org.apache.nifi.processors.kite.ConvertAvroSchema/additionalDetails.html
deleted file mode 100644
index 27ad803..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/docs/org.apache.nifi.processors.kite.ConvertAvroSchema/additionalDetails.html
+++ /dev/null
@@ -1,142 +0,0 @@
-<!DOCTYPE html>
-<html lang="en">
- <!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
- <head>
- <meta charset="utf-8" />
- <title>ConvertAvroSchema</title>
-
- <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
- </head>
-
- <body>
- <!-- Processor Documentation ================================================== -->
- <h2>Description:</h2>
- <p>This processor is used to convert data between two Avro formats, such as those coming from the <code>ConvertCSVToAvro</code> or
- <code>ConvertJSONToAvro</code> processors. The input and output content of the flow files should be Avro data files. The processor
- includes support for the following basic type conversions:
- <ul>
- <li>Anything to String, using the data's default String representation</li>
- <li>String types to numeric types int, long, double, and float</li>
- <li>Conversion to and from optional Avro types</li>
- </ul>
- In addition, fields can be renamed or unpacked from a record type by using the dynamic properties.
- </p>
- <h2>Mapping Example:</h2>
- <p>
- Throughout this example, we will refer to input data with the following schema:
- <pre>
-{
- "type": "record",
- "name": "CustomerInput",
- "namespace": "org.apache.example",
- "fields": [
- {
- "name": "id",
- "type": "string"
- },
- {
- "name": "companyName",
- "type": ["null", "string"],
- "default": null
- },
- {
- "name": "revenue",
- "type": ["null", "string"],
- "default": null
- },
- {
- "name" : "parent",
- "type" : [ "null", {
- "type" : "record",
- "name" : "parent",
- "fields" : [ {
- "name" : "name",
- "type" : ["null", "string"],
- "default" : null
- }, {
- "name" : "id",
- "type" : "string"
- } ]
- } ],
- "default" : null
- }
- ]
-}
- </pre>
- Where even though the revenue and id fields are mapped as string, they are logically long and double respectively.
- By default, fields with matching names will be mapped automatically, so the following output schema could be converted
- without using dynamic properties:
- <pre>
-{
- "type": "record",
- "name": "SimpleCustomerOutput",
- "namespace": "org.apache.example",
- "fields": [
- {
- "name": "id",
- "type": "long"
- },
- {
- "name": "companyName",
- "type": ["null", "string"],
- "default": null
- },
- {
- "name": "revenue",
- "type": ["null", "double"],
- "default": null
- }
- ]
-}
- </pre>
- To rename companyName to name and to extract the parent's id field, both a schema and a dynamic properties must be provided.
- For example, to convert to the following schema:
- <pre>
-{
- "type": "record",
- "name": "SimpleCustomerOutput",
- "namespace": "org.apache.example",
- "fields": [
- {
- "name": "id",
- "type": "long"
- },
- {
- "name": "name",
- "type": ["null", "string"],
- "default": null
- },
- {
- "name": "revenue",
- "type": ["null", "double"],
- "default": null
- },
- {
- "name": "parentId",
- "type": ["null", "long"],
- "default": null
- }
- ]
-}
- </pre>
- The following dynamic properties would be used:
- <pre>
-"companyName" -> "name"
-"parent.id" -> "parentId"
- </pre>
- </p>
-</body>
-</html>
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestAvroRecordConverter.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestAvroRecordConverter.java
deleted file mode 100644
index 11e86bf..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestAvroRecordConverter.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.kite;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
-import org.apache.avro.generic.GenericData.Record;
-import org.apache.commons.lang.LocaleUtils;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-public class TestAvroRecordConverter {
- final static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- final static Map<String, String> EMPTY_MAPPING = ImmutableMap.of();
- final static String NESTED_RECORD_SCHEMA_STRING = "{\n"
- + " \"type\": \"record\",\n"
- + " \"name\": \"NestedInput\",\n"
- + " \"namespace\": \"org.apache.example\",\n"
- + " \"fields\": [\n" + " {\n"
- + " \"name\": \"l1\",\n"
- + " \"type\": \"long\"\n"
- + " },\n"
- + " {\n" + " \"name\": \"s1\",\n"
- + " \"type\": \"string\"\n"
- + " },\n"
- + " {\n"
- + " \"name\": \"parent\",\n"
- + " \"type\": [\"null\", {\n"
- + " \"type\": \"record\",\n"
- + " \"name\": \"parent\",\n"
- + " \"fields\": [\n"
- + " { \"name\": \"id\", \"type\": \"long\" },\n"
- + " { \"name\": \"name\", \"type\": \"string\" }\n"
- + " ]"
- + " } ]"
- + " }"
- + " ] }";
- final static Schema NESTED_RECORD_SCHEMA = new Schema.Parser()
- .parse(NESTED_RECORD_SCHEMA_STRING);
- final static Schema NESTED_PARENT_SCHEMA = AvroRecordConverter
- .getNonNullSchema(NESTED_RECORD_SCHEMA.getField("parent").schema());
- final static Schema UNNESTED_OUTPUT_SCHEMA = SchemaBuilder.record("Output")
- .namespace("org.apache.example").fields().requiredLong("l1")
- .requiredLong("s1").optionalLong("parentId").endRecord();
-
- /**
- * Tests the case where we don't use a mapping file and just map records by
- * name.
- */
- @Test
- public void testDefaultConversion() throws Exception {
- // We will convert s1 from string to long (or leave it null), ignore s2,
- // convert s3 to from string to double, convert l1 from long to string,
- // and leave l2 the same.
- Schema input = SchemaBuilder.record("Input")
- .namespace("com.cloudera.edh").fields()
- .nullableString("s1", "").requiredString("s2")
- .requiredString("s3").optionalLong("l1").requiredLong("l2")
- .endRecord();
- Schema output = SchemaBuilder.record("Output")
- .namespace("com.cloudera.edh").fields().optionalLong("s1")
- .optionalString("l1").requiredLong("l2").requiredDouble("s3")
- .endRecord();
-
- AvroRecordConverter converter = new AvroRecordConverter(input, output,
- EMPTY_MAPPING, LocaleUtils.toLocale("en_US"));
-
- Record inputRecord = new Record(input);
- inputRecord.put("s1", null);
- inputRecord.put("s2", "blah");
- inputRecord.put("s3", "5.5");
- inputRecord.put("l1", null);
- inputRecord.put("l2", 5L);
- Record outputRecord = converter.convert(inputRecord);
- assertNull(outputRecord.get("s1"));
- assertNull(outputRecord.get("l1"));
- assertEquals(5L, outputRecord.get("l2"));
- assertEquals(5.5, outputRecord.get("s3"));
-
- inputRecord.put("s1", "500");
- inputRecord.put("s2", "blah");
- inputRecord.put("s3", "5.5e-5");
- inputRecord.put("l1", 100L);
- inputRecord.put("l2", 2L);
- outputRecord = converter.convert(inputRecord);
- assertEquals(500L, outputRecord.get("s1"));
- assertEquals("100", outputRecord.get("l1"));
- assertEquals(2L, outputRecord.get("l2"));
- assertEquals(5.5e-5, outputRecord.get("s3"));
- }
-
- /**
- * Tests the case where we want to default map one field and explicitly map
- * another.
- */
- @Test
- public void testExplicitMapping() throws Exception {
- // We will convert s1 from string to long (or leave it null), ignore s2,
- // convert l1 from long to string, and leave l2 the same.
- Schema input = NESTED_RECORD_SCHEMA;
- Schema parent = NESTED_PARENT_SCHEMA;
- Schema output = UNNESTED_OUTPUT_SCHEMA;
- Map<String, String> mapping = ImmutableMap.of("parent.id", "parentId");
-
- AvroRecordConverter converter = new AvroRecordConverter(input, output,
- mapping);
-
- Record inputRecord = new Record(input);
- inputRecord.put("l1", 5L);
- inputRecord.put("s1", "1000");
- Record parentRecord = new Record(parent);
- parentRecord.put("id", 200L);
- parentRecord.put("name", "parent");
- inputRecord.put("parent", parentRecord);
- Record outputRecord = converter.convert(inputRecord);
- assertEquals(5L, outputRecord.get("l1"));
- assertEquals(1000L, outputRecord.get("s1"));
- assertEquals(200L, outputRecord.get("parentId"));
- }
-
- /**
- * Tests the case where we try to convert a string to a long incorrectly.
- */
- @Test(expected = org.apache.nifi.processors.kite.AvroRecordConverter.AvroConversionException.class)
- public void testIllegalConversion() throws Exception {
- // We will convert s1 from string to long (or leave it null), ignore s2,
- // convert l1 from long to string, and leave l2 the same.
- Schema input = SchemaBuilder.record("Input")
- .namespace("com.cloudera.edh").fields()
- .nullableString("s1", "").requiredString("s2")
- .optionalLong("l1").requiredLong("l2").endRecord();
- Schema output = SchemaBuilder.record("Output")
- .namespace("com.cloudera.edh").fields().optionalLong("s1")
- .optionalString("l1").requiredLong("l2").endRecord();
-
- AvroRecordConverter converter = new AvroRecordConverter(input, output,
- EMPTY_MAPPING);
-
- Record inputRecord = new Record(input);
- inputRecord.put("s1", "blah");
- inputRecord.put("s2", "blah");
- inputRecord.put("l1", null);
- inputRecord.put("l2", 5L);
- converter.convert(inputRecord);
- }
-
- @Test
- public void testGetUnmappedFields() throws Exception {
- Schema input = SchemaBuilder.record("Input")
- .namespace("com.cloudera.edh").fields()
- .nullableString("s1", "").requiredString("s2")
- .optionalLong("l1").requiredLong("l2").endRecord();
- Schema output = SchemaBuilder.record("Output")
- .namespace("com.cloudera.edh").fields().optionalLong("field")
- .endRecord();
-
- // Test the case where the field isn't mapped at all.
- AvroRecordConverter converter = new AvroRecordConverter(input, output,
- EMPTY_MAPPING);
- assertEquals(ImmutableList.of("field"), converter.getUnmappedFields());
-
- // Test the case where we tried to map from a non-existent field.
- converter = new AvroRecordConverter(input, output, ImmutableMap.of(
- "nonExistentField", "field"));
- assertEquals(ImmutableList.of("field"), converter.getUnmappedFields());
-
- // Test the case where we tried to map from a non-existent record.
- converter = new AvroRecordConverter(input, output, ImmutableMap.of(
- "parent.nonExistentField", "field"));
- assertEquals(ImmutableList.of("field"), converter.getUnmappedFields());
-
- // Test a valid case
- converter = new AvroRecordConverter(input, output, ImmutableMap.of(
- "l2", "field"));
- assertEquals(Collections.EMPTY_LIST, converter.getUnmappedFields());
- }
-}
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
deleted file mode 100644
index 58d9ce8..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.kite;
-
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.commons.lang3.SystemUtils;
-import org.apache.nifi.processors.kite.AbstractKiteConvertProcessor.CodecType;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static org.apache.nifi.processors.kite.TestUtil.streamFor;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-public class TestCSVToAvroProcessor {
-
- public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
- .requiredLong("id")
- .requiredString("color")
- .optionalDouble("price")
- .endRecord();
-
- public static final String CSV_CONTENT = ""
- + "1,green\n"
- + ",blue,\n" + // invalid, ID is missing
- "2,grey,12.95";
-
- public static final String FAILURE_CONTENT = ""
- + ",blue,\n"; // invalid, ID is missing
-
- public static final String TSV_CONTENT = ""
- + "1\tgreen\n"
- + "\tblue\t\n" + // invalid, ID is missing
- "2\tgrey\t12.95";
-
- public static final String FAILURE_SUMMARY = "" +
- "Field id: cannot make \"long\" value: '': Field id type:LONG pos:0 not set and has no default value";
-
- @BeforeClass
- public static void setUpSuite() {
- Assume.assumeTrue("Test only runs on *nix", !SystemUtils.IS_OS_WINDOWS);
- }
-
- /**
- * Test for a schema that is not a JSON but does not throw exception when trying to parse as an URI
- */
- @Test
- public void testSchemeValidation() throws IOException {
- TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
- runner.setProperty(ConvertCSVToAvro.SCHEMA, "column1;column2");
- runner.assertNotValid();
- runner.setProperty(ConvertCSVToAvro.SCHEMA, "src/test/resources/Shapes_header.csv.avro");
- runner.assertNotValid();
- runner.setProperty(ConvertCSVToAvro.SCHEMA, "file:" + new File("src/test/resources/Shapes_header.csv.avro").getAbsolutePath());
- runner.assertValid();
- runner.setProperty(ConvertCSVToAvro.SCHEMA, "");
- runner.assertNotValid();
- }
-
- /**
- * Basic test for tab separated files, similar to #test
- */
- @Test
- public void testTabSeparatedConversion() throws IOException {
- TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
- runner.assertNotValid();
- runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
- runner.setProperty(ConvertCSVToAvro.DELIMITER, "\\t");
- runner.assertValid();
-
- runner.enqueue(streamFor(TSV_CONTENT));
- runner.run();
-
- long converted = runner.getCounterValue("Converted records");
- long errors = runner.getCounterValue("Conversion errors");
- Assert.assertEquals("Should convert 2 rows", 2, converted);
- Assert.assertEquals("Should reject 1 row", 1, errors);
-
- runner.assertTransferCount("success", 1);
- runner.assertTransferCount("failure", 0);
- runner.assertTransferCount("incompatible", 1);
-
- MockFlowFile incompatible = runner.getFlowFilesForRelationship("incompatible").get(0);
- String failureContent = new String(runner.getContentAsByteArray(incompatible),
- StandardCharsets.UTF_8);
-
- Assert.assertEquals("Should reject an invalid string and double",
- TSV_CONTENT, failureContent);
- Assert.assertEquals("Should accumulate error messages",
- FAILURE_SUMMARY, incompatible.getAttribute("errors"));
- }
-
- @Test
- public void testBasicConversion() throws IOException {
- TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
- runner.assertNotValid();
- runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
- runner.assertValid();
-
- runner.enqueue(streamFor(CSV_CONTENT));
- runner.run();
-
- long converted = runner.getCounterValue("Converted records");
- long errors = runner.getCounterValue("Conversion errors");
- Assert.assertEquals("Should convert 2 rows", 2, converted);
- Assert.assertEquals("Should reject 1 row", 1, errors);
-
- runner.assertTransferCount("success", 1);
- runner.assertTransferCount("failure", 0);
- runner.assertTransferCount("incompatible", 1);
-
- MockFlowFile incompatible = runner.getFlowFilesForRelationship("incompatible").get(0);
- String failureContent = new String(runner.getContentAsByteArray(incompatible),
- StandardCharsets.UTF_8);
- Assert.assertEquals("Should reject an invalid string and double",
- CSV_CONTENT, failureContent);
- Assert.assertEquals("Should accumulate error messages",
- FAILURE_SUMMARY, incompatible.getAttribute("errors"));
- }
-
- @Test
- public void testBasicConversionWithCompression() throws IOException {
- TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
- runner.assertNotValid();
- runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
- runner.setProperty(AbstractKiteConvertProcessor.COMPRESSION_TYPE, CodecType.DEFLATE.toString());
- runner.assertValid();
-
- runner.enqueue(streamFor(CSV_CONTENT));
- runner.run();
-
- long converted = runner.getCounterValue("Converted records");
- long errors = runner.getCounterValue("Conversion errors");
- Assert.assertEquals("Should convert 2 rows", 2, converted);
- Assert.assertEquals("Should reject 1 row", 1, errors);
-
- runner.assertTransferCount("success", 1);
- runner.assertTransferCount("failure", 0);
- runner.assertTransferCount("incompatible", 1);
-
- MockFlowFile incompatible = runner.getFlowFilesForRelationship("incompatible").get(0);
- String failureContent = new String(runner.getContentAsByteArray(incompatible),
- StandardCharsets.UTF_8);
- Assert.assertEquals("Should reject an invalid string and double",
- CSV_CONTENT, failureContent);
- Assert.assertEquals("Should accumulate error messages",
- FAILURE_SUMMARY, incompatible.getAttribute("errors"));
- }
-
- @Test
- public void testAlternateCharset() throws IOException {
- TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
- runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
- runner.setProperty(ConvertCSVToAvro.CHARSET, "utf16");
- runner.assertValid();
-
- runner.enqueue(streamFor(CSV_CONTENT, Charset.forName("UTF-16")));
- runner.run();
-
- long converted = runner.getCounterValue("Converted records");
- long errors = runner.getCounterValue("Conversion errors");
- Assert.assertEquals("Should convert 2 rows", 2, converted);
- Assert.assertEquals("Should reject 1 row", 1, errors);
-
- runner.assertTransferCount("success", 1);
- runner.assertTransferCount("failure", 0);
- runner.assertTransferCount("incompatible", 1);
-
- MockFlowFile incompatible = runner.getFlowFilesForRelationship("incompatible").get(0);
- Assert.assertEquals("Should accumulate error messages",
- FAILURE_SUMMARY, incompatible.getAttribute("errors"));
- }
-
- @Test
- public void testOnlyErrors() throws IOException {
- TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
- runner.assertNotValid();
- runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
- runner.assertValid();
-
- runner.enqueue(streamFor(FAILURE_CONTENT));
- runner.run();
-
- long converted = runner.getCounterValue("Converted records");
- long errors = runner.getCounterValue("Conversion errors");
- Assert.assertEquals("Should convert 0 rows", 0, converted);
- Assert.assertEquals("Should reject 1 row", 1, errors);
-
- runner.assertTransferCount("success", 0);
- runner.assertTransferCount("failure", 1);
- runner.assertTransferCount("incompatible", 0);
-
- MockFlowFile incompatible = runner.getFlowFilesForRelationship("failure").get(0);
- Assert.assertEquals("Should set an error message",
- FAILURE_SUMMARY, incompatible.getAttribute("errors"));
- }
-
- @Test
- public void testEmptyContent() throws IOException {
- TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
- runner.assertNotValid();
- runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
- runner.assertValid();
-
- runner.enqueue(streamFor(""));
- runner.run();
-
- long converted = runner.getCounterValue("Converted records");
- long errors = runner.getCounterValue("Conversion errors");
- Assert.assertEquals("Should convert 0 rows", 0, converted);
- Assert.assertEquals("Should reject 0 row", 0, errors);
-
- runner.assertTransferCount("success", 0);
- runner.assertTransferCount("failure", 1);
- runner.assertTransferCount("incompatible", 0);
-
- MockFlowFile incompatible = runner.getFlowFilesForRelationship("failure").get(0);
- Assert.assertEquals("Should set an error message",
- "No incoming records", incompatible.getAttribute("errors"));
- }
-
- @Test
- public void testBasicConversionNoErrors() throws IOException {
- TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
- runner.assertNotValid();
- runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
- runner.assertValid();
-
- runner.enqueue(streamFor("1,green\n2,blue,\n3,grey,12.95"));
- runner.run();
-
- long converted = runner.getCounterValue("Converted records");
- long errors = runner.getCounterValue("Conversion errors");
- Assert.assertEquals("Should convert 3 rows", 3, converted);
- Assert.assertEquals("Should reject 0 row", 0, errors);
-
- runner.assertTransferCount("success", 1);
- runner.assertTransferCount("failure", 0);
- runner.assertTransferCount("incompatible", 0);
- }
-
- @Test
- public void testExpressionLanguageBasedCSVProperties() throws IOException {
- TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
- runner.assertNotValid();
- runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
- runner.assertValid();
-
- runner.setProperty(ConvertCSVToAvro.DELIMITER, "${csv.delimiter}");
- runner.setProperty(ConvertCSVToAvro.QUOTE, "${csv.quote}");
-
- HashMap<String, String> flowFileAttributes = new HashMap<String,String>();
- flowFileAttributes.put("csv.delimiter", "|");
- flowFileAttributes.put("csv.quote", "~");
-
- runner.enqueue(streamFor("1|green\n2|~blue|field~|\n3|grey|12.95"), flowFileAttributes);
- runner.run();
-
- long converted = runner.getCounterValue("Converted records");
- long errors = runner.getCounterValue("Conversion errors");
- Assert.assertEquals("Should convert 3 rows", 3, converted);
- Assert.assertEquals("Should reject 0 row", 0, errors);
-
- runner.assertTransferCount("success", 1);
- runner.assertTransferCount("failure", 0);
- runner.assertTransferCount("incompatible", 0);
-
- final InputStream in = new ByteArrayInputStream(runner.getFlowFilesForRelationship("success").get(0).toByteArray());
- final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
- try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {
- assertTrue(dataFileReader.hasNext());
- GenericRecord record = dataFileReader.next();
- assertEquals(1L, record.get("id"));
- assertEquals("green", record.get("color").toString());
- assertNull(record.get("price"));
-
- assertTrue(dataFileReader.hasNext());
- record = dataFileReader.next();
- assertEquals(2L, record.get("id"));
- assertEquals("blue|field", record.get("color").toString());
- assertNull(record.get("price"));
-
- assertTrue(dataFileReader.hasNext());
- record = dataFileReader.next();
- assertEquals(3L, record.get("id"));
- assertEquals("grey", record.get("color").toString());
- assertEquals(12.95, record.get("price"));
- }
- }
-}
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConfigurationProperty.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConfigurationProperty.java
deleted file mode 100644
index a8d9d75..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConfigurationProperty.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.kite;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import org.apache.avro.generic.GenericData.Record;
-import org.apache.commons.lang3.SystemUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.After;
-import org.junit.rules.TemporaryFolder;
-import org.kitesdk.data.DatasetDescriptor;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.spi.DefaultConfiguration;
-import org.kitesdk.data.Dataset;
-
-public class TestConfigurationProperty {
-
- @Rule
- public final TemporaryFolder temp = new TemporaryFolder();
- public File confLocation;
-
- private String datasetUri = null;
- private Dataset<Record> dataset = null;
-
- @BeforeClass
- public static void setUpSuite() {
- Assume.assumeTrue("Test only runs on *nix", !SystemUtils.IS_OS_WINDOWS);
- }
-
- @Before
- public void saveConfiguration() throws IOException {
- Configuration conf = new Configuration(false);
- conf.setBoolean("nifi.config.canary", true);
-
- confLocation = temp.newFile("nifi-conf.xml");
- FileOutputStream out = new FileOutputStream(confLocation);
- conf.writeXml(out);
- out.close();
- }
-
- @Before
- public void createDataset() throws Exception {
- DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
- .schema(TestUtil.USER_SCHEMA)
- .build();
- this.datasetUri = "dataset:file:" + temp.newFolder("ns", "temp").toString();
- this.dataset = Datasets.create(datasetUri, descriptor, Record.class);
- }
-
- @After
- public void deleteDataset() throws Exception {
- Datasets.delete(datasetUri);
- }
-
- @Test
- public void testConfigurationCanary() throws IOException {
- TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
- runner.setProperty(
- AbstractKiteProcessor.CONF_XML_FILES, confLocation.toString());
-
- Assert.assertFalse("Should not contain canary value",
- DefaultConfiguration.get().getBoolean("nifi.config.canary", false));
-
- AbstractKiteProcessor processor = new StoreInKiteDataset();
- ProcessContext context = runner.getProcessContext();
- processor.setDefaultConfiguration(context);
-
- Assert.assertTrue("Should contain canary value",
- DefaultConfiguration.get().getBoolean("nifi.config.canary", false));
- }
-
- @Test
- public void testFilesMustExist() throws IOException {
- TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
- runner.setProperty(
- AbstractKiteProcessor.CONF_XML_FILES, temp.newFile().toString());
- runner.assertNotValid();
- }
-
- @Test
- public void testConfigurationExpressionLanguage() throws IOException {
- TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
- runner.setProperty(
- AbstractKiteProcessor.CONF_XML_FILES, "${filename:substring(0,0):append('pom.xml')}");
- runner.setProperty(
- StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
- runner.assertValid();
- // botch the Expression Language evaluation
- runner.setProperty(
- AbstractKiteProcessor.CONF_XML_FILES, "${filename:substring(0,0):");
- runner.assertNotValid();
- }
-}
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java
deleted file mode 100644
index 7a62ac5..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java
+++ /dev/null
@@ -1,372 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.kite;
-
-import static org.apache.nifi.processors.kite.TestUtil.streamFor;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.text.NumberFormat;
-import java.text.ParseException;
-import java.util.List;
-import java.util.Locale;
-
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.generic.GenericData.Record;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.commons.lang.LocaleUtils;
-import org.apache.nifi.processors.kite.AbstractKiteConvertProcessor.CodecType;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-public class TestConvertAvroSchema {
-
- public static final Schema INPUT_SCHEMA = SchemaBuilder.record("InputTest")
- .fields().requiredString("id").requiredString("primaryColor")
- .optionalString("secondaryColor").optionalString("price")
- .endRecord();
-
- public static final Schema OUTPUT_SCHEMA = SchemaBuilder.record("Test")
- .fields().requiredLong("id").requiredString("color")
- .optionalDouble("price").endRecord();
-
- public static final String MAPPING = "[{\"source\":\"primaryColor\", \"target\":\"color\"}]";
-
- public static final String FAILURE_SUMMARY = "Cannot convert free to double";
-
- @Test
- public void testBasicConversion() throws IOException {
- TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class);
- runner.assertNotValid();
- runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA,
- INPUT_SCHEMA.toString());
- runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA,
- OUTPUT_SCHEMA.toString());
- Locale locale = Locale.getDefault();
- runner.setProperty("primaryColor", "color");
- runner.assertValid();
-
- NumberFormat format = NumberFormat.getInstance(locale);
-
- // Two valid rows, and one invalid because "free" is not a double.
- Record goodRecord1 = dataBasic("1", "blue", null, null);
- Record goodRecord2 = dataBasic("2", "red", "yellow", format.format(5.5));
- Record badRecord = dataBasic("3", "red", "yellow", "free");
- List<Record> input = Lists.newArrayList(goodRecord1, goodRecord2,
- badRecord);
-
- runner.enqueue(streamFor(input));
- runner.run();
-
- long converted = runner.getCounterValue("Converted records");
- long errors = runner.getCounterValue("Conversion errors");
- Assert.assertEquals("Should convert 2 rows", 2, converted);
- Assert.assertEquals("Should reject 1 rows", 1, errors);
-
- runner.assertTransferCount("success", 1);
- runner.assertTransferCount("failure", 1);
-
- MockFlowFile incompatible = runner.getFlowFilesForRelationship(
- "failure").get(0);
- GenericDatumReader<Record> reader = new GenericDatumReader<Record>(
- INPUT_SCHEMA);
- DataFileStream<Record> stream = new DataFileStream<Record>(
- new ByteArrayInputStream(
- runner.getContentAsByteArray(incompatible)), reader);
- int count = 0;
- for (Record r : stream) {
- Assert.assertEquals(badRecord, r);
- count++;
- }
- stream.close();
- Assert.assertEquals(1, count);
- Assert.assertEquals("Should accumulate error messages",
- FAILURE_SUMMARY, incompatible.getAttribute("errors"));
-
- GenericDatumReader<Record> successReader = new GenericDatumReader<Record>(
- OUTPUT_SCHEMA);
- DataFileStream<Record> successStream = new DataFileStream<Record>(
- new ByteArrayInputStream(runner.getContentAsByteArray(runner
- .getFlowFilesForRelationship("success").get(0))),
- successReader);
- count = 0;
- for (Record r : successStream) {
- if (count == 0) {
- Assert.assertEquals(convertBasic(goodRecord1, locale), r);
- } else {
- Assert.assertEquals(convertBasic(goodRecord2, locale), r);
- }
- count++;
- }
- successStream.close();
- Assert.assertEquals(2, count);
- }
-
- @Test
- public void testBasicConversionWithCompression() throws IOException {
- TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class);
- runner.assertNotValid();
- runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA, INPUT_SCHEMA.toString());
- runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA, OUTPUT_SCHEMA.toString());
- runner.setProperty(AbstractKiteConvertProcessor.COMPRESSION_TYPE, CodecType.BZIP2.toString());
- Locale locale = Locale.getDefault();
- runner.setProperty("primaryColor", "color");
- runner.assertValid();
-
- NumberFormat format = NumberFormat.getInstance(locale);
-
- // Two valid rows, and one invalid because "free" is not a double.
- Record goodRecord1 = dataBasic("1", "blue", null, null);
- Record goodRecord2 = dataBasic("2", "red", "yellow", format.format(5.5));
- Record badRecord = dataBasic("3", "red", "yellow", "free");
- List<Record> input = Lists.newArrayList(goodRecord1, goodRecord2,
- badRecord);
-
- runner.enqueue(streamFor(input));
- runner.run();
-
- long converted = runner.getCounterValue("Converted records");
- long errors = runner.getCounterValue("Conversion errors");
- Assert.assertEquals("Should convert 2 rows", 2, converted);
- Assert.assertEquals("Should reject 1 rows", 1, errors);
-
- runner.assertTransferCount("success", 1);
- runner.assertTransferCount("failure", 1);
-
- MockFlowFile incompatible = runner.getFlowFilesForRelationship(
- "failure").get(0);
- GenericDatumReader<Record> reader = new GenericDatumReader<Record>(
- INPUT_SCHEMA);
- DataFileStream<Record> stream = new DataFileStream<Record>(
- new ByteArrayInputStream(
- runner.getContentAsByteArray(incompatible)), reader);
- int count = 0;
- for (Record r : stream) {
- Assert.assertEquals(badRecord, r);
- count++;
- }
- stream.close();
- Assert.assertEquals(1, count);
- Assert.assertEquals("Should accumulate error messages",
- FAILURE_SUMMARY, incompatible.getAttribute("errors"));
-
- GenericDatumReader<Record> successReader = new GenericDatumReader<Record>(
- OUTPUT_SCHEMA);
- DataFileStream<Record> successStream = new DataFileStream<Record>(
- new ByteArrayInputStream(runner.getContentAsByteArray(runner
- .getFlowFilesForRelationship("success").get(0))),
- successReader);
- count = 0;
- for (Record r : successStream) {
- if (count == 0) {
- Assert.assertEquals(convertBasic(goodRecord1, locale), r);
- } else {
- Assert.assertEquals(convertBasic(goodRecord2, locale), r);
- }
- count++;
- }
- successStream.close();
- Assert.assertEquals(2, count);
- }
-
- @Test
- public void testBasicConversionWithLocales() throws IOException {
- testBasicConversionWithLocale("en_US");
- testBasicConversionWithLocale("fr_FR");
- }
-
- public void testBasicConversionWithLocale(String localeString) throws IOException {
- TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class);
- runner.assertNotValid();
- runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA,
- INPUT_SCHEMA.toString());
- runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA,
- OUTPUT_SCHEMA.toString());
- Locale locale = LocaleUtils.toLocale(localeString);
- runner.setProperty(ConvertAvroSchema.LOCALE, localeString);
- runner.setProperty("primaryColor", "color");
- runner.assertValid();
-
- NumberFormat format = NumberFormat.getInstance(locale);
-
- // Two valid rows, and one invalid because "free" is not a double.
- Record goodRecord1 = dataBasic("1", "blue", null, null);
- Record goodRecord2 = dataBasic("2", "red", "yellow", format.format(5.5));
- Record badRecord = dataBasic("3", "red", "yellow", "free");
- List<Record> input = Lists.newArrayList(goodRecord1, goodRecord2,
- badRecord);
-
- runner.enqueue(streamFor(input));
- runner.run();
-
- long converted = runner.getCounterValue("Converted records");
- long errors = runner.getCounterValue("Conversion errors");
- Assert.assertEquals("Should convert 2 rows", 2, converted);
- Assert.assertEquals("Should reject 1 rows", 1, errors);
-
- runner.assertTransferCount("success", 1);
- runner.assertTransferCount("failure", 1);
-
- MockFlowFile incompatible = runner.getFlowFilesForRelationship(
- "failure").get(0);
- GenericDatumReader<Record> reader = new GenericDatumReader<Record>(
- INPUT_SCHEMA);
- DataFileStream<Record> stream = new DataFileStream<Record>(
- new ByteArrayInputStream(
- runner.getContentAsByteArray(incompatible)), reader);
- int count = 0;
- for (Record r : stream) {
- Assert.assertEquals(badRecord, r);
- count++;
- }
- stream.close();
- Assert.assertEquals(1, count);
- Assert.assertEquals("Should accumulate error messages",
- FAILURE_SUMMARY, incompatible.getAttribute("errors"));
-
- GenericDatumReader<Record> successReader = new GenericDatumReader<Record>(
- OUTPUT_SCHEMA);
- DataFileStream<Record> successStream = new DataFileStream<Record>(
- new ByteArrayInputStream(runner.getContentAsByteArray(runner
- .getFlowFilesForRelationship("success").get(0))),
- successReader);
- count = 0;
- for (Record r : successStream) {
- if (count == 0) {
- Assert.assertEquals(convertBasic(goodRecord1, locale), r);
- } else {
- Assert.assertEquals(convertBasic(goodRecord2, locale), r);
- }
- count++;
- }
- successStream.close();
- Assert.assertEquals(2, count);
- }
-
- @Test
- public void testNestedConversion() throws IOException {
- TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class);
- runner.assertNotValid();
- runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA,
- TestAvroRecordConverter.NESTED_RECORD_SCHEMA.toString());
- runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA,
- TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA.toString());
- runner.setProperty("parent.id", "parentId");
- runner.assertValid();
-
- // Two valid rows
- Record goodRecord1 = dataNested(1L, "200", null, null);
- Record goodRecord2 = dataNested(2L, "300", 5L, "ParentCompany");
- List<Record> input = Lists.newArrayList(goodRecord1, goodRecord2);
-
- runner.enqueue(streamFor(input));
- runner.run();
-
- long converted = runner.getCounterValue("Converted records");
- long errors = runner.getCounterValue("Conversion errors");
- Assert.assertEquals("Should convert 2 rows", 2, converted);
- Assert.assertEquals("Should reject 0 rows", 0, errors);
-
- runner.assertTransferCount("success", 1);
- runner.assertTransferCount("failure", 0);
-
- GenericDatumReader<Record> successReader = new GenericDatumReader<Record>(
- TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA);
- DataFileStream<Record> successStream = new DataFileStream<Record>(
- new ByteArrayInputStream(runner.getContentAsByteArray(runner
- .getFlowFilesForRelationship("success").get(0))),
- successReader);
- int count = 0;
- for (Record r : successStream) {
- if (count == 0) {
- Assert.assertEquals(convertNested(goodRecord1), r);
- } else {
- Assert.assertEquals(convertNested(goodRecord2), r);
- }
- count++;
- }
- successStream.close();
- Assert.assertEquals(2, count);
- }
-
- private Record convertBasic(Record inputRecord, Locale locale) {
- Record result = new Record(OUTPUT_SCHEMA);
- result.put("id", Long.parseLong(inputRecord.get("id").toString()));
- result.put("color", inputRecord.get("primaryColor").toString());
- if (inputRecord.get("price") == null) {
- result.put("price", null);
- } else {
- final NumberFormat format = NumberFormat.getInstance(locale);
- double price;
- try {
- price = format.parse(inputRecord.get("price").toString()).doubleValue();
- } catch (ParseException e) {
- // Shouldn't happen
- throw new RuntimeException(e);
- }
- result.put("price", price);
- }
- return result;
- }
-
- private Record dataBasic(String id, String primaryColor,
- String secondaryColor, String price) {
- Record result = new Record(INPUT_SCHEMA);
- result.put("id", id);
- result.put("primaryColor", primaryColor);
- result.put("secondaryColor", secondaryColor);
- result.put("price", price);
- return result;
- }
-
- private Record convertNested(Record inputRecord) {
- Record result = new Record(
- TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA);
- result.put("l1", inputRecord.get("l1"));
- result.put("s1", Long.parseLong(inputRecord.get("s1").toString()));
- if (inputRecord.get("parent") != null) {
- // output schema doesn't have parent name.
- result.put("parentId",
- ((Record) inputRecord.get("parent")).get("id"));
- }
- return result;
- }
-
- private Record dataNested(long id, String companyName, Long parentId,
- String parentName) {
- Record result = new Record(TestAvroRecordConverter.NESTED_RECORD_SCHEMA);
- result.put("l1", id);
- result.put("s1", companyName);
- if (parentId != null || parentName != null) {
- Record parent = new Record(
- TestAvroRecordConverter.NESTED_PARENT_SCHEMA);
- parent.put("id", parentId);
- parent.put("name", parentName);
- result.put("parent", parent);
- }
- return result;
- }
-}
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestGetSchema.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestGetSchema.java
deleted file mode 100644
index 9354e8f..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestGetSchema.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.kite;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.charset.Charset;
-
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.kitesdk.data.DatasetDescriptor;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.spi.DefaultConfiguration;
-
-import static org.apache.nifi.processors.kite.TestUtil.bytesFor;
-
-public class TestGetSchema {
-
- public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
- .requiredLong("id")
- .requiredString("color")
- .optionalDouble("price")
- .endRecord();
-
- @Rule
- public TemporaryFolder temp = new TemporaryFolder();
-
- @Test
- @Ignore("Does not work on windows")
- public void testSchemaFromFileSystem() throws IOException {
- File schemaFile = temp.newFile("schema.avsc");
- FileOutputStream out = new FileOutputStream(schemaFile);
- out.write(bytesFor(SCHEMA.toString(), Charset.forName("utf8")));
- out.close();
-
- Schema schema = AbstractKiteProcessor.getSchema(
- schemaFile.toString(), DefaultConfiguration.get());
-
- Assert.assertEquals("Schema from file should match", SCHEMA, schema);
- }
-
- @Test
- @Ignore("Does not work on windows")
- public void testSchemaFromKiteURIs() throws IOException {
- String location = temp.newFolder("ns", "temp").toString();
- if (location.endsWith("/")) {
- location = location.substring(0, location.length() - 1);
- }
- String datasetUri = "dataset:" + location;
- DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
- .schema(SCHEMA)
- .build();
-
- Datasets.create(datasetUri, descriptor);
-
- Schema schema = AbstractKiteProcessor.getSchema(
- datasetUri, DefaultConfiguration.get());
- Assert.assertEquals("Schema from dataset URI should match", SCHEMA, schema);
-
- schema = AbstractKiteProcessor.getSchema(
- "view:file:" + location + "?color=orange", DefaultConfiguration.get());
- Assert.assertEquals("Schema from view URI should match", SCHEMA, schema);
- }
-
- @Test
- public void testSchemaFromResourceURI() throws IOException {
- DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
- .schemaUri("resource:schema/user.avsc") // in kite-data-core test-jar
- .build();
- Schema expected = descriptor.getSchema();
-
- Schema schema = AbstractKiteProcessor.getSchema(
- "resource:schema/user.avsc", DefaultConfiguration.get());
-
- Assert.assertEquals("Schema from resource URI should match",
- expected, schema);
- }
-}
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchema.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchema.java
deleted file mode 100644
index d76214a..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchema.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.kite;
-
-import org.apache.commons.lang3.SystemUtils;
-import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.StringWriter;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.Assume;
-import org.junit.BeforeClass;
-
-public class TestInferAvroSchema {
-
- @BeforeClass
- public static void setupClass() {
- Assume.assumeTrue("Test only runs on *nix", !SystemUtils.IS_OS_WINDOWS);
- }
-
- private TestRunner runner = null;
-
- @Before
- public void setup() {
- runner = TestRunners.newTestRunner(InferAvroSchema.class);
-
- // Prepare the common setup.
- runner.assertNotValid();
-
- runner.setProperty(InferAvroSchema.INPUT_CONTENT_TYPE, InferAvroSchema.USE_MIME_TYPE);
- runner.setProperty(InferAvroSchema.GET_CSV_HEADER_DEFINITION_FROM_INPUT, "true");
- runner.setProperty(InferAvroSchema.SCHEMA_DESTINATION, InferAvroSchema.DESTINATION_CONTENT);
- runner.setProperty(InferAvroSchema.HEADER_LINE_SKIP_COUNT, "0");
- runner.setProperty(InferAvroSchema.ESCAPE_STRING, "\\");
- runner.setProperty(InferAvroSchema.QUOTE_STRING, "'");
- runner.setProperty(InferAvroSchema.RECORD_NAME, "org.apache.nifi.contact");
- runner.setProperty(InferAvroSchema.CHARSET, "UTF-8");
- runner.setProperty(InferAvroSchema.PRETTY_AVRO_OUTPUT, "true");
- }
-
- @Test
- public void testRecordName() throws Exception {
-
- // Dot at the end is invalid
- runner.setProperty(InferAvroSchema.RECORD_NAME, "org.apache.nifi.contact.");
- runner.assertNotValid();
- // Dashes are invalid
- runner.setProperty(InferAvroSchema.RECORD_NAME, "avro-schema");
- runner.assertNotValid();
- // Name cannot start with a digit
- runner.setProperty(InferAvroSchema.RECORD_NAME, "1Record");
- runner.assertNotValid();
- // Name cannot start with a dot
- runner.setProperty(InferAvroSchema.RECORD_NAME, ".record");
- runner.assertNotValid();
-
- runner.setProperty(InferAvroSchema.RECORD_NAME, "avro_schema");
- runner.assertValid();
- runner.setProperty(InferAvroSchema.RECORD_NAME, "org.apache.nifi.contact");
- runner.assertValid();
- runner.setProperty(InferAvroSchema.RECORD_NAME, "${filename}"); // EL is valid, although its value may not be when evaluated
- runner.assertValid();
- }
-
- @Test
- public void inferAvroSchemaFromHeaderDefinitionOfCSVFile() throws Exception {
-
- runner.assertValid();
-
- Map<String, String> attributes = new HashMap<>();
- attributes.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
- runner.enqueue(new File("src/test/resources/Shapes_Header.csv").toPath(), attributes);
-
- runner.run();
- runner.assertTransferCount(InferAvroSchema.REL_UNSUPPORTED_CONTENT, 0);
- runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 0);
- runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 1);
- runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 1);
-
- MockFlowFile flowFile = runner.getFlowFilesForRelationship(InferAvroSchema.REL_SUCCESS).get(0);
- flowFile.assertContentEquals(unix2PlatformSpecificLineEndings(new File("src/test/resources/Shapes_header.csv.avro")));
- flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
- }
-
- @Test
- public void inferAvroSchemaFromJSONFile() throws Exception {
-
- runner.assertValid();
-
- runner.setProperty(InferAvroSchema.INPUT_CONTENT_TYPE, InferAvroSchema.USE_MIME_TYPE);
-
- // Purposely set to True to test that none of the JSON file is read which would cause issues.
- runner.setProperty(InferAvroSchema.GET_CSV_HEADER_DEFINITION_FROM_INPUT, "true");
- runner.setProperty(InferAvroSchema.SCHEMA_DESTINATION, InferAvroSchema.DESTINATION_ATTRIBUTE);
-
- Map<String, String> attributes = new HashMap<>();
- attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
- runner.enqueue(new File("src/test/resources/Shapes.json").toPath(), attributes);
-
- runner.run();
- runner.assertTransferCount(InferAvroSchema.REL_UNSUPPORTED_CONTENT, 0);
- runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 0);
- runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 1);
- runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 1);
-
- MockFlowFile data = runner.getFlowFilesForRelationship(InferAvroSchema.REL_SUCCESS).get(0);
- String avroSchema = data.getAttribute(InferAvroSchema.AVRO_SCHEMA_ATTRIBUTE_NAME);
- String knownSchema = new String(unix2PlatformSpecificLineEndings(new File("src/test/resources/Shapes.json.avro")), StandardCharsets.UTF_8);
- Assert.assertEquals(avroSchema, knownSchema);
-
- // Since that avro schema is written to an attribute this should be teh same as the original
- data.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json");
- }
-
- @Test
- public void inferAvroSchemaFromCSVFile() throws Exception {
-
- runner.assertValid();
-
- // Read in the header
- StringWriter writer = new StringWriter();
- IOUtils.copy((Files.newInputStream(Paths.get("src/test/resources/ShapesHeader.csv"), StandardOpenOption.READ)), writer, "UTF-8");
- runner.setProperty(InferAvroSchema.CSV_HEADER_DEFINITION, writer.toString());
- runner.setProperty(InferAvroSchema.GET_CSV_HEADER_DEFINITION_FROM_INPUT, "false");
-
- Map<String, String> attributes = new HashMap<>();
- attributes.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
- runner.enqueue(new File("src/test/resources/Shapes_NoHeader.csv").toPath(), attributes);
-
- runner.run();
- runner.assertTransferCount(InferAvroSchema.REL_UNSUPPORTED_CONTENT, 0);
- runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 0);
- runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 1);
- runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 1);
-
- MockFlowFile data = runner.getFlowFilesForRelationship(InferAvroSchema.REL_SUCCESS).get(0);
- data.assertContentEquals(unix2PlatformSpecificLineEndings(new File("src/test/resources/Shapes_header.csv.avro")));
- data.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
- }
-
- @Test
- public void inferSchemaFormHeaderLinePropertyOfProcessor() throws Exception {
-
- final String CSV_HEADER_LINE = FileUtils.readFileToString(new File("src/test/resources/ShapesHeader.csv"));
-
- runner.assertValid();
-
- runner.setProperty(InferAvroSchema.GET_CSV_HEADER_DEFINITION_FROM_INPUT, "false");
- runner.setProperty(InferAvroSchema.CSV_HEADER_DEFINITION, CSV_HEADER_LINE);
- runner.setProperty(InferAvroSchema.HEADER_LINE_SKIP_COUNT, "1");
-
- runner.assertValid();
-
- Map<String, String> attributes = new HashMap<>();
- attributes.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
- runner.enqueue((CSV_HEADER_LINE + "\nJane,Doe,29,55555").getBytes(), attributes);
-
- runner.run();
- runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 0);
- runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 1);
- runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 1);
-
- MockFlowFile data = runner.getFlowFilesForRelationship(InferAvroSchema.REL_SUCCESS).get(0);
- data.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
- }
-
- @Test
- public void inferSchemaFromEmptyContent() throws Exception {
- runner.assertValid();
-
- Map<String, String> attributes = new HashMap<>();
- attributes.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
- runner.enqueue("", attributes);
-
- runner.run();
- runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 1);
- runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 0);
- runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 0);
- }
-
- @Test
- public void inferAvroSchemaFromHeaderDefinitionOfCSVTabDelimitedFile() throws Exception {
-
- runner.setProperty(InferAvroSchema.DELIMITER, "\\t");
- runner.assertValid();
-
- Map<String, String> attributes = new HashMap<>();
- attributes.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
- runner.enqueue(new File("src/test/resources/Shapes_Header_TabDelimited.csv").toPath(), attributes);
-
- runner.run();
- runner.assertTransferCount(InferAvroSchema.REL_UNSUPPORTED_CONTENT, 0);
- runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 0);
- runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 1);
- runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 1);
-
- MockFlowFile flowFile = runner.getFlowFilesForRelationship(InferAvroSchema.REL_SUCCESS).get(0);
- flowFile.assertContentEquals(unix2PlatformSpecificLineEndings(new File("src/test/resources/Shapes_header.csv.avro")));
- flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
- }
-
- @Test
- public void inferAvroSchemaFromHeaderDefinitionOfCSVTabDelimitedFileNegativeTest() throws Exception {
-
- // Inproper InferAvroSchema.DELIMITER > original goes to InferAvroSchema.REL_FAILURE
- runner.setProperty(InferAvroSchema.DELIMITER, ";");
- runner.assertValid();
-
- Map<String, String> attributes = new HashMap<>();
- attributes.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
- runner.enqueue(new File("src/test/resources/Shapes_Header_TabDelimited.csv").toPath(), attributes);
-
- runner.run();
- runner.assertTransferCount(InferAvroSchema.REL_UNSUPPORTED_CONTENT, 0);
- runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 1);
- runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 0);
- runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 0);
-
- MockFlowFile flowFile = runner.getFlowFilesForRelationship(InferAvroSchema.REL_FAILURE).get(0);
- flowFile.assertContentEquals(new File("src/test/resources/Shapes_Header_TabDelimited.csv").toPath());
- flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/csv");
- }
-
-
- @Test
- public void specifyCSVparametersInExpressionLanguage() throws Exception {
- runner.setProperty(InferAvroSchema.DELIMITER, "${csv.delimiter}");
- runner.setProperty(InferAvroSchema.ESCAPE_STRING, "${csv.escape}");
- runner.setProperty(InferAvroSchema.QUOTE_STRING, "${csv.quote}");
- runner.setProperty(InferAvroSchema.CHARSET, "${csv.charset}");
- runner.setProperty(InferAvroSchema.GET_CSV_HEADER_DEFINITION_FROM_INPUT, "true");
-
- runner.assertValid();
-
- @SuppressWarnings("serial")
- Map<String, String> attributes = new HashMap<String, String>() {
- {
- put("csv.delimiter",",");
- put("csv.escape", "\\");
- put("csv.quote", "\"");
- put("csv.charset", "UTF-8");
- put(CoreAttributes.MIME_TYPE.key(), "text/csv");
- }
- };
-
- runner.enqueue(new File("src/test/resources/Shapes_Header.csv").toPath(), attributes);
-
- runner.run();
- runner.assertTransferCount(InferAvroSchema.REL_UNSUPPORTED_CONTENT, 0);
- runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 0);
- runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 1);
- runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 1);
-
- MockFlowFile flowFile = runner.getFlowFilesForRelationship(InferAvroSchema.REL_SUCCESS).get(0);
- flowFile.assertContentEquals(unix2PlatformSpecificLineEndings(new File("src/test/resources/Shapes_header.csv.avro")));
- flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
-
- }
-
- @Test
- public void specifyJsonParametersInExpressionLanguage() throws Exception {
- runner.assertValid();
- runner.setProperty(InferAvroSchema.INPUT_CONTENT_TYPE, InferAvroSchema.USE_MIME_TYPE);
-
- // Purposely set to True to test that none of the JSON file is read which would cause issues.
- runner.setProperty(InferAvroSchema.GET_CSV_HEADER_DEFINITION_FROM_INPUT, "true");
- runner.setProperty(InferAvroSchema.SCHEMA_DESTINATION, InferAvroSchema.DESTINATION_ATTRIBUTE);
- runner.setProperty(InferAvroSchema.RECORD_NAME, "${record.name}");
- runner.setProperty(InferAvroSchema.NUM_RECORDS_TO_ANALYZE, "${records.analyze}");
-
- Map<String, String> attributes = new HashMap<>();
- attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
- attributes.put("record.name", "myrecord");
- attributes.put("records.analyze", "2");
- runner.enqueue(new File("src/test/resources/Shapes.json").toPath(), attributes);
-
- runner.run();
- runner.assertTransferCount(InferAvroSchema.REL_UNSUPPORTED_CONTENT, 0);
- runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 0);
- runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 1);
- runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 1);
-
- MockFlowFile data = runner.getFlowFilesForRelationship(InferAvroSchema.REL_SUCCESS).get(0);
- String avroSchema = data.getAttribute(InferAvroSchema.AVRO_SCHEMA_ATTRIBUTE_NAME);
- Assert.assertTrue(avroSchema.contains("\"name\" : \"myrecord\""));
-
- // Since that avro schema is written to an attribute this should be teh same as the original
- data.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json");
- }
-
-
- static byte[] unix2PlatformSpecificLineEndings(final File file) throws IOException {
- try (final BufferedInputStream in = new BufferedInputStream(new FileInputStream(file)); final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
- byte eol[] = System.lineSeparator().getBytes(StandardCharsets.UTF_8);
- int justRead;
- while ((justRead = in.read()) != -1) {
- if (justRead == '\n') {
- out.write(eol);
- } else {
- out.write(justRead);
- }
- }
- return out.toByteArray();
- }
- }
-
-}
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
deleted file mode 100644
index 776e2f3..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.kite;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
-import org.apache.nifi.processors.kite.AbstractKiteConvertProcessor.CodecType;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.apache.nifi.processors.kite.TestUtil.streamFor;
-
-public class TestJSONToAvroProcessor {
-
- public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
- .requiredLong("id")
- .requiredString("color")
- .optionalDouble("price")
- .endRecord();
-
- public static final String JSON_CONTENT = ""
- + "{\"id\": 1,\"color\": \"green\"}"
- + "{\"id\": \"120V\", \"color\": \"blue\"}\n" // invalid, ID is a string
- + "{\"id\": 10, \"color\": 15.23}\n" // invalid, color as double
- + "{\"id\": 20, \"color\": 34}\n" // invalid, color as int
- + "{\"id\": 2, \"color\": \"grey\", \"price\": 12.95 }";
-
- public static final String FAILURE_CONTENT = ""
- + "{\"id\": \"120V\", \"color\": \"blue\"}\n"
- + "{\"id\": 10, \"color\": 15.23}\n"
- + "{\"id\": 20, \"color\": 34}\n";
-
- public static final String FAILURE_SUMMARY = ""
- + "Cannot convert field id: Cannot convert to long: \"120V\", "
- + "Cannot convert field color: Cannot convert to string: 15.23 (1 similar failure)";
-
- @Test
- public void testBasicConversion() throws IOException {
- TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class);
- runner.assertNotValid();
- runner.setProperty(ConvertJSONToAvro.SCHEMA, SCHEMA.toString());
- runner.assertValid();
-
- runner.enqueue(streamFor(JSON_CONTENT));
- runner.run();
-
- long converted = runner.getCounterValue("Converted records");
- long errors = runner.getCounterValue("Conversion errors");
- Assert.assertEquals("Should convert 2 rows", 2, converted);
- Assert.assertEquals("Should reject 3 rows", 3, errors);
-
- runner.assertTransferCount("success", 1);
- runner.assertTransferCount("failure", 0);
- runner.assertTransferCount("incompatible", 1);
-
- MockFlowFile incompatible = runner.getFlowFilesForRelationship("incompatible").get(0);
- String failureContent = new String(runner.getContentAsByteArray(incompatible),
- StandardCharsets.UTF_8);
- Assert.assertEquals("Should reject an invalid string and double",
- JSON_CONTENT, failureContent);
- Assert.assertEquals("Should accumulate error messages",
- FAILURE_SUMMARY, incompatible.getAttribute("errors"));
- }
-
- @Test
- public void testBasicConversionWithCompression() throws IOException {
- TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class);
- runner.assertNotValid();
- runner.setProperty(ConvertJSONToAvro.SCHEMA, SCHEMA.toString());
- runner.setProperty(AbstractKiteConvertProcessor.COMPRESSION_TYPE, CodecType.NONE.toString());
- runner.assertValid();
-
- runner.enqueue(streamFor(JSON_CONTENT));
- runner.run();
-
- long converted = runner.getCounterValue("Converted records");
- long errors = runner.getCounterValue("Conversion errors");
- Assert.assertEquals("Should convert 2 rows", 2, converted);
- Assert.assertEquals("Should reject 3 rows", 3, errors);
-
- runner.assertTransferCount("success", 1);
- runner.assertTransferCount("failure", 0);
- runner.assertTransferCount("incompatible", 1);
-
- MockFlowFile incompatible = runner.getFlowFilesForRelationship("incompatible").get(0);
- String failureContent = new String(runner.getContentAsByteArray(incompatible),
- StandardCharsets.UTF_8);
- Assert.assertEquals("Should reject an invalid string and double",
- JSON_CONTENT, failureContent);
- Assert.assertEquals("Should accumulate error messages",
- FAILURE_SUMMARY, incompatible.getAttribute("errors"));
- }
-
- @Test
- public void testOnlyErrors() throws IOException {
- TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class);
- runner.assertNotValid();
- runner.setProperty(ConvertJSONToAvro.SCHEMA, SCHEMA.toString());
- runner.assertValid();
-
- runner.enqueue(streamFor(FAILURE_CONTENT));
- runner.run();
-
- long converted = runner.getCounterValue("Converted records");
- long errors = runner.getCounterValue("Conversion errors");
- Assert.assertEquals("Should convert 0 rows", 0, converted);
- Assert.assertEquals("Should reject 1 row", 3, errors);
-
- runner.assertTransferCount("success", 0);
- runner.assertTransferCount("failure", 1);
- runner.assertTransferCount("incompatible", 0);
-
- MockFlowFile incompatible = runner.getFlowFilesForRelationship("failure").get(0);
- Assert.assertEquals("Should set an error message",
- FAILURE_SUMMARY, incompatible.getAttribute("errors"));
- }
-
- @Test
- public void testEmptyContent() throws IOException {
- TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class);
- runner.assertNotValid();
- runner.setProperty(ConvertJSONToAvro.SCHEMA, SCHEMA.toString());
- runner.assertValid();
-
- runner.enqueue(streamFor(""));
- runner.run();
-
- long converted = runner.getCounterValue("Converted records");
- long errors = runner.getCounterValue("Conversion errors");
- Assert.assertEquals("Should convert 0 rows", 0, converted);
- Assert.assertEquals("Should reject 0 row", 0, errors);
-
- runner.assertTransferCount("success", 0);
- runner.assertTransferCount("failure", 1);
- runner.assertTransferCount("incompatible", 0);
-
- MockFlowFile incompatible = runner.getFlowFilesForRelationship("failure").get(0);
- Assert.assertEquals("Should set an error message",
- "No incoming records", incompatible.getAttribute("errors"));
- }
-
- @Test
- public void testBasicConversionNoErrors() throws IOException {
- TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class);
- runner.assertNotValid();
- runner.setProperty(ConvertJSONToAvro.SCHEMA, SCHEMA.toString());
- runner.assertValid();
-
- runner.enqueue(streamFor(
- "{\"id\": 1,\"color\": \"green\"}\n" +
- "{\"id\": 2, \"color\": \"grey\", \"price\": 12.95 }"));
- runner.run();
-
- long converted = runner.getCounterValue("Converted records");
- long errors = runner.getCounterValue("Conversion errors");
- Assert.assertEquals("Should convert 2 rows", 2, converted);
- Assert.assertEquals("Should reject 0 row", 0, errors);
-
- runner.assertTransferCount("success", 1);
- runner.assertTransferCount("failure", 0);
- runner.assertTransferCount("incompatible", 0);
- }
-}
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteProcessorsCluster.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteProcessorsCluster.java
deleted file mode 100644
index 087e1cb..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteProcessorsCluster.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.kite;
-
-import com.google.common.collect.Lists;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.Charset;
-import java.util.List;
-
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
-import org.apache.avro.generic.GenericData.Record;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetDescriptor;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.spi.DefaultConfiguration;
-import org.kitesdk.minicluster.HdfsService;
-import org.kitesdk.minicluster.HiveService;
-import org.kitesdk.minicluster.MiniCluster;
-
-import static org.apache.nifi.processors.kite.TestUtil.USER_SCHEMA;
-import static org.apache.nifi.processors.kite.TestUtil.bytesFor;
-import static org.apache.nifi.processors.kite.TestUtil.streamFor;
-import static org.apache.nifi.processors.kite.TestUtil.user;
-
-@Ignore("Does not work on windows")
-public class TestKiteProcessorsCluster {
-
- public static MiniCluster cluster = null;
- public static DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
- .schema(USER_SCHEMA)
- .build();
-
- @BeforeClass
- public static void startCluster() throws IOException, InterruptedException {
- long rand = Math.abs((long) (Math.random() * 1000000));
- cluster = new MiniCluster.Builder()
- .workDir("/tmp/minicluster-" + rand)
- .clean(true)
- .addService(HdfsService.class)
- .addService(HiveService.class)
- .bindIP("127.0.0.1")
- .hiveMetastorePort(9083)
- .build();
- cluster.start();
- }
-
- @AfterClass
- public static void stopCluster() throws IOException, InterruptedException {
- if (cluster != null) {
- cluster.stop();
- cluster = null;
- }
- }
-
- @Test
- public void testBasicStoreToHive() throws IOException {
- String datasetUri = "dataset:hive:ns/test";
-
- Dataset<Record> dataset = Datasets.create(datasetUri, descriptor, Record.class);
-
- TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
- runner.assertNotValid();
-
- runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
- runner.assertValid();
-
- List<Record> users = Lists.newArrayList(
- user("a", "a@example.com"),
- user("b", "b@example.com"),
- user("c", "c@example.com")
- );
-
- runner.enqueue(streamFor(users));
- runner.run();
-
- runner.assertAllFlowFilesTransferred("success", 1);
- List<Record> stored = Lists.newArrayList(
- (Iterable<Record>) dataset.newReader());
- Assert.assertEquals("Records should match", users, stored);
-
- Datasets.delete(datasetUri);
- }
-
- @Test
- public void testSchemaFromDistributedFileSystem() throws IOException {
- Schema expected = SchemaBuilder.record("Test").fields()
- .requiredLong("id")
- .requiredString("color")
- .optionalDouble("price")
- .endRecord();
-
- Path schemaPath = new Path("hdfs:/tmp/schema.avsc");
- FileSystem fs = schemaPath.getFileSystem(DefaultConfiguration.get());
- OutputStream out = fs.create(schemaPath);
- out.write(bytesFor(expected.toString(), Charset.forName("utf8")));
- out.close();
-
- Schema schema = AbstractKiteProcessor.getSchema(
- schemaPath.toString(), DefaultConfiguration.get());
-
- Assert.assertEquals("Schema from file should match", expected, schema);
- }
-}
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteStorageProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteStorageProcessor.java
deleted file mode 100644
index 3fcae4f..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteStorageProcessor.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.kite;
-
-import com.google.common.collect.Lists;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
-import org.apache.avro.generic.GenericData.Record;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetDescriptor;
-import org.kitesdk.data.Datasets;
-
-import static org.apache.nifi.processors.kite.TestUtil.invalidStreamFor;
-import static org.apache.nifi.processors.kite.TestUtil.streamFor;
-import static org.apache.nifi.processors.kite.TestUtil.user;
-
-@Ignore("Does not work on windows")
-public class TestKiteStorageProcessor {
-
- @Rule
- public TemporaryFolder temp = new TemporaryFolder();
-
- private String datasetUri = null;
- private Dataset<Record> dataset = null;
-
- @Before
- public void createDataset() throws Exception {
- DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
- .schema(TestUtil.USER_SCHEMA)
- .build();
- this.datasetUri = "dataset:file:" + temp.newFolder("ns", "temp").toString();
- this.dataset = Datasets.create(datasetUri, descriptor, Record.class);
- }
-
- @After
- public void deleteDataset() throws Exception {
- Datasets.delete(datasetUri);
- }
-
- @Test
- public void testBasicStore() throws IOException {
- TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
- runner.assertNotValid();
-
- runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
- runner.assertValid();
-
- List<Record> users = Lists.newArrayList(
- user("a", "a@example.com"),
- user("b", "b@example.com"),
- user("c", "c@example.com")
- );
-
- runner.enqueue(streamFor(users));
- runner.run();
-
- runner.assertAllFlowFilesTransferred("success", 1);
- runner.assertQueueEmpty();
- Assert.assertEquals("Should store 3 values",
- 3, (long) runner.getCounterValue("Stored records"));
-
- List<Record> stored = Lists.newArrayList(
- (Iterable<Record>) dataset.newReader());
- Assert.assertEquals("Records should match", users, stored);
- }
-
- @Test
- public void testViewURI() {
- TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
- runner.setProperty(
- StoreInKiteDataset.KITE_DATASET_URI, "view:hive:ns/table?year=2015");
- runner.assertValid();
- }
-
- @Test
- public void testInvalidURI() {
- TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
- runner.setProperty(
- StoreInKiteDataset.KITE_DATASET_URI, "dataset:unknown");
- runner.assertNotValid();
- }
-
- @Test
- public void testUnreadableContent() throws IOException {
- TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
- runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
- runner.assertValid();
-
- runner.enqueue(invalidStreamFor(user("a", "a@example.com")));
- runner.run();
-
- runner.assertAllFlowFilesTransferred("failure", 1);
- }
-
- @Test
- public void testCorruptedBlocks() throws IOException {
- TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
- runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
- runner.assertValid();
-
- List<Record> records = Lists.newArrayList();
- for (int i = 0; i < 10000; i += 1) {
- String num = String.valueOf(i);
- records.add(user(num, num + "@example.com"));
- }
-
- runner.enqueue(invalidStreamFor(records));
- runner.run();
-
- long stored = runner.getCounterValue("Stored records");
- Assert.assertTrue("Should store some readable values",
- 0 < stored && stored < 10000);
-
- runner.assertAllFlowFilesTransferred("success", 1);
- }
-
- @Test
- public void testIncompatibleSchema() throws IOException {
- Schema incompatible = SchemaBuilder.record("User").fields()
- .requiredLong("id")
- .requiredString("username")
- .optionalString("email") // the dataset requires this field
- .endRecord();
-
- // this user has the email field and could be stored, but the schema is
- // still incompatible so the entire stream is rejected
- Record incompatibleUser = new Record(incompatible);
- incompatibleUser.put("id", 1L);
- incompatibleUser.put("username", "a");
- incompatibleUser.put("email", "a@example.com");
-
- TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
- runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
- runner.assertValid();
-
- runner.enqueue(streamFor(incompatibleUser));
- runner.run();
-
- runner.assertAllFlowFilesTransferred("incompatible", 1);
- }
-}
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestUtil.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestUtil.java
deleted file mode 100644
index 37ddbec..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestUtil.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.kite;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
-import java.nio.charset.CharsetEncoder;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
-import org.apache.avro.file.CodecFactory;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData.Record;
-
-public class TestUtil {
-
- public static final Schema USER_SCHEMA = SchemaBuilder.record("User").fields()
- .requiredString("username")
- .requiredString("email")
- .endRecord();
-
- public static Record user(String username, String email) {
- Record user = new Record(USER_SCHEMA);
- user.put("username", username);
- user.put("email", email);
- return user;
- }
-
- public static InputStream streamFor(Record... records) throws IOException {
- return streamFor(Arrays.asList(records));
- }
-
- public static InputStream streamFor(List<Record> records) throws IOException {
- return new ByteArrayInputStream(bytesFor(records));
- }
-
- public static InputStream invalidStreamFor(Record... records) throws IOException {
- return invalidStreamFor(Arrays.asList(records));
- }
-
- public static InputStream invalidStreamFor(List<Record> records) throws IOException {
- // purposely truncate the content
- byte[] bytes = bytesFor(records);
- return new ByteArrayInputStream(bytes, 0, bytes.length / 2);
- }
-
- private static byte[] bytesFor(List<Record> records) throws IOException {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- DataFileWriter<Record> writer = new DataFileWriter<>(
- AvroUtil.newDatumWriter(records.get(0).getSchema(), Record.class));
- writer.setCodec(CodecFactory.snappyCodec());
- writer = writer.create(records.get(0).getSchema(), out);
-
- for (Record record : records) {
- writer.append(record);
- }
-
- writer.flush();
-
- return out.toByteArray();
- }
-
- public static InputStream streamFor(String content) throws CharacterCodingException {
- return streamFor(content, Charset.forName("utf8"));
- }
-
- public static InputStream streamFor(String content, Charset charset) throws CharacterCodingException {
- return new ByteArrayInputStream(bytesFor(content, charset));
- }
-
- public static byte[] bytesFor(String content, Charset charset) throws CharacterCodingException {
- CharBuffer chars = CharBuffer.wrap(content);
- CharsetEncoder encoder = charset.newEncoder();
- ByteBuffer buffer = encoder.encode(chars);
- byte[] bytes = new byte[buffer.remaining()];
- buffer.get(bytes);
- return bytes;
- }
-
-}
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes.json b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes.json
deleted file mode 100644
index cf56f5b..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes.json
+++ /dev/null
@@ -1,10 +0,0 @@
-{
- "shapes":
- [
- {"shape": "circle", "color": "red", "width": 100, "height": 100},
- {"shape": "square", "color": "red", "width": 100, "height": 100},
- {"shape": "sphere", "color": "red", "width": 100, "height": 100},
- {"shape": "triangle", "color": "red", "width": 100, "height": 100},
- {"shape": "rectangle", "color": "red", "width": 100, "height": 100}
- ]
-}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes.json.avro b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes.json.avro
deleted file mode 100644
index cdbf048..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes.json.avro
+++ /dev/null
@@ -1,34 +0,0 @@
-{
- "type" : "record",
- "name" : "contact",
- "namespace" : "org.apache.nifi",
- "fields" : [ {
- "name" : "shapes",
- "type" : {
- "type" : "array",
- "items" : {
- "type" : "record",
- "name" : "shapes",
- "namespace" : "",
- "fields" : [ {
- "name" : "shape",
- "type" : "string",
- "doc" : "Type inferred from '\"circle\"'"
- }, {
- "name" : "color",
- "type" : "string",
- "doc" : "Type inferred from '\"red\"'"
- }, {
- "name" : "width",
- "type" : "int",
- "doc" : "Type inferred from '100'"
- }, {
- "name" : "height",
- "type" : "int",
- "doc" : "Type inferred from '100'"
- } ]
- }
- },
- "doc" : "Type inferred from '[{\"shape\":\"circle\",\"color\":\"red\",\"width\":100,\"height\":100},{\"shape\":\"square\",\"color\":\"red\",\"width\":100,\"height\":100},{\"shape\":\"sphere\",\"color\":\"red\",\"width\":100,\"height\":100},{\"shape\":\"triangle\",\"color\":\"red\",\"width\":100,\"height\":100},{\"shape\":\"rectangle\",\"color\":\"red\",\"width\":100,\"height\":100}]'"
- } ]
-}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/ShapesHeader.csv b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/ShapesHeader.csv
deleted file mode 100644
index d2dbc6d..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/ShapesHeader.csv
+++ /dev/null
@@ -1 +0,0 @@
-shape,color,width,height
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes_Header.csv b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes_Header.csv
deleted file mode 100644
index 5d4aeb0..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes_Header.csv
+++ /dev/null
@@ -1,352 +0,0 @@
-shape,color,width,height
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes_Header_TabDelimited.csv b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes_Header_TabDelimited.csv
deleted file mode 100644
index 7ffa4f9..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes_Header_TabDelimited.csv
+++ /dev/null
@@ -1,352 +0,0 @@
-shape color width height
-circle red 100 100
-square red 100 100
-sphere red 100 100
-triangle red 100 100
-rectangle red 100 100
-circle red 100 100
-sphere red 100 100
-circle red 100 100
-circle red 100 100
-triangle red 100 100
-cone red 100 100
-circle red 100 100
-rectangle red 100 100
-circle red 100 100
-square red 100 100
-sphere red 100 100
-triangle red 100 100
-rectangle red 100 100
-circle red 100 100
-sphere red 100 100
-circle red 100 100
-circle red 100 100
-triangle red 100 100
-cone red 100 100
-circle red 100 100
-rectangle red 100 100
-circle red 100 100
-square red 100 100
-sphere red 100 100
-triangle red 100 100
-rectangle red 100 100
-circle red 100 100
-sphere red 100 100
-circle red 100 100
-circle red 100 100
-triangle red 100 100
-cone red 100 100
-circle red 100 100
-rectangle red 100 100
-circle red 100 100
-square red 100 100
-sphere red 100 100
-triangle red 100 100
-rectangle red 100 100
-circle red 100 100
-sphere red 100 100
-circle red 100 100
-circle red 100 100
-triangle red 100 100
-cone red 100 100
-circle red 100 100
-rectangle red 100 100
-circle red 100 100
-square red 100 100
-sphere red 100 100
-triangle red 100 100
-rectangle red 100 100
-circle red 100 100
-sphere red 100 100
-circle red 100 100
-circle red 100 100
-triangle red 100 100
-cone red 100 100
-circle red 100 100
-rectangle red 100 100
-circle red 100 100
-square red 100 100
-sphere red 100 100
-triangle red 100 100
-rectangle red 100 100
-circle red 100 100
-sphere red 100 100
-circle red 100 100
-circle red 100 100
-triangle red 100 100
-cone red 100 100
-circle red 100 100
-rectangle red 100 100
-circle red 100 100
-square red 100 100
-sphere red 100 100
-triangle red 100 100
-rectangle red 100 100
-circle red 100 100
-sphere red 100 100
-circle red 100 100
-circle red 100 100
-triangle red 100 100
-cone red 100 100
-circle red 100 100
-rectangle red 100 100
-circle red 100 100
-square red 100 100
-sphere red 100 100
-triangle red 100 100
-rectangle red 100 100
-circle red 100 100
-sphere red 100 100
-circle red 100 100
-circle red 100 100
-triangle red 100 100
-cone red 100 100
-circle red 100 100
-rectangle red 100 100
-circle red 100 100
-square red 100 100
-sphere red 100 100
-triangle red 100 100
-rectangle red 100 100
-circle red 100 100
-sphere red 100 100
-circle red 100 100
-circle red 100 100
-triangle red 100 100
-cone red 100 100
-circle red 100 100
-rectangle red 100 100
-circle red 100 100
-square red 100 100
-sphere red 100 100
-triangle red 100 100
-rectangle red 100 100
-circle red 100 100
-sphere red 100 100
-circle red 100 100
-circle red 100 100
-triangle red 100 100
-cone red 100 100
-circle red 100 100
-rectangle red 100 100
-circle red 100 100
-square red 100 100
-sphere red 100 100
-triangle red 100 100
-rectangle red 100 100
-circle red 100 100
-sphere red 100 100
-circle red 100 100
-circle red 100 100
-triangle red 100 100
-cone red 100 100
-circle red 100 100
-rectangle red 100 100
-circle red 100 100
-square red 100 100
-sphere red 100 100
-triangle red 100 100
-rectangle red 100 100
-circle red 100 100
-sphere red 100 100
-circle red 100 100
-circle red 100 100
-triangle red 100 100
-cone red 100 100
-circle red 100 100
-rectangle red 100 100
-circle red 100 100
-square red 100 100
-sphere red 100 100
-triangle red 100 100
-rectangle red 100 100
-circle red 100 100
-sphere red 100 100
-circle red 100 100
-circle red 100 100
-triangle red 100 100
-cone red 100 100
-circle red 100 100
-rectangle red 100 100
-circle red 100 100
-square red 100 100
-sphere red 100 100
-triangle red 100 100
-rectangle red 100 100
-circle red 100 100
-sphere red 100 100
-circle red 100 100
-circle red 100 100
-triangle red 100 100
-cone red 100 100
-circle red 100 100
-rectangle red 100 100
-circle red 100 100
-square red 100 100
-sphere red 100 100
-triangle red 100 100
-rectangle red 100 100
-circle red 100 100
-sphere red 100 100
-circle red 100 100
-circle red 100 100
-triangle red 100 100
-cone red 100 100
-circle red 100 100
-rectangle red 100 100
-circle red 100 100
-square red 100 100
-sphere red 100 100
-triangle red 100 100
-rectangle red 100 100
-circle red 100 100
-sphere red 100 100
-circle red 100 100
-circle red 100 100
-triangle red 100 100
-cone red 100 100
-circle red 100 100
-rectangle red 100 100
-circle red 100 100
-square red 100 100
-sphere red 100 100
-triangle red 100 100
-rectangle red 100 100
-circle red 100 100
-sphere red 100 100
-circle red 100 100
-circle red 100 100
-triangle red 100 100
-cone red 100 100
-circle red 100 100
-rectangle red 100 100
-circle red 100 100
-square red 100 100
-sphere red 100 100
-triangle red 100 100
-rectangle red 100 100
-circle red 100 100
-sphere red 100 100
-circle red 100 100
-circle red 100 100
-triangle red 100 100
-cone red 100 100
-circle red 100 100
-rectangle red 100 100
-circle red 100 100
-square red 100 100
-sphere red 100 100
-triangle red 100 100
-rectangle red 100 100
-circle red 100 100
-sphere red 100 100
-circle red 100 100
-circle red 100 100
-triangle red 100 100
-cone red 100 100
-circle red 100 100
-rectangle red 100 100
-circle red 100 100
-square red 100 100
-sphere red 100 100
-triangle red 100 100
-rectangle red 100 100
-circle red 100 100
-sphere red 100 100
-circle red 100 100
-circle red 100 100
-triangle red 100 100
-cone red 100 100
-circle red 100 100
-rectangle red 100 100
-circle red 100 100
-square red 100 100
-sphere red 100 100
-triangle red 100 100
-rectangle red 100 100
-circle red 100 100
-sphere red 100 100
-circle red 100 100
-circle red 100 100
-triangle red 100 100
-cone red 100 100
-circle red 100 100
-rectangle red 100 100
-circle red 100 100
-square red 100 100
-sphere red 100 100
-triangle red 100 100
-rectangle red 100 100
-circle red 100 100
-sphere red 100 100
-circle red 100 100
-circle red 100 100
-triangle red 100 100
-cone red 100 100
-circle red 100 100
-rectangle red 100 100
-circle red 100 100
-square red 100 100
-sphere red 100 100
-triangle red 100 100
-rectangle red 100 100
-circle red 100 100
-sphere red 100 100
-circle red 100 100
-circle red 100 100
-triangle red 100 100
-cone red 100 100
-circle red 100 100
-rectangle red 100 100
-circle red 100 100
-square red 100 100
-sphere red 100 100
-triangle red 100 100
-rectangle red 100 100
-circle red 100 100
-sphere red 100 100
-circle red 100 100
-circle red 100 100
-triangle red 100 100
-cone red 100 100
-circle red 100 100
-rectangle red 100 100
-circle red 100 100
-square red 100 100
-sphere red 100 100
-triangle red 100 100
-rectangle red 100 100
-circle red 100 100
-sphere red 100 100
-circle red 100 100
-circle red 100 100
-triangle red 100 100
-cone red 100 100
-circle red 100 100
-rectangle red 100 100
-circle red 100 100
-square red 100 100
-sphere red 100 100
-triangle red 100 100
-rectangle red 100 100
-circle red 100 100
-sphere red 100 100
-circle red 100 100
-circle red 100 100
-triangle red 100 100
-cone red 100 100
-circle red 100 100
-rectangle red 100 100
-circle red 100 100
-square red 100 100
-sphere red 100 100
-triangle red 100 100
-rectangle red 100 100
-circle red 100 100
-sphere red 100 100
-circle red 100 100
-circle red 100 100
-triangle red 100 100
-cone red 100 100
-circle red 100 100
-rectangle red 100 100
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes_NoHeader.csv b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes_NoHeader.csv
deleted file mode 100644
index a3e3017..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes_NoHeader.csv
+++ /dev/null
@@ -1,351 +0,0 @@
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-square,red,100,100
-sphere,red,100,100
-triangle,red,100,100
-rectangle,red,100,100
-circle,red,100,100
-sphere,red,100,100
-circle,red,100,100
-circle,red,100,100
-triangle,red,100,100
-cone,red,100,100
-circle,red,100,100
-rectangle,red,100,100
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes_header.csv.avro b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes_header.csv.avro
deleted file mode 100644
index 300b069..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes_header.csv.avro
+++ /dev/null
@@ -1,23 +0,0 @@
-{
- "type" : "record",
- "name" : "contact",
- "namespace" : "org.apache.nifi",
- "doc" : "Schema generated by Kite",
- "fields" : [ {
- "name" : "shape",
- "type" : "string",
- "doc" : "Type inferred from 'circle'"
- }, {
- "name" : "color",
- "type" : "string",
- "doc" : "Type inferred from 'red'"
- }, {
- "name" : "width",
- "type" : "long",
- "doc" : "Type inferred from '100'"
- }, {
- "name" : "height",
- "type" : "long",
- "doc" : "Type inferred from '100'"
- } ]
-}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kite-bundle/pom.xml b/nifi-nar-bundles/nifi-kite-bundle/pom.xml
deleted file mode 100644
index 204a20e..0000000
--- a/nifi-nar-bundles/nifi-kite-bundle/pom.xml
+++ /dev/null
@@ -1,89 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-nar-bundles</artifactId>
- <version>1.16.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>nifi-kite-bundle</artifactId>
- <packaging>pom</packaging>
-
- <description>A bundle of processors that use Kite to store data in Hadoop</description>
-
- <modules>
- <module>nifi-kite-processors</module>
- <module>nifi-kite-nar</module>
- </modules>
-
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-kite-processors</artifactId>
- <version>1.16.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- <version>${netty.3.version}</version>
- </dependency>
- <!-- Override commons-io:2.4 from kite -->
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- <version>2.10.0</version>
- </dependency>
- <!-- Override commons-compress -->
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-compress</artifactId>
- <version>1.21</version>
- </dependency>
- <!-- Override commons-beanutils -->
- <dependency>
- <groupId>commons-beanutils</groupId>
- <artifactId>commons-beanutils</artifactId>
- <version>1.9.4</version>
- </dependency>
- <!-- Override jackson 2.3.1 from kite -->
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
- <version>${jackson.version}</version>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- <version>${jackson.version}</version>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <version>${jackson.version}</version>
- </dependency>
- <!-- Override zookeeper -->
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.4.14</version>
- </dependency>
- </dependencies>
- </dependencyManagement>
-
-</project>
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index fc82b7a..8503db7 100755
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -33,7 +33,6 @@
<module>nifi-standard-services</module>
<module>nifi-update-attribute-bundle</module>
<module>nifi-kafka-bundle</module>
- <module>nifi-kite-bundle</module>
<module>nifi-kudu-bundle</module>
<module>nifi-solr-bundle</module>
<module>nifi-confluent-platform-bundle</module>