Posted to commits@nifi.apache.org by bb...@apache.org on 2018/06/13 18:33:26 UTC
[6/6] nifi git commit: NIFI-4963: Added Hive3 bundle - Incorporated review comments - Added more defensive code for PutHive3Streaming error handling
NIFI-4963: Added Hive3 bundle
- Incorporated review comments
- Added more defensive code for PutHive3Streaming error handling
This closes #2755.
Signed-off-by: Bryan Bende <bb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/da99f873
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/da99f873
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/da99f873
Branch: refs/heads/master
Commit: da99f873a7d2f636465efd86178e578da75674a0
Parents: 8feac9a
Author: Matthew Burgess <ma...@apache.org>
Authored: Mon Jun 4 11:37:48 2018 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Wed Jun 13 14:32:58 2018 -0400
----------------------------------------------------------------------
.travis.yml | 2 +-
nifi-assembly/pom.xml | 23 +
.../nifi-hive-bundle/nifi-hive-nar/pom.xml | 2 +
.../nifi-hive-processors/pom.xml | 7 +-
.../nifi-hive-services-api-nar/pom.xml | 2 +
.../nifi-hive-services-api/pom.xml | 6 +
.../apache/nifi/dbcp/hive/Hive3DBCPService.java | 30 +
.../nifi-hive-bundle/nifi-hive3-nar/pom.xml | 49 ++
.../src/main/resources/META-INF/NOTICE | 349 ++++++++
.../nifi-hive3-processors/pom.xml | 140 +++
.../hadoop/hive/ql/io/orc/NiFiOrcUtils.java | 533 +++++++++++
.../apache/hive/streaming/HiveRecordWriter.java | 106 +++
.../apache/hive/streaming/NiFiRecordSerDe.java | 282 ++++++
.../nifi/dbcp/hive/Hive3ConnectionPool.java | 385 ++++++++
.../hive/AbstractHive3QLProcessor.java | 348 ++++++++
.../apache/nifi/processors/hive/PutHive3QL.java | 280 ++++++
.../nifi/processors/hive/PutHive3Streaming.java | 560 ++++++++++++
.../nifi/processors/hive/SelectHive3QL.java | 477 ++++++++++
.../org/apache/nifi/processors/orc/PutORC.java | 175 ++++
.../orc/record/ORCHDFSRecordWriter.java | 110 +++
.../hive/AuthenticationFailedException.java | 23 +
.../apache/nifi/util/hive/CsvOutputOptions.java | 63 ++
.../apache/nifi/util/hive/HiveConfigurator.java | 119 +++
.../apache/nifi/util/hive/HiveJdbcCommon.java | 450 ++++++++++
.../org/apache/nifi/util/hive/HiveOptions.java | 117 +++
.../org/apache/nifi/util/hive/HiveUtils.java | 76 ++
.../nifi/util/hive/ValidationResources.java | 41 +
...org.apache.nifi.controller.ControllerService | 15 +
.../org.apache.nifi.processor.Processor | 18 +
.../hive/streaming/StubConnectionError.java | 31 +
.../hive/streaming/StubSerializationError.java | 23 +
.../hive/streaming/StubStreamingIOFailure.java | 28 +
.../hive/streaming/StubTransactionError.java | 27 +
.../nifi/dbcp/hive/Hive3ConnectionPoolTest.java | 138 +++
.../nifi/processors/hive/TestHive3Parser.java | 292 ++++++
.../nifi/processors/hive/TestPutHive3QL.java | 792 +++++++++++++++++
.../processors/hive/TestPutHive3Streaming.java | 878 +++++++++++++++++++
.../nifi/processors/hive/TestSelectHive3QL.java | 539 ++++++++++++
.../apache/nifi/processors/orc/PutORCTest.java | 416 +++++++++
.../apache/nifi/util/orc/TestNiFiOrcUtils.java | 437 +++++++++
.../src/test/resources/array_of_records.avsc | 38 +
.../src/test/resources/core-site-security.xml | 30 +
.../src/test/resources/core-site.xml | 22 +
.../src/test/resources/fake.keytab | 0
.../src/test/resources/hive-site-security.xml | 26 +
.../src/test/resources/hive-site.xml | 22 +
.../src/test/resources/krb5.conf | 0
.../src/test/resources/user.avsc | 26 +
.../src/test/resources/user_logical_types.avsc | 27 +
nifi-nar-bundles/nifi-hive-bundle/pom.xml | 59 +-
50 files changed, 8587 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index d6c9b39..05351b5 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -55,4 +55,4 @@ script:
# Note: The reason the sed is done as part of script is to ensure the pom hack
# won't affect the 'clean install' above
- bash .travis.sh
- - mvn -T 2 clean install -Pcontrib-check,include-grpc,include-atlas -Ddir-only | grep -v -F -f .travis-output-filters && exit ${PIPESTATUS[0]}
+ - mvn -T 2 clean install -Pcontrib-check,include-grpc,include-atlas,include-hive3 -Ddir-only | grep -v -F -f .travis-output-filters && exit ${PIPESTATUS[0]}
http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index e610aa0..3f473c8 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -549,6 +549,12 @@ language governing permissions and limitations under the License. -->
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-hive3-nar</artifactId>
+ <version>1.7.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-site-to-site-reporting-nar</artifactId>
<version>1.7.0-SNAPSHOT</version>
<type>nar</type>
@@ -746,6 +752,23 @@ language governing permissions and limitations under the License. -->
</dependencies>
</profile>
<profile>
+ <id>include-hive3</id>
+ <!-- This profile handles the inclusion of Hive 3 artifacts. The NAR
+ is quite large and makes the resultant binary distribution significantly
+ larger (275+ MB). -->
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-hive3-nar</artifactId>
+ <version>1.7.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
<id>rpm</id>
<activation>
<activeByDefault>false</activeByDefault>
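
Note: because include-hive3 defaults to inactive, the Hive 3 NAR is only bundled
into the assembly when the profile is enabled explicitly, for example (assuming a
standard build from the repository root):

    mvn clean install -Pinclude-hive3
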
http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-nar/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-nar/pom.xml
index 41e0159..cb2d60d 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-nar/pom.xml
@@ -28,6 +28,8 @@
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
+ <!-- Need to override hadoop.version here, for Hive and hadoop-client transitive dependencies -->
+ <hadoop.version>${hive.hadoop.version}</hadoop.version>
</properties>
<dependencies>
http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
index f7b7b0b..4a6be6d 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
@@ -24,13 +24,17 @@
<artifactId>nifi-hive-processors</artifactId>
<packaging>jar</packaging>
+
<properties>
- <hive.version>1.2.1</hive.version>
+ <!-- Need to override hadoop.version here, for Hive and hadoop-client transitive dependencies -->
+ <hadoop.version>${hive.hadoop.version}</hadoop.version>
</properties>
+
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
+ <version>1.7.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
@@ -84,6 +88,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
+ <version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-services-api-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-services-api-nar/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-services-api-nar/pom.xml
index 1060225..b0b9a4c 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-services-api-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-services-api-nar/pom.xml
@@ -28,6 +28,8 @@
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
+ <!-- Need to override hadoop.version here, for Hive and hadoop-client transitive dependencies -->
+ <hadoop.version>${hive.hadoop.version}</hadoop.version>
</properties>
<dependencies>
http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-services-api/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-services-api/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-services-api/pom.xml
index 6d85c38..2db9b34 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-services-api/pom.xml
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-services-api/pom.xml
@@ -25,10 +25,16 @@
<artifactId>nifi-hive-services-api</artifactId>
<packaging>jar</packaging>
+ <properties>
+ <!-- Need to override hadoop.version here, for Hive and hadoop-client transitive dependencies -->
+ <hadoop.version>${hive.hadoop.version}</hadoop.version>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
+ <version>1.7.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-services-api/src/main/java/org/apache/nifi/dbcp/hive/Hive3DBCPService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-services-api/src/main/java/org/apache/nifi/dbcp/hive/Hive3DBCPService.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-services-api/src/main/java/org/apache/nifi/dbcp/hive/Hive3DBCPService.java
new file mode 100644
index 0000000..e3af3aa
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-services-api/src/main/java/org/apache/nifi/dbcp/hive/Hive3DBCPService.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.dbcp.hive;
+
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+
+/**
+ * Definition for Database Connection Pooling Service.
+ *
+ */
+@Tags({"hive", "dbcp", "jdbc", "database", "connection", "pooling", "store"})
+@CapabilityDescription("Provides a Database Connection Pooling Service for Apache Hive. Connections can be requested from the pool and returned after use.")
+public interface Hive3DBCPService extends HiveDBCPService {
+}
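
Note: a minimal sketch of how a processor might use this service. HIVE_DBCP_SERVICE
is an assumed, hypothetical PropertyDescriptor; getConnection() is inherited from
the DBCPService API, and closing the connection returns it to the pool:

    import java.sql.Connection;
    import java.sql.Statement;
    import org.apache.nifi.dbcp.hive.Hive3DBCPService;
    import org.apache.nifi.processor.ProcessContext;

    // Hypothetical helper inside a processor; HIVE_DBCP_SERVICE is an assumed property.
    void runStatement(final ProcessContext context) throws Exception {
        final Hive3DBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE)
                .asControllerService(Hive3DBCPService.class);
        try (final Connection conn = dbcpService.getConnection();
             final Statement stmt = conn.createStatement()) {
            stmt.execute("SHOW TABLES"); // connection returns to the pool when closed
        }
    }
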
http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-nar/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-nar/pom.xml
new file mode 100644
index 0000000..41286d5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-nar/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-hive-bundle</artifactId>
+ <version>1.7.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-hive3-nar</artifactId>
+ <version>1.7.0-SNAPSHOT</version>
+ <packaging>nar</packaging>
+ <properties>
+ <maven.javadoc.skip>true</maven.javadoc.skip>
+ <source.skip>true</source.skip>
+ <!-- Need to override hadoop.version here, for Hive and hadoop-client transitive dependencies -->
+ <hadoop.version>${hive3.hadoop.version}</hadoop.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-hive-services-api-nar</artifactId>
+ <version>1.7.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-hive3-processors</artifactId>
+ <version>1.7.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..9da3e38
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,349 @@
+nifi-hive3-nar
+Copyright 2014-2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This includes derived works from the Apache Storm (ASLv2 licensed) project (https://github.com/apache/storm):
+ Copyright 2015 The Apache Software Foundation
+ The derived work is adapted from
+ org/apache/storm/hive/common/HiveWriter.java
+ org/apache/storm/hive/common/HiveOptions.java
+ and can be found in the org.apache.nifi.util.hive package
+
+This includes derived works from the Apache Hive (ASLv2 licensed) project (https://github.com/apache/hive):
+ Copyright 2008-2016 The Apache Software Foundation
+ The derived work is adapted from
+ release-1.2.1/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+ and can be found in the org.apache.hadoop.hive.ql.io.orc package
+ The derived work is adapted from
+ branch-3.0/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
+ and can be found in the org.apache.hive.streaming.HiveRecordWriter class
+ The derived work is adapted from
+ branch-3.0/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java
+ and can be found in the org.apache.hive.streaming.NiFiRecordSerDe class
+
+===========================================
+Apache Software License v2
+===========================================
+
+The following binary components are provided under the Apache Software License v2
+
+ (ASLv2) Apache Ant
+ The following NOTICE information applies:
+ Apache Ant
+ Copyright 1999-2016 The Apache Software Foundation
+
+ (ASLv2) Apache Commons Codec
+ The following NOTICE information applies:
+ Apache Commons Codec
+ Copyright 2002-2014 The Apache Software Foundation
+
+ src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
+ contains test data from http://aspell.net/test/orig/batch0.tab.
+ Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
+
+ ===============================================================================
+
+ The content of package org.apache.commons.codec.language.bm has been translated
+ from the original php source code available at http://stevemorse.org/phoneticinfo.htm
+ with permission from the original authors.
+ Original source copyright:
+ Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
+
+ (ASLv2) Apache Commons DBCP
+ The following NOTICE information applies:
+ Apache Commons DBCP
+ Copyright 2001-2015 The Apache Software Foundation.
+
+ (ASLv2) Apache Commons EL
+ The following NOTICE information applies:
+ Apache Commons EL
+ Copyright 1999-2016 The Apache Software Foundation
+
+ EL-8 patch - Copyright 2004-2007 Jamie Taylor
+ http://issues.apache.org/jira/browse/EL-8
+
+ (ASLv2) Apache HttpComponents
+ The following NOTICE information applies:
+ Apache HttpComponents Client
+ Copyright 1999-2016 The Apache Software Foundation
+ Apache HttpComponents Core - HttpCore
+ Copyright 2006-2009 The Apache Software Foundation
+
+ (ASLv2) Apache Commons Logging
+ The following NOTICE information applies:
+ Apache Commons Logging
+ Copyright 2003-2014 The Apache Software Foundation
+
+ (ASLv2) Apache Commons Pool
+ The following NOTICE information applies:
+ Apache Commons Pool
+ Copyright 1999-2009 The Apache Software Foundation.
+
+ (ASLv2) Apache Commons IO
+ The following NOTICE information applies:
+ Apache Commons IO
+ Copyright 2002-2016 The Apache Software Foundation
+
+ (ASLv2) Apache Hive
+ The following NOTICE information applies:
+ Apache Hive
+ Copyright 2008-2015 The Apache Software Foundation
+
+ This product includes software developed by The Apache Software
+ Foundation (http://www.apache.org/).
+
+ This product includes Jersey (https://jersey.java.net/)
+ Copyright (c) 2010-2014 Oracle and/or its affiliates.
+
+ This project includes software copyrighted by Microsoft Corporation and
+ licensed under the Apache License, Version 2.0.
+
+ This project includes software copyrighted by Dell SecureWorks and
+ licensed under the Apache License, Version 2.0.
+
+ (ASLv2) Jackson JSON processor
+ The following NOTICE information applies:
+ # Jackson JSON processor
+
+ Jackson is a high-performance, Free/Open Source JSON processing library.
+ It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
+ been in development since 2007.
+ It is currently developed by a community of developers, as well as supported
+ commercially by FasterXML.com.
+
+ ## Licensing
+
+ Jackson core and extension components may licensed under different licenses.
+ To find the details that apply to this artifact see the accompanying LICENSE file.
+ For more information, including possible other licensing options, contact
+ FasterXML.com (http://fasterxml.com).
+
+ ## Credits
+
+ A list of contributors may be found from CREDITS file, which is included
+ in some artifacts (usually source distributions); but is always available
+ from the source code management (SCM) system project uses.
+
+ (ASLv2) BoneCP
+ The following NOTICE information applies:
+ BoneCP
+ Copyright 2010 Wallace Wadge
+
+ (ASLv2) Apache Hadoop
+ The following NOTICE information applies:
+ The binary distribution of this product bundles binaries of
+ org.iq80.leveldb:leveldb-api (https://github.com/dain/leveldb), which has the
+ following notices:
+ * Copyright 2011 Dain Sundstrom <da...@iq80.com>
+ * Copyright 2011 FuseSource Corp. http://fusesource.com
+
+ The binary distribution of this product bundles binaries of
+ org.fusesource.hawtjni:hawtjni-runtime (https://github.com/fusesource/hawtjni),
+ which has the following notices:
+ * This product includes software developed by FuseSource Corp.
+ http://fusesource.com
+ * This product includes software developed at
+ Progress Software Corporation and/or its subsidiaries or affiliates.
+ * This product includes software developed by IBM Corporation and others.
+
+ (ASLv2) Apache HBase
+ The following NOTICE information applies:
+ Apache HBase
+ Copyright 2007-2015 The Apache Software Foundation
+
+ --
+ This product incorporates portions of the 'Hadoop' project
+
+ Copyright 2007-2009 The Apache Software Foundation
+
+ Licensed under the Apache License v2.0
+ --
+ Our Orca logo we got here: http://www.vectorfree.com/jumping-orca
+ It is licensed Creative Commons Attribution 3.0.
+ See https://creativecommons.org/licenses/by/3.0/us/
+ We changed the logo by stripping the colored background, inverting
+ it and then rotating it some.
+
+ Later we found that vectorfree.com image is not properly licensed.
+ The original is owned by vectorportal.com. The original was
+ relicensed so we could use it as Creative Commons Attribution 3.0.
+ The license is bundled with the download available here:
+ http://www.vectorportal.com/subcategory/205/KILLER-WHALE-FREE-VECTOR.eps/ifile/9136/detailtest.asp
+ --
+ This product includes portions of the Bootstrap project v3.0.0
+
+ Copyright 2013 Twitter, Inc.
+
+ Licensed under the Apache License v2.0
+
+ This product uses the Glyphicons Halflings icon set.
+
+ http://glyphicons.com/
+
+ Copyright Jan Kovařík
+
+ Licensed under the Apache License v2.0 as a part of the Bootstrap project.
+
+ --
+ This product includes portions of the Guava project v14, specifically
+ 'hbase-common/src/main/java/org/apache/hadoop/hbase/io/LimitInputStream.java'
+
+ Copyright (C) 2007 The Guava Authors
+
+ Licensed under the Apache License, Version 2.0
+
+ (ASLv2) Apache Commons Lang
+ The following NOTICE information applies:
+ Apache Commons Lang
+ Copyright 2001-2015 The Apache Software Foundation
+
+ (ASLv2) Apache Curator
+ The following NOTICE information applies:
+ Apache Curator
+ Copyright 2013-2014 The Apache Software Foundation
+
+ (ASLv2) Apache Derby
+ The following NOTICE information applies:
+ Apache Derby
+ Copyright 2004-2014 Apache, Apache DB, Apache Derby, Apache Torque, Apache JDO, Apache DDLUtils,
+ the Derby hat logo, the Apache JDO logo, and the Apache feather logo are trademarks of The Apache Software Foundation.
+
+ (ASLv2) Apache DS
+ The following NOTICE information applies:
+ ApacheDS
+ Copyright 2003-2015 The Apache Software Foundation
+
+ (ASLv2) Apache Geronimo
+ The following NOTICE information applies:
+ Apache Geronimo
+ Copyright 2003-2008 The Apache Software Foundation
+
+ (ASLv2) HTrace Core
+ The following NOTICE information applies:
+ In addition, this product includes software dependencies. See
+ the accompanying LICENSE.txt for a listing of dependencies
+ that are NOT Apache licensed (with pointers to their licensing)
+
+ Apache HTrace includes an Apache Thrift connector to Zipkin. Zipkin
+ is a distributed tracing system that is Apache 2.0 Licensed.
+ Copyright 2012 Twitter, Inc.
+
+ (ASLv2) Jettison
+ The following NOTICE information applies:
+ Copyright 2006 Envoi Solutions LLC
+
+ (ASLv2) Jetty
+ The following NOTICE information applies:
+ Jetty Web Container
+ Copyright 1995-2017 Mort Bay Consulting Pty Ltd.
+
+ (ASLv2) Apache log4j
+ The following NOTICE information applies:
+ Apache log4j
+ Copyright 2007 The Apache Software Foundation
+
+ (ASLv2) Parquet MR
+ The following NOTICE information applies:
+ Parquet MR
+ Copyright 2012 Twitter, Inc.
+
+ This project includes code from https://github.com/lemire/JavaFastPFOR
+ parquet-column/src/main/java/parquet/column/values/bitpacking/LemireBitPacking.java
+ Apache License Version 2.0 http://www.apache.org/licenses/.
+ (c) Daniel Lemire, http://lemire.me/en/
+
+ (ASLv2) Apache Thrift
+ The following NOTICE information applies:
+ Apache Thrift
+ Copyright 2006-2010 The Apache Software Foundation.
+
+ (ASLv2) Apache Twill
+ The following NOTICE information applies:
+ Apache Twill
+ Copyright 2013-2016 The Apache Software Foundation
+
+ (ASLv2) Dropwizard Metrics
+ The following NOTICE information applies:
+ Metrics
+ Copyright 2010-2013 Coda Hale and Yammer, Inc.
+
+ This product includes code derived from the JSR-166 project (ThreadLocalRandom, Striped64,
+ LongAdder), which was released with the following comments:
+
+ Written by Doug Lea with assistance from members of JCP JSR-166
+ Expert Group and released to the public domain, as explained at
+ http://creativecommons.org/publicdomain/zero/1.0/
+
+ (ASLv2) Joda Time
+ The following NOTICE information applies:
+ This product includes software developed by
+ Joda.org (http://www.joda.org/).
+
+ (ASLv2) The Netty Project
+ The following NOTICE information applies:
+ The Netty Project
+ Copyright 2011 The Netty Project
+
+ (ASLv2) Apache Tomcat
+ The following NOTICE information applies:
+ Apache Tomcat
+ Copyright 2007 The Apache Software Foundation
+
+ Java Management Extensions (JMX) support is provided by
+ the MX4J package, which is open source software. The
+ original software and related information is available
+ at http://mx4j.sourceforge.net.
+
+ Java compilation software for JSP pages is provided by Eclipse,
+ which is open source software. The orginal software and
+ related infomation is available at
+ http://www.eclipse.org.
+
+ (ASLv2) Apache ZooKeeper
+ The following NOTICE information applies:
+ Apache ZooKeeper
+ Copyright 2009-2012 The Apache Software Foundation
+
+ (ASLv2) Google GSON
+ The following NOTICE information applies:
+ Copyright 2008 Google Inc.
+
+ (ASLv2) JPam
+ The following NOTICE information applies:
+ Copyright 2003-2006 Greg Luck
+
+ ************************
+ Common Development and Distribution License 1.1
+ ************************
+
+ The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details.
+
+ (CDDL 1.1) (GPL2 w/ CPE) jersey-client (com.sun.jersey:jersey-client:jar:1.9 - https://jersey.java.net)
+ (CDDL 1.1) (GPL2 w/ CPE) jersey-core (com.sun.jersey:jersey-core:jar:1.9 - https://jersey.java.net/)
+ (CDDL 1.1) (GPL2 w/ CPE) jersey-json (com.sun.jersey:jersey-json:jar:1.9 - https://jersey.java.net/)
+ (CDDL 1.1) (GPL2 w/ CPE) jersey-server (com.sun.jersey:jersey-server:jar:1.9 - https://jersey.java.net/)
+ (CDDL 1.1) (GPL2 w/ CPE) jersey-guice (com.sun.jersey.contribs:jersey-guice:jar:1.9 - https://jersey.java.net/)
+ (CDDL 1.1) (GPL2 w/ CPE) Java Architecture For XML Binding (javax.xml.bind:jaxb-api:jar:2.2.2 - https://jaxb.dev.java.net/)
+ (CDDL 1.1) (GPL2 w/ CPE) JavaMail API (compat) (javax.mail:mail:jar:1.4.7 - http://kenai.com/projects/javamail/mail)
+
+
+ ************************
+ Common Development and Distribution License 1.0
+ ************************
+
+ The following binary components are provided under the Common Development and Distribution License 1.0. See project link for details.
+
+ (CDDL 1.0) JavaServlet(TM) Specification (javax.servlet:servlet-api:jar:2.5 - no url available)
+ (CDDL 1.0) (GPL3) Streaming API For XML (javax.xml.stream:stax-api:jar:1.0-2 - no url provided)
+ (CDDL 1.0) JavaBeans Activation Framework (JAF) (javax.activation:activation:jar:1.1 - http://java.sun.com/products/javabeans/jaf/index.jsp)
+ (CDDL 1.0) JavaServer Pages(TM) API (javax.servlet.jsp:jsp-api:jar:2.1 - http://jsp.java.net)
+
+ *****************
+ Public Domain
+ *****************
+
+ The following binary components are provided to the 'Public Domain'. See project link for details.
+
+ (Public Domain) AOP Alliance 1.0 (http://aopalliance.sourceforge.net/)
http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml
new file mode 100644
index 0000000..c62268b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml
@@ -0,0 +1,140 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-hive-bundle</artifactId>
+ <version>1.7.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-hive3-processors</artifactId>
+ <packaging>jar</packaging>
+
+ <properties>
+ <!-- Need to override hadoop.version here, for Hive and hadoop-client transitive dependencies -->
+ <hadoop.version>${hive3.hadoop.version}</hadoop.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ <version>1.7.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-processor-utils</artifactId>
+ <version>1.7.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-dbcp-service-api</artifactId>
+ <version>1.7.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-hive-services-api</artifactId>
+ <version>1.7.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-hadoop-record-utils</artifactId>
+ <version>1.7.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-kerberos-credentials-service-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-jdbc</artifactId>
+ <version>${hive3.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.json</groupId>
+ <artifactId>json</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-streaming</artifactId>
+ <version>${hive3.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hive-hcatalog-core</artifactId>
+ <version>${hive3.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-hadoop-utils</artifactId>
+ <version>1.7.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>com.github.stephenc.findbugs</groupId>
+ <artifactId>findbugs-annotations</artifactId>
+ <version>1.3.9-1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock-record-utils</artifactId>
+ <version>1.7.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock</artifactId>
+ <version>1.7.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
new file mode 100644
index 0000000..7231421
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.util.Utf8;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.orc.MemoryManager;
+import org.apache.orc.OrcConf;
+import org.apache.orc.impl.MemoryManagerImpl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+
+/**
+ * Utility methods for ORC support (e.g. conversion from Avro types, conversion to Hive types).
+ */
+public class NiFiOrcUtils {
+
+ public static Object convertToORCObject(TypeInfo typeInfo, Object o, final boolean hiveFieldNames) {
+ if (o != null) {
+ if (typeInfo instanceof UnionTypeInfo) {
+ OrcUnion union = new OrcUnion();
+ // Avro uses Utf8 and GenericData.EnumSymbol objects instead of Strings. This is handled in other places in the method, but here
+ // we need to determine the union types from the objects, so choose String.class if the object is one of those Avro classes
+ Class clazzToCompareTo = o.getClass();
+ if (o instanceof org.apache.avro.util.Utf8 || o instanceof GenericData.EnumSymbol) {
+ clazzToCompareTo = String.class;
+ }
+ // Need to find which of the union types correspond to the primitive object
+ TypeInfo objectTypeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(
+ ObjectInspectorFactory.getReflectionObjectInspector(clazzToCompareTo, ObjectInspectorFactory.ObjectInspectorOptions.JAVA));
+ List<TypeInfo> unionTypeInfos = ((UnionTypeInfo) typeInfo).getAllUnionObjectTypeInfos();
+
+ int index = 0;
+ while (index < unionTypeInfos.size() && !unionTypeInfos.get(index).equals(objectTypeInfo)) {
+ index++;
+ }
+ if (index < unionTypeInfos.size()) {
+ union.set((byte) index, convertToORCObject(objectTypeInfo, o, hiveFieldNames));
+ } else {
+ throw new IllegalArgumentException("Object Type for class " + o.getClass().getName() + " not in Union declaration");
+ }
+ return union;
+ }
+ if (o instanceof Integer) {
+ return new IntWritable((int) o);
+ }
+ if (o instanceof Boolean) {
+ return new BooleanWritable((boolean) o);
+ }
+ if (o instanceof Long) {
+ return new LongWritable((long) o);
+ }
+ if (o instanceof Float) {
+ return new FloatWritable((float) o);
+ }
+ if (o instanceof Double) {
+ return new DoubleWritable((double) o);
+ }
+ if (o instanceof String || o instanceof Utf8 || o instanceof GenericData.EnumSymbol) {
+ return new Text(o.toString());
+ }
+ if (o instanceof ByteBuffer) {
+ return new BytesWritable(((ByteBuffer) o).array());
+ }
+ if (o instanceof Timestamp) {
+ return new TimestampWritable((Timestamp) o);
+ }
+ if (o instanceof Date) {
+ return new DateWritable((Date) o);
+ }
+ if (o instanceof Object[]) {
+ Object[] objArray = (Object[]) o;
+ TypeInfo listTypeInfo = ((ListTypeInfo) typeInfo).getListElementTypeInfo();
+ return Arrays.stream(objArray)
+ .map(o1 -> convertToORCObject(listTypeInfo, o1, hiveFieldNames))
+ .collect(Collectors.toList());
+ }
+ if (o instanceof int[]) {
+ int[] intArray = (int[]) o;
+ return Arrays.stream(intArray)
+ .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("int"), element, hiveFieldNames))
+ .collect(Collectors.toList());
+ }
+ if (o instanceof long[]) {
+ long[] longArray = (long[]) o;
+ return Arrays.stream(longArray)
+ .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("bigint"), element, hiveFieldNames))
+ .collect(Collectors.toList());
+ }
+ if (o instanceof float[]) {
+ float[] floatArray = (float[]) o;
+ return IntStream.range(0, floatArray.length)
+ .mapToDouble(i -> floatArray[i])
+ .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("float"), (float) element, hiveFieldNames))
+ .collect(Collectors.toList());
+ }
+ if (o instanceof double[]) {
+ double[] doubleArray = (double[]) o;
+ return Arrays.stream(doubleArray)
+ .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("double"), element, hiveFieldNames))
+ .collect(Collectors.toList());
+ }
+ if (o instanceof boolean[]) {
+ boolean[] booleanArray = (boolean[]) o;
+ return IntStream.range(0, booleanArray.length)
+ .map(i -> booleanArray[i] ? 1 : 0)
+ .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("boolean"), element == 1, hiveFieldNames))
+ .collect(Collectors.toList());
+ }
+ if (o instanceof GenericData.Array) {
+ GenericData.Array array = ((GenericData.Array) o);
+ // The type information in this case is interpreted as a List
+ TypeInfo listTypeInfo = ((ListTypeInfo) typeInfo).getListElementTypeInfo();
+ return array.stream().map((element) -> convertToORCObject(listTypeInfo, element, hiveFieldNames)).collect(Collectors.toList());
+ }
+ if (o instanceof List) {
+ return o;
+ }
+ if (o instanceof Map) {
+ Map map = new HashMap();
+ TypeInfo keyInfo = ((MapTypeInfo) typeInfo).getMapKeyTypeInfo();
+ TypeInfo valueInfo = ((MapTypeInfo) typeInfo).getMapValueTypeInfo();
+ // Unions are not allowed as key/value types, so if we convert the key and value objects,
+ // they should return Writable objects
+ ((Map) o).forEach((key, value) -> {
+ Object keyObject = convertToORCObject(keyInfo, key, hiveFieldNames);
+ Object valueObject = convertToORCObject(valueInfo, value, hiveFieldNames);
+ if (keyObject == null) {
+ throw new IllegalArgumentException("Map keys cannot be null");
+ }
+ map.put(keyObject, valueObject);
+ });
+ return map;
+ }
+ if (o instanceof GenericData.Record) {
+ GenericData.Record record = (GenericData.Record) o;
+ TypeInfo recordSchema = NiFiOrcUtils.getOrcField(record.getSchema(), hiveFieldNames);
+ List<Schema.Field> recordFields = record.getSchema().getFields();
+ if (recordFields != null) {
+ Object[] fieldObjects = new Object[recordFields.size()];
+ for (int i = 0; i < recordFields.size(); i++) {
+ Schema.Field field = recordFields.get(i);
+ Schema fieldSchema = field.schema();
+ Object fieldObject = record.get(field.name());
+ fieldObjects[i] = NiFiOrcUtils.convertToORCObject(NiFiOrcUtils.getOrcField(fieldSchema, hiveFieldNames), fieldObject, hiveFieldNames);
+ }
+ return NiFiOrcUtils.createOrcStruct(recordSchema, fieldObjects);
+ }
+ }
+ throw new IllegalArgumentException("Error converting object of type " + o.getClass().getName() + " to ORC type " + typeInfo.getTypeName());
+ } else {
+ return null;
+ }
+ }
+
+
+ /**
+ * Create an object of OrcStruct given a TypeInfo and a list of objects
+ *
+ * @param typeInfo The TypeInfo object representing the ORC record schema
+ * @param objs ORC objects/Writables
+ * @return an OrcStruct containing the specified objects for the specified schema
+ */
+ @SuppressWarnings("unchecked")
+ public static OrcStruct createOrcStruct(TypeInfo typeInfo, Object... objs) {
+ SettableStructObjectInspector oi = (SettableStructObjectInspector) OrcStruct
+ .createObjectInspector(typeInfo);
+ List<StructField> fields = (List<StructField>) oi.getAllStructFieldRefs();
+ OrcStruct result = (OrcStruct) oi.create();
+ result.setNumFields(fields.size());
+ for (int i = 0; i < fields.size(); i++) {
+ oi.setStructFieldData(result, fields.get(i), objs[i]);
+ }
+ return result;
+ }
+
+ public static String normalizeHiveTableName(String name) {
+ return name.replaceAll("[\\. ]", "_");
+ }
+
+ public static String generateHiveDDL(Schema avroSchema, String tableName, boolean hiveFieldNames) {
+ Schema.Type schemaType = avroSchema.getType();
+ StringBuilder sb = new StringBuilder("CREATE EXTERNAL TABLE IF NOT EXISTS ");
+ sb.append(tableName);
+ sb.append(" (");
+ if (Schema.Type.RECORD.equals(schemaType)) {
+ List<String> hiveColumns = new ArrayList<>();
+ List<Schema.Field> fields = avroSchema.getFields();
+ if (fields != null) {
+ hiveColumns.addAll(
+ fields.stream().map(field -> (hiveFieldNames ? field.name().toLowerCase() : field.name()) + " "
+ + getHiveTypeFromAvroType(field.schema(), hiveFieldNames)).collect(Collectors.toList()));
+ }
+ sb.append(StringUtils.join(hiveColumns, ", "));
+ sb.append(") STORED AS ORC");
+ return sb.toString();
+ } else {
+ throw new IllegalArgumentException("Avro schema is of type " + schemaType.getName() + ", not RECORD");
+ }
+ }
+
+
+ public static TypeInfo getOrcField(Schema fieldSchema, boolean hiveFieldNames) throws IllegalArgumentException {
+ Schema.Type fieldType = fieldSchema.getType();
+ LogicalType logicalType = fieldSchema.getLogicalType();
+
+ switch (fieldType) {
+ case INT:
+ case LONG:
+ // Handle logical types
+ if (logicalType != null) {
+ if (LogicalTypes.date().equals(logicalType)) {
+ return TypeInfoFactory.dateTypeInfo;
+ } else if (LogicalTypes.timeMicros().equals(logicalType)) {
+ // Time micros isn't supported by our Record Field types (see AvroTypeUtil)
+ throw new IllegalArgumentException("time-micros is not a supported field type");
+ } else if (LogicalTypes.timeMillis().equals(logicalType)) {
+ return TypeInfoFactory.intTypeInfo;
+ } else if (LogicalTypes.timestampMicros().equals(logicalType)) {
+ // Timestamp micros isn't supported by our Record Field types (see AvroTypeUtil)
+ throw new IllegalArgumentException("timestamp-micros is not a supported field type");
+ } else if (LogicalTypes.timestampMillis().equals(logicalType)) {
+ return TypeInfoFactory.timestampTypeInfo;
+ }
+ }
+ return getPrimitiveOrcTypeFromPrimitiveAvroType(fieldType);
+ case BYTES:
+ // Handle logical types
+ if (logicalType != null) {
+ if (logicalType instanceof LogicalTypes.Decimal) {
+ return TypeInfoFactory.doubleTypeInfo;
+ }
+ }
+ return getPrimitiveOrcTypeFromPrimitiveAvroType(fieldType);
+
+ case BOOLEAN:
+ case DOUBLE:
+ case FLOAT:
+ case STRING:
+ return getPrimitiveOrcTypeFromPrimitiveAvroType(fieldType);
+
+ case UNION:
+ List<Schema> unionFieldSchemas = fieldSchema.getTypes();
+
+ if (unionFieldSchemas != null) {
+ // Ignore null types in union
+ List<TypeInfo> orcFields = unionFieldSchemas.stream().filter(
+ unionFieldSchema -> !Schema.Type.NULL.equals(unionFieldSchema.getType()))
+ .map((it) -> NiFiOrcUtils.getOrcField(it, hiveFieldNames))
+ .collect(Collectors.toList());
+
+ // Flatten the field if the union only has one non-null element
+ if (orcFields.size() == 1) {
+ return orcFields.get(0);
+ } else {
+ return TypeInfoFactory.getUnionTypeInfo(orcFields);
+ }
+ }
+ return null;
+
+ case ARRAY:
+ return TypeInfoFactory.getListTypeInfo(getOrcField(fieldSchema.getElementType(), hiveFieldNames));
+
+ case MAP:
+ return TypeInfoFactory.getMapTypeInfo(
+ getPrimitiveOrcTypeFromPrimitiveAvroType(Schema.Type.STRING),
+ getOrcField(fieldSchema.getValueType(), hiveFieldNames));
+
+ case RECORD:
+ List<Schema.Field> avroFields = fieldSchema.getFields();
+ if (avroFields != null) {
+ List<String> orcFieldNames = new ArrayList<>(avroFields.size());
+ List<TypeInfo> orcFields = new ArrayList<>(avroFields.size());
+ avroFields.forEach(avroField -> {
+ String fieldName = hiveFieldNames ? avroField.name().toLowerCase() : avroField.name();
+ orcFieldNames.add(fieldName);
+ orcFields.add(getOrcField(avroField.schema(), hiveFieldNames));
+ });
+ return TypeInfoFactory.getStructTypeInfo(orcFieldNames, orcFields);
+ }
+ return null;
+
+ case ENUM:
+ // An enum value is just a String for ORC/Hive
+ return getPrimitiveOrcTypeFromPrimitiveAvroType(Schema.Type.STRING);
+
+ default:
+ throw new IllegalArgumentException("Did not recognize Avro type " + fieldType.getName());
+ }
+
+ }
+
+ public static Schema.Type getAvroSchemaTypeOfObject(Object o) {
+ if (o == null) {
+ return Schema.Type.NULL;
+ } else if (o instanceof Integer) {
+ return Schema.Type.INT;
+ } else if (o instanceof Long) {
+ return Schema.Type.LONG;
+ } else if (o instanceof Boolean) {
+ return Schema.Type.BOOLEAN;
+ } else if (o instanceof byte[]) {
+ return Schema.Type.BYTES;
+ } else if (o instanceof Float) {
+ return Schema.Type.FLOAT;
+ } else if (o instanceof Double) {
+ return Schema.Type.DOUBLE;
+ } else if (o instanceof Enum) {
+ return Schema.Type.ENUM;
+ } else if (o instanceof Object[]) {
+ return Schema.Type.ARRAY;
+ } else if (o instanceof List) {
+ return Schema.Type.ARRAY;
+ } else if (o instanceof Map) {
+ return Schema.Type.MAP;
+ } else {
+ throw new IllegalArgumentException("Object of class " + o.getClass() + " is not a supported Avro Type");
+ }
+ }
+
+ public static TypeInfo getPrimitiveOrcTypeFromPrimitiveAvroType(Schema.Type avroType) throws IllegalArgumentException {
+ if (avroType == null) {
+ throw new IllegalArgumentException("Avro type is null");
+ }
+ switch (avroType) {
+ case INT:
+ return TypeInfoFactory.getPrimitiveTypeInfo("int");
+ case LONG:
+ return TypeInfoFactory.getPrimitiveTypeInfo("bigint");
+ case BOOLEAN:
+ return TypeInfoFactory.getPrimitiveTypeInfo("boolean");
+ case BYTES:
+ return TypeInfoFactory.getPrimitiveTypeInfo("binary");
+ case DOUBLE:
+ return TypeInfoFactory.getPrimitiveTypeInfo("double");
+ case FLOAT:
+ return TypeInfoFactory.getPrimitiveTypeInfo("float");
+ case STRING:
+ return TypeInfoFactory.getPrimitiveTypeInfo("string");
+ default:
+ throw new IllegalArgumentException("Avro type " + avroType.getName() + " is not a primitive type");
+ }
+ }
+
+ public static String getHiveTypeFromAvroType(Schema avroSchema, boolean hiveFieldNames) {
+ if (avroSchema == null) {
+ throw new IllegalArgumentException("Avro schema is null");
+ }
+
+ Schema.Type avroType = avroSchema.getType();
+ LogicalType logicalType = avroSchema.getLogicalType();
+
+ switch (avroType) {
+ case INT:
+ if (logicalType != null) {
+ if (LogicalTypes.date().equals(logicalType)) {
+ return "DATE";
+ }
+ // Time-millis currently has no corresponding Hive type; perhaps an INTERVAL type once that is fully supported.
+ }
+ return "INT";
+ case LONG:
+ if (logicalType != null) {
+ if (LogicalTypes.timestampMillis().equals(logicalType)) {
+ return "TIMESTAMP";
+ }
+ // Timestamp-micros and time-micros are not supported by our Record Field type system
+ }
+ return "BIGINT";
+ case BOOLEAN:
+ return "BOOLEAN";
+ case BYTES:
+ if (logicalType != null) {
+ if (logicalType instanceof LogicalTypes.Decimal) {
+ return "DOUBLE";
+ }
+ }
+ return "BINARY";
+ case DOUBLE:
+ return "DOUBLE";
+ case FLOAT:
+ return "FLOAT";
+ case STRING:
+ case ENUM:
+ return "STRING";
+ case UNION:
+ List<Schema> unionFieldSchemas = avroSchema.getTypes();
+ if (unionFieldSchemas != null) {
+ List<String> hiveFields = new ArrayList<>();
+ for (Schema unionFieldSchema : unionFieldSchemas) {
+ Schema.Type unionFieldSchemaType = unionFieldSchema.getType();
+ // Ignore null types in union
+ if (!Schema.Type.NULL.equals(unionFieldSchemaType)) {
+ hiveFields.add(getHiveTypeFromAvroType(unionFieldSchema, hiveFieldNames));
+ }
+ }
+ // Flatten the field if the union only has one non-null element
+ return (hiveFields.size() == 1)
+ ? hiveFields.get(0)
+ : "UNIONTYPE<" + StringUtils.join(hiveFields, ", ") + ">";
+
+ }
+ break;
+ case MAP:
+ return "MAP<STRING, " + getHiveTypeFromAvroType(avroSchema.getValueType(), hiveFieldNames) + ">";
+ case ARRAY:
+ return "ARRAY<" + getHiveTypeFromAvroType(avroSchema.getElementType(), hiveFieldNames) + ">";
+ case RECORD:
+ List<Schema.Field> recordFields = avroSchema.getFields();
+ if (recordFields != null) {
+ List<String> hiveFields = recordFields.stream().map(
+ recordField -> (hiveFieldNames ? recordField.name().toLowerCase() : recordField.name()) + ":"
+ + getHiveTypeFromAvroType(recordField.schema(), hiveFieldNames)).collect(Collectors.toList());
+ return "STRUCT<" + StringUtils.join(hiveFields, ", ") + ">";
+ }
+ break;
+ default:
+ break;
+ }
+
+ throw new IllegalArgumentException("Error converting Avro type " + avroType.getName() + " to Hive type");
+ }
+
+
+ public static Writer createWriter(
+ Path path,
+ Configuration conf,
+ TypeInfo orcSchema,
+ long stripeSize,
+ CompressionKind compress,
+ int bufferSize) throws IOException {
+
+ int rowIndexStride = (int) OrcConf.ROW_INDEX_STRIDE.getLong(conf);
+
+ boolean addBlockPadding = OrcConf.BLOCK_PADDING.getBoolean(conf);
+
+ String versionName = OrcConf.WRITE_FORMAT.getString(conf);
+ OrcFile.Version versionValue = (versionName == null)
+ ? OrcFile.Version.CURRENT
+ : OrcFile.Version.byName(versionName);
+
+ OrcFile.EncodingStrategy encodingStrategy;
+ String enString = OrcConf.ENCODING_STRATEGY.getString(conf);
+ if (enString == null) {
+ encodingStrategy = OrcFile.EncodingStrategy.SPEED;
+ } else {
+ encodingStrategy = OrcFile.EncodingStrategy.valueOf(enString);
+ }
+
+ final double paddingTolerance = OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(conf);
+
+ long blockSizeValue = OrcConf.BLOCK_SIZE.getLong(conf);
+
+ double bloomFilterFpp = OrcConf.BLOOM_FILTER_FPP.getDouble(conf);
+
+ ObjectInspector inspector = OrcStruct.createObjectInspector(orcSchema);
+
+ OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(conf)
+ .rowIndexStride(rowIndexStride)
+ .blockPadding(addBlockPadding)
+ .version(versionValue)
+ .encodingStrategy(encodingStrategy)
+ .paddingTolerance(paddingTolerance)
+ .blockSize(blockSizeValue)
+ .bloomFilterFpp(bloomFilterFpp)
+ .memory(getMemoryManager(conf))
+ .inspector(inspector)
+ .stripeSize(stripeSize)
+ .bufferSize(bufferSize)
+ .compress(compress);
+
+ return OrcFile.createWriter(path, writerOptions);
+ }
+
+ private static MemoryManager memoryManager = null;
+
+ private static synchronized MemoryManager getMemoryManager(Configuration conf) {
+ if (memoryManager == null) {
+ memoryManager = new MemoryManagerImpl(conf);
+ }
+ return memoryManager;
+ }
+}
\ No newline at end of file
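
Note: a minimal usage sketch for these utilities, assuming an Avro schema built
with the standard SchemaBuilder API (the table and field names here are made up;
statements would live inside a method in real code):

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

    Schema avroSchema = SchemaBuilder.record("users").fields()
            .requiredString("name")
            .requiredInt("age")
            .endRecord();

    // CREATE EXTERNAL TABLE IF NOT EXISTS users (name STRING, age INT) STORED AS ORC
    String ddl = NiFiOrcUtils.generateHiveDDL(avroSchema, "users", true);

    // struct<name:string,age:int>, usable as the record schema for convertToORCObject()
    TypeInfo orcSchema = NiFiOrcUtils.getOrcField(avroSchema, true);

    // Wraps the value in the matching Writable (here an org.apache.hadoop.io.Text)
    Object orcValue = NiFiOrcUtils.convertToORCObject(
            NiFiOrcUtils.getOrcField(avroSchema.getField("name").schema(), true), "alice", true);
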
http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/HiveRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/HiveRecordWriter.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/HiveRecordWriter.java
new file mode 100644
index 0000000..6edb374
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/HiveRecordWriter.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.streaming;
+
+import com.google.common.base.Joiner;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Properties;
+
+public class HiveRecordWriter extends AbstractRecordWriter {
+
+ private RecordReader recordReader;
+ private NiFiRecordSerDe serde;
+ private ComponentLog log;
+
+ public HiveRecordWriter(RecordReader recordReader, ComponentLog log) {
+ super(null);
+ this.recordReader = recordReader;
+ this.log = log;
+ }
+
+ @Override
+ public AbstractSerDe createSerde() throws SerializationError {
+ try {
+ Properties tableProps = table.getMetadata();
+ tableProps.setProperty(serdeConstants.LIST_COLUMNS, Joiner.on(",").join(inputColumns));
+ tableProps.setProperty(serdeConstants.LIST_COLUMN_TYPES, Joiner.on(":").join(inputTypes));
+ NiFiRecordSerDe serde = new NiFiRecordSerDe(recordReader, log);
+ SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
+ this.serde = serde;
+ return serde;
+ } catch (SerDeException e) {
+ throw new SerializationError("Error initializing serde " + NiFiRecordSerDe.class.getName(), e);
+ }
+ }
+
+ @Override
+ public Object encode(byte[] bytes) {
+ throw new UnsupportedOperationException(this.getClass().getName() + " does not support encoding of records via bytes, only via an InputStream");
+ }
+
+ @Override
+ public void write(long writeId, byte[] record) {
+ throw new UnsupportedOperationException(this.getClass().getName() + " does not support writing of records via bytes, only via an InputStream");
+ }
+
+ @Override
+ public void write(long writeId, InputStream inputStream) throws StreamingException {
+ // The inputStream is already available to the recordReader, so just iterate through the records
+ try {
+ Record record;
+ while ((record = recordReader.nextRecord()) != null) {
+ write(writeId, record);
+ }
+ } catch (MalformedRecordException | IOException e) {
+ throw new StreamingException(e.getLocalizedMessage(), e);
+ }
+ }
+
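+ // Note: conversion goes through the SerDe's deserialize() method, which turns the NiFi Record (wrapped in an ObjectWritable) into Hive's internal row representation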
+ public Object encode(Record record) throws SerializationError {
+ try {
+ ObjectWritable blob = new ObjectWritable(record);
+ return serde.deserialize(blob);
+ } catch (SerDeException e) {
+ throw new SerializationError("Unable to convert Record into Object", e);
+ }
+ }
+
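+ // Encode the record, resolve its bucket and partition values, then insert it via the matching RecordUpdater under the given transaction write id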
+ private void write(long writeId, Record record) throws StreamingException {
+ checkAutoFlush();
+ try {
+ Object encodedRow = encode(record);
+ int bucket = getBucket(encodedRow);
+ List<String> partitionValues = getPartitionValues(encodedRow);
+ getRecordUpdater(partitionValues, bucket).insert(writeId, encodedRow);
+ conn.getConnectionStats().incrementRecordsWritten();
+ } catch (IOException e) {
+ throw new StreamingIOFailure("Error writing record in transaction write id (" + writeId + ")", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
new file mode 100644
index 0000000..d4b444a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.hive.common.util.TimestampParser;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class NiFiRecordSerDe extends AbstractSerDe {
+
+ protected RecordReader recordReader;
+ protected ComponentLog log;
+ protected List<String> columnNames;
+ protected StructTypeInfo schema;
+ protected SerDeStats stats;
+
+ protected StandardStructObjectInspector cachedObjectInspector;
+ protected TimestampParser tsParser;
+
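+ // Matches Hive's positional internal column names, e.g. _col0, _col1, ...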
+ private static final Pattern INTERNAL_PATTERN = Pattern.compile("_col([0-9]+)");
+
+ private Map<String, Integer> fieldPositionMap;
+
+ public NiFiRecordSerDe(RecordReader recordReader, ComponentLog log) {
+ this.recordReader = recordReader;
+ this.log = log;
+ }
+
+ @Override
+ public void initialize(Configuration conf, Properties tbl) throws SerDeException {
+ List<TypeInfo> columnTypes;
+ StructTypeInfo rowTypeInfo;
+
+ log.debug("Initializing NiFiRecordSerDe: {}", tbl.entrySet().toArray());
+
+ // Get column names and types
+ String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
+ String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+ final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
+ .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
+ // all table column names
+ if (columnNameProperty.isEmpty()) {
+ columnNames = new ArrayList<>(0);
+ } else {
+ columnNames = new ArrayList<>(Arrays.asList(columnNameProperty.split(columnNameDelimiter)));
+ }
+
+ // all column types
+ if (columnTypeProperty.isEmpty()) {
+ columnTypes = new ArrayList<>(0);
+ } else {
+ columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+ }
+
+ log.debug("columns: {}, {}", new Object[]{columnNameProperty, columnNames});
+ log.debug("types: {}, {} ", new Object[]{columnTypeProperty, columnTypes});
+
+ assert (columnNames.size() == columnTypes.size());
+
+ rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
+ schema = rowTypeInfo;
+ log.debug("schema : {}", new Object[]{schema});
+ cachedObjectInspector = (StandardStructObjectInspector) TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);
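+ // Build a timestamp parser from any custom formats declared in the table's "timestamp.formats" property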
+ tsParser = new TimestampParser(HiveStringUtils.splitAndUnEscape(tbl.getProperty(serdeConstants.TIMESTAMP_FORMATS)));
+ // Populate mapping of field names to column positions
+ try {
+ populateFieldPositionMap();
+ } catch (MalformedRecordException | IOException e) {
+ throw new SerDeException(e);
+ }
+ stats = new SerDeStats();
+ }
+
+ @Override
+ public Class<? extends Writable> getSerializedClass() {
+ return ObjectWritable.class;
+ }
+
+ @Override
+ public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
+ throw new UnsupportedOperationException("This SerDe only supports deserialization");
+ }
+
+ @Override
+ public SerDeStats getSerDeStats() {
+ return stats;
+ }
+
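+ // Turns the incoming NiFi Record (wrapped in an ObjectWritable) into a row of Objects, one slot per target table column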
+ @Override
+ public Object deserialize(Writable writable) throws SerDeException {
+ ObjectWritable t = (ObjectWritable) writable;
+ Record record = (Record) t.get();
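+ // Pre-fill the row with one null slot per table column; fields absent from the record remain null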
+ List<Object> r = new ArrayList<>(Collections.nCopies(columnNames.size(), null));
+ try {
+ RecordSchema recordSchema = record.getSchema();
+ for (RecordField field : recordSchema.getFields()) {
+ String fieldName = field.getFieldName();
+ String normalizedFieldName = fieldName.toLowerCase();
+
+ // Get column position of field name, and set field value there
+ Integer fpos = fieldPositionMap.get(normalizedFieldName);
+ if (fpos == null || fpos == -1) {
+ // This is either a partition column or not a column in the target table, ignore either way
+ continue;
+ }
+ Object currField = extractCurrentField(record, field, schema.getStructFieldTypeInfo(normalizedFieldName));
+ r.set(fpos, currField);
+ }
+ stats.setRowCount(stats.getRowCount() + 1);
+
+ } catch (Exception e) {
+ log.warn("Error [{}] parsing Record [{}].", new Object[]{e.getLocalizedMessage(), t}, e);
+ throw new SerDeException(e);
+ }
+
+ return r;
+ }
+
+ /**
+ * Utility method to extract the value of the given field from the Record,
+ * converting it to the Java representation expected by the corresponding
+ * Hive column type (as described by the supplied TypeInfo).
+ */
+ private Object extractCurrentField(Record record, RecordField field, TypeInfo fieldTypeInfo) {
+ Object val;
+ String fieldName = (field != null) ? field.getFieldName() : null;
+
+ switch (fieldTypeInfo.getCategory()) {
+ case PRIMITIVE:
+ PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = PrimitiveObjectInspector.PrimitiveCategory.UNKNOWN;
+ if (fieldTypeInfo instanceof PrimitiveTypeInfo) {
+ primitiveCategory = ((PrimitiveTypeInfo) fieldTypeInfo).getPrimitiveCategory();
+ }
+ switch (primitiveCategory) {
+ case INT:
+ case BYTE:
+ case SHORT:
+ val = record.getAsInt(fieldName);
+ break;
+ case LONG:
+ val = record.getAsLong(fieldName);
+ break;
+ case BOOLEAN:
+ val = record.getAsBoolean(fieldName);
+ break;
+ case FLOAT:
+ val = record.getAsFloat(fieldName);
+ break;
+ case DOUBLE:
+ val = record.getAsDouble(fieldName);
+ break;
+ case STRING:
+ case VARCHAR:
+ case CHAR:
+ val = record.getAsString(fieldName);
+ break;
+ case BINARY:
+ val = AvroTypeUtil.convertByteArray(record.getAsArray(fieldName)).array();
+ break;
+ case DATE:
+ val = record.getAsDate(fieldName, field.getDataType().getFormat());
+ break;
+ case TIMESTAMP:
+ val = DataTypeUtils.toTimestamp(record.getValue(fieldName), () -> DataTypeUtils.getDateFormat(field.getDataType().getFormat()), fieldName);
+ break;
+ case DECIMAL:
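+ // Note: DECIMAL values are read as doubles here, so precision beyond a double is not preserved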
+ val = record.getAsDouble(fieldName);
+ break;
+ default:
+ throw new IllegalArgumentException("Field " + fieldName + " cannot be converted to unknown or unsupported type: " + primitiveCategory.name());
+ }
+ break;
+ case LIST:
+ val = Arrays.asList(record.getAsArray(fieldName));
+ break;
+ case MAP:
+ case STRUCT:
+ val = DataTypeUtils.convertRecordFieldtoObject(record.getValue(fieldName), field.getDataType());
+ break;
+ default:
+ log.error("Unknown type found: " + fieldTypeInfo + "for field of type: " + field.getDataType().toString());
+ return null;
+ }
+ return val;
+ }
+
+ @Override
+ public ObjectInspector getObjectInspector() {
+ return cachedObjectInspector;
+ }
+
+ private void populateFieldPositionMap() throws MalformedRecordException, IOException {
+ // Populate the mapping of field names to column positions only once
+ fieldPositionMap = new HashMap<>(columnNames.size());
+
+ RecordSchema recordSchema = recordReader.getSchema();
+ for (RecordField field : recordSchema.getFields()) {
+ String fieldName = field.getFieldName();
+ String normalizedFieldName = fieldName.toLowerCase();
+
+ int fpos = schema.getAllStructFieldNames().indexOf(normalizedFieldName);
+ if (fpos == -1) {
+ Matcher m = INTERNAL_PATTERN.matcher(fieldName);
+ fpos = m.matches() ? Integer.parseInt(m.group(1)) : -1;
+
+ log.debug("NPE finding position for field [{}] in schema [{}],"
+ + " attempting to check if it is an internal column name like _col0", new Object[]{fieldName, schema});
+ if (fpos == -1) {
+ // Unknown field: skip it and continue with the next one. Log at debug level because partition columns will be "unknown fields"
+ log.debug("Field {} was not found in the target table, ignoring...", new Object[]{field.getFieldName()});
+ continue;
+ }
+ // If we get past this, then the column name did match the hive pattern for an internal
+ // column name, such as _col0, etc, so it *MUST* match the schema for the appropriate column.
+ // This means people can't use arbitrary column names such as _col0, and expect us to ignore it
+ // if we find it.
+ if (!fieldName.equalsIgnoreCase(HiveConf.getColumnInternalName(fpos))) {
+ log.error("Hive internal column name {} and position "
+ + "encoding {} for the column name are at odds", new Object[]{fieldName, fpos});
+ throw new IOException("Hive internal column name (" + fieldName
+ + ") and position encoding (" + fpos
+ + ") for the column name are at odds");
+ }
+ // If we reached here, then we were successful at finding an alternate internal
+ // column mapping, and we're about to proceed.
+ }
+ fieldPositionMap.put(normalizedFieldName, fpos);
+ }
+ }
+}