Posted to issues@nifi.apache.org by mattyb149 <gi...@git.apache.org> on 2018/06/04 14:31:02 UTC

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

GitHub user mattyb149 opened a pull request:

    https://github.com/apache/nifi/pull/2755

    NIFI-4963: Added Hive3 bundle

    You'll need to activate the include-hive3 profile when building the assembly; it is currently excluded by default due to its size (~200 MB).
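
    As a quick sketch (assuming the profile is activated with Maven's standard -P flag, consistent with the contrib-check example below), the assembly build would look like:

        mvn clean install -Pinclude-hive3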
    
    ### For all changes:
    - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
    
    - [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [x] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [x] Have you written or updated unit tests to verify your changes?
    - [x] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
    - [x] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [x] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [x] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [x] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mattyb149/nifi NIFI-4963

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2755.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2755
    
----
commit 417bc821d277a0556842f5aa734d854ca225147b
Author: Matthew Burgess <ma...@...>
Date:   2018-06-04T14:29:08Z

    NIFI-4963: Added Hive3 bundle

----


---

[GitHub] nifi issue #2755: NIFI-4963: Added Hive3 bundle

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on the issue:

    https://github.com/apache/nifi/pull/2755
  
    Will also review...


---

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r193179938
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java ---
    @@ -0,0 +1,247 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hive.streaming;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.serde.serdeConstants;
    +import org.apache.hadoop.hive.serde2.AbstractSerDe;
    +import org.apache.hadoop.hive.serde2.SerDeException;
    +import org.apache.hadoop.hive.serde2.SerDeStats;
    +import org.apache.hadoop.hive.serde2.SerDeUtils;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
    +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    +import org.apache.hadoop.io.ObjectWritable;
    +import org.apache.hadoop.io.Writable;
    +import org.apache.hive.common.util.HiveStringUtils;
    +import org.apache.hive.common.util.TimestampParser;
    +import org.apache.nifi.avro.AvroTypeUtil;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.util.DataTypeUtils;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class NiFiRecordSerDe extends AbstractSerDe {
    +
    +    protected RecordReader recordReader;
    +    protected ComponentLog log;
    +    protected List<String> columnNames;
    +    protected StructTypeInfo schema;
    +
    +    protected StandardStructObjectInspector cachedObjectInspector;
    +    protected TimestampParser tsParser;
    +
    +    private final static Pattern INTERNAL_PATTERN = Pattern.compile("_col([0-9]+)");
    +
    +    public NiFiRecordSerDe(RecordReader recordReader, ComponentLog log) {
    +        this.recordReader = recordReader;
    +        this.log = log;
    +    }
    +
    +    @Override
    +    public void initialize(Configuration conf, Properties tbl) {
    +        List<TypeInfo> columnTypes;
    +        StructTypeInfo rowTypeInfo;
    +
    +        log.debug("Initializing NiFiRecordSerDe: {}", tbl.entrySet().toArray());
    +
    +        // Get column names and types
    +        String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
    +        String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
    +        final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
    +                .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
    +        // all table column names
    +        if (columnNameProperty.isEmpty()) {
    +            columnNames = new ArrayList<>(0);
    +        } else {
    +            columnNames = new ArrayList<>(Arrays.asList(columnNameProperty.split(columnNameDelimiter)));
    +        }
    +
    +        // all column types
    +        if (columnTypeProperty.isEmpty()) {
    +            columnTypes = new ArrayList<>(0);
    +        } else {
    +            columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
    +        }
    +
    +        log.debug("columns: {}, {}", new Object[]{columnNameProperty, columnNames});
    +        log.debug("types: {}, {} ", new Object[]{columnTypeProperty, columnTypes});
    +
    +        assert (columnNames.size() == columnTypes.size());
    +
    +        rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
    +        schema = rowTypeInfo;
    +        log.debug("schema : {}", new Object[]{schema});
    +        cachedObjectInspector = (StandardStructObjectInspector) TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);
    +        tsParser = new TimestampParser(HiveStringUtils.splitAndUnEscape(tbl.getProperty(serdeConstants.TIMESTAMP_FORMATS)));
    +    }
    +
    +    @Override
    +    public Class<? extends Writable> getSerializedClass() {
    +        return null;
    +    }
    +
    +    @Override
    +    public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
    +        return null;
    +    }
    +
    +    @Override
    +    public SerDeStats getSerDeStats() {
    +        return null;
    +    }
    +
    +    @Override
    +    public Object deserialize(Writable writable) throws SerDeException {
    +        ObjectWritable t = (ObjectWritable) writable;
    +        Record record = (Record) t.get();
    +        List<Object> r = new ArrayList<>(Collections.nCopies(columnNames.size(), null));
    +        try {
    +            RecordSchema recordSchema = record.getSchema();
    +            for (RecordField field : recordSchema.getFields()) {
    +                String fieldName = field.getFieldName();
    +                int fpos = schema.getAllStructFieldNames().indexOf(fieldName.toLowerCase());
    --- End diff --
    
    No, the schema will be the same for all records. I thought about moving it to init() but wasn't sure about the Hive internal columns; I can certainly try moving it to init() (unless we remove this entirely based on your next comment).
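
    As a rough sketch of moving that lookup into initialize() (map and variable names here are illustrative, not from the PR; requires java.util.Map/HashMap imports):

        // Hypothetical: precompute field positions once so deserialize() avoids indexOf() per field
        private final Map<String, Integer> fieldPositions = new HashMap<>();

        @Override
        public void initialize(Configuration conf, Properties tbl) {
            // ... existing column name/type parsing from the diff above ...
            final List<String> structFieldNames = schema.getAllStructFieldNames();
            for (int i = 0; i < structFieldNames.size(); i++) {
                fieldPositions.put(structFieldNames.get(i), i);
            }
        }

        // then, in deserialize():
        final Integer fpos = fieldPositions.get(fieldName.toLowerCase());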


---

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

Posted by prasanthj <gi...@git.apache.org>.
Github user prasanthj commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r194301151
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java ---
    @@ -397,6 +398,9 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
                     }
     
                     hiveStreamingConnection = makeStreamingConnection(options, reader);
    +                // Add shutdown handler with higher priority than FileSystem shutdown hook so that streaming connection gets closed first before
    +                // filesystem close (to avoid ClosedChannelException)
    +                ShutdownHookManager.addShutdownHook(hiveStreamingConnection::close,  FileSystem.SHUTDOWN_HOOK_PRIORITY + 1);
    --- End diff --
    
    You may also want to add an uncaught exception handler; I have seen instances where a runtime exception or illegal state exception thrown by some other code, if not caught, can create broken files.
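
    A minimal sketch of that suggestion (handler body and variable names are illustrative only; note this registers a JVM-wide default handler):

        // Hypothetical: close the streaming connection if an uncaught exception escapes,
        // so partially written files are not left behind
        Thread.setDefaultUncaughtExceptionHandler((thread, throwable) -> {
            log.error("Uncaught exception on thread " + thread.getName(), throwable);
            if (hiveStreamingConnection != null) {
                hiveStreamingConnection.close();
            }
        });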


---

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r193181866
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java ---
    @@ -0,0 +1,548 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hive;
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hive.streaming.ConnectionError;
    +import org.apache.hive.streaming.HiveStreamingConnection;
    +import org.apache.hive.streaming.InvalidTable;
    +import org.apache.hive.streaming.SerializationError;
    +import org.apache.hive.streaming.StreamingConnection;
    +import org.apache.hive.streaming.StreamingException;
    +import org.apache.hive.streaming.StreamingIOFailure;
    +import org.apache.hive.streaming.TransactionError;
    +import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.hadoop.SecurityUtil;
    +import org.apache.nifi.kerberos.KerberosCredentialsService;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processor.util.pattern.DiscontinuedException;
    +import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
    +import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.nifi.util.hive.AuthenticationFailedException;
    +import org.apache.nifi.util.hive.HiveConfigurator;
    +import org.apache.nifi.util.hive.HiveOptions;
    +import org.apache.hive.streaming.HiveRecordWriter;
    +import org.apache.nifi.util.hive.HiveUtils;
    +import org.apache.nifi.util.hive.ValidationResources;
    +
    +import java.io.BufferedInputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +
    +import static org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES;
    +
    +@Tags({"hive", "streaming", "put", "database", "store"})
    +@CapabilityDescription("This processor uses Hive Streaming to send flow file records to an Apache Hive 3.0+ table. "
    +        + "The partition values are expected to be the 'last' fields of each record, so if the table is partitioned on column A for example, then the last field in "
    +        + "each record should be field A.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "hivestreaming.record.count", description = "This attribute is written on the flow files routed to the 'success' "
    +                + "and 'failure' relationships, and contains the number of records from the incoming flow file. All records in a flow file are committed as a single transaction."),
    +        @WritesAttribute(attribute = "query.output.tables", description = "This attribute is written on the flow files routed to the 'success' "
    +                + "and 'failure' relationships, and contains the target table name in 'databaseName.tableName' format.")
    +})
    +@RequiresInstanceClassLoading
    +public class PutHive3Streaming extends AbstractProcessor {
    +    // Attributes
    +    public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
    +
    +    private static final String CLIENT_CACHE_DISABLED_PROPERTY = "hcatalog.hive.client.cache.disabled";
    +
    +    // Properties
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +            .name("record-reader")
    +            .displayName("Record Reader")
    +            .description("The service for reading records from incoming flow files.")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +    static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-metastore-uri")
    +            .displayName("Hive Metastore URI")
    +            .description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the "
    +                    + "Hive metastore is 9043.")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.URI_VALIDATOR)
    +            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
    +            .build();
    +
    +    static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
    +            .name("hive3-config-resources")
    +            .displayName("Hive Configuration Resources")
    +            .description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
    +                    + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
    +                    + "with Kerberos e.g., the appropriate properties must be set in the configuration files. Also note that if Max Concurrent Tasks is set "
    +                    + "to a number greater than one, the 'hcatalog.hive.client.cache.disabled' property will be forced to 'true' to avoid concurrency issues. "
    +                    + "Please see the Hive documentation for more details.")
    +            .required(false)
    +            .addValidator(HiveUtils.createMultipleFilesExistValidator())
    +            .build();
    +
    +    static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-database-name")
    +            .displayName("Database Name")
    +            .description("The name of the database in which to put the data.")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-table-name")
    +            .displayName("Table Name")
    +            .description("The name of the database table in which to put the data.")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor PARTITION_VALUES = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-part-vals")
    +            .displayName("Partition Values")
    +            .description("Specifies a comma-separated list of the values for the partition columns of the target table. If the incoming records all have the same values "
    +                    + "for the partition columns, those values can be entered here, resulting in a performance gain. If specified, this property will often contain "
    +                    + "Expression Language, for example if PartitionRecord is upstream and two partitions 'name' and 'age' are used, then this property can be set to "
    +                    + "${name},${age}.")
    +            .required(false)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor AUTOCREATE_PARTITIONS = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-autocreate-partition")
    +            .displayName("Auto-Create Partitions")
    +            .description("Flag indicating whether partitions should be automatically created")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    static final PropertyDescriptor CALL_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-call-timeout")
    +            .displayName("Call Timeout")
    +            .description("The number of seconds allowed for a Hive Streaming operation to complete. A value of 0 indicates the processor should wait indefinitely on operations. "
    +                    + "Note that although this property supports Expression Language, it will not be evaluated against incoming FlowFile attributes.")
    +            .defaultValue("0")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .build();
    +
    +    static final PropertyDescriptor DISABLE_STREAMING_OPTIMIZATIONS = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-disable-optimizations")
    +            .displayName("Disable Streaming Optimizations")
    +            .description("Whether to disable streaming optimizations. Disabling streaming optimizations will have significant impact to performance and memory consumption.")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .build();
    +
    +
    +    static final PropertyDescriptor ROLLBACK_ON_FAILURE = RollbackOnFailure.createRollbackOnFailureProperty(
    +            "NOTE: When an error occurred after a Hive streaming transaction which is derived from the same input FlowFile is already committed," +
    +                    " (i.e. a FlowFile contains more records than 'Records per Transaction' and a failure occurred at the 2nd transaction or later)" +
    +                    " then the succeeded records will be transferred to 'success' relationship while the original input FlowFile stays in incoming queue." +
    +                    " Duplicated records can be created for the succeeded ones when the same FlowFile is processed again.");
    +
    +    static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
    +            .name("kerberos-credentials-service")
    +            .displayName("Kerberos Credentials Service")
    +            .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
    +            .identifiesControllerService(KerberosCredentialsService.class)
    +            .required(false)
    +            .build();
    +
    +    // Relationships
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("A FlowFile containing Avro records routed to this relationship after the record has been successfully transmitted to Hive.")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("A FlowFile containing Avro records routed to this relationship if the record could not be transmitted to Hive.")
    +            .build();
    +
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description("The incoming FlowFile is routed to this relationship if its records cannot be transmitted to Hive. Note that "
    +                    + "some records may have been processed successfully, they will be routed (as Avro flow files) to the success relationship. "
    +                    + "The combination of the retry, success, and failure relationships indicate how many records succeeded and/or failed. This "
    +                    + "can be used to provide a retry capability since full rollback is not possible.")
    +            .build();
    +
    +    private List<PropertyDescriptor> propertyDescriptors;
    +    private Set<Relationship> relationships;
    +
    +    protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
    +    protected volatile UserGroupInformation ugi;
    +    protected volatile HiveConf hiveConfig;
    +
    +    protected volatile int callTimeout;
    +    protected ExecutorService callTimeoutPool;
    +    protected volatile boolean rollbackOnFailure;
    +
    +    // Holder of cached Configuration information so validation does not reload the same config over and over
    +    private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
    +
    +    @Override
    +    protected void init(ProcessorInitializationContext context) {
    +        List<PropertyDescriptor> props = new ArrayList<>();
    +        props.add(RECORD_READER);
    +        props.add(METASTORE_URI);
    +        props.add(HIVE_CONFIGURATION_RESOURCES);
    +        props.add(DB_NAME);
    +        props.add(TABLE_NAME);
    +        props.add(PARTITION_VALUES);
    +        props.add(AUTOCREATE_PARTITIONS);
    +        props.add(CALL_TIMEOUT);
    +        props.add(DISABLE_STREAMING_OPTIMIZATIONS);
    +        props.add(ROLLBACK_ON_FAILURE);
    +        props.add(KERBEROS_CREDENTIALS_SERVICE);
    +
    +        propertyDescriptors = Collections.unmodifiableList(props);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        _relationships.add(REL_RETRY);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
    +        boolean confFileProvided = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).isSet();
    +
    +        final List<ValidationResult> problems = new ArrayList<>();
    +
    +        final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
    +
    +        final String resolvedPrincipal = credentialsService != null ? credentialsService.getPrincipal() : null;
    +        final String resolvedKeytab = credentialsService != null ? credentialsService.getKeytab() : null;
    +        if (confFileProvided) {
    +            final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
    +            problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger()));
    +        }
    +
    +        return problems;
    +    }
    +
    +    @OnScheduled
    +    public void setup(final ProcessContext context) {
    +        ComponentLog log = getLogger();
    +        rollbackOnFailure = context.getProperty(ROLLBACK_ON_FAILURE).asBoolean();
    +
    +        final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
    +        hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
    +
    +        // If more than one concurrent task, force 'hcatalog.hive.client.cache.disabled' to true
    +        if (context.getMaxConcurrentTasks() > 1) {
    +            hiveConfig.setBoolean(CLIENT_CACHE_DISABLED_PROPERTY, true);
    +        }
    +
    +        // add any dynamic properties to the Hive configuration
    +        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
    +            final PropertyDescriptor descriptor = entry.getKey();
    +            if (descriptor.isDynamic()) {
    +                hiveConfig.set(descriptor.getName(), entry.getValue());
    +            }
    +        }
    +
    +        hiveConfigurator.preload(hiveConfig);
    +
    +        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
    +            final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
    +
    +            final String resolvedPrincipal = credentialsService.getPrincipal();
    +            final String resolvedKeytab = credentialsService.getKeytab();
    +
    +            log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
    +            try {
    +                ugi = hiveConfigurator.authenticate(hiveConfig, resolvedPrincipal, resolvedKeytab);
    +            } catch (AuthenticationFailedException ae) {
    +                throw new ProcessException("Kerberos authentication failed for Hive Streaming", ae);
    +            }
    +
    +            log.info("Successfully logged in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
    +        } else {
    +            ugi = null;
    +        }
    +
    +        callTimeout = context.getProperty(CALL_TIMEOUT).evaluateAttributeExpressions().asInteger() * 1000; // milliseconds
    +        String timeoutName = "put-hive3-streaming-%d";
    +        this.callTimeoutPool = Executors.newFixedThreadPool(1,
    +                new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
    +    }
    +
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +        final String dbName = context.getProperty(DB_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        final ComponentLog log = getLogger();
    +        final String metastoreUri = context.getProperty(METASTORE_URI).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        final String partitionValuesString = context.getProperty(PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue();
    +        final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
    +        final boolean disableStreamingOptimizations = context.getProperty(DISABLE_STREAMING_OPTIMIZATIONS).asBoolean();
    +
    +        HiveOptions o = new HiveOptions(metastoreUri, dbName, tableName)
    +                .withHiveConf(hiveConfig)
    +                .withAutoCreatePartitions(autoCreatePartitions)
    +                .withCallTimeout(callTimeout)
    +                .withStreamingOptimizations(!disableStreamingOptimizations);
    +
    +        if (!StringUtils.isEmpty(partitionValuesString)) {
    +            List<String> staticPartitionValues = Arrays.stream(partitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
    +            o = o.withStaticPartitionValues(staticPartitionValues);
    +        }
    +
    +        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
    +            final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
    +            o = o.withKerberosPrincipal(credentialsService.getPrincipal()).withKerberosKeytab(credentialsService.getKeytab());
    +        }
    +
    +        final HiveOptions options = o;
    +
    +        // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
    +        ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
    +        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
    +
    +        StreamingConnection hiveStreamingConnection = null;
    +
    +        try (final InputStream rawIn = session.read(flowFile)) {
    +            long processedRecords = 0L;
    +            final RecordReader reader;
    +
    +            try (final BufferedInputStream in = new BufferedInputStream(rawIn)) {
    +
    +                // if we fail to create the RecordReader then we want to route to failure, so we need to
    +                // handle this separately from the other IOExceptions which normally route to retry
    +                try {
    +                    reader = recordReaderFactory.createRecordReader(flowFile, in, getLogger());
    +                } catch (Exception e) {
    +                    throw new RecordReaderFactoryException("Unable to create RecordReader", e);
    +                }
    +
    +                hiveStreamingConnection = makeStreamingConnection(options, reader);
    +
    +                // Write records to Hive streaming, then commit and close
    +                hiveStreamingConnection.beginTransaction();
    +                hiveStreamingConnection.write(in);
    --- End diff --
    
    We'll encourage users of this processor to have flow files with a significant number of records, although I don't think the average use case will have that many rows. Certainly we do not want to commit a single row at a time, but we'd need special logic to keep flow files (and the connection) around until 1M rows is reached, because we want to wait until the rows are committed before we send the flow files downstream. This is how MergeContent and MergeRecord work; we are suggesting that users combine smaller flow files into larger ones before sending them to PutHive3Streaming.


---

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r193182526
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/record/ORCHDFSRecordWriter.java ---
    @@ -0,0 +1,105 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.orc.record;
    +
    +import org.apache.avro.Schema;
    +import org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils;
    +import org.apache.hadoop.hive.ql.io.orc.Writer;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    +import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
    +import org.apache.nifi.serialization.WriteResult;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSet;
    +
    +import java.io.IOException;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import static org.apache.nifi.processors.orc.PutORC.HIVE_DDL_ATTRIBUTE;
    +
    +/**
    + * HDFSRecordWriter that writes ORC files using Avro as the schema representation.
    + */
    +
    +public class ORCHDFSRecordWriter implements HDFSRecordWriter {
    +
    +    private final Schema avroSchema;
    +    private final TypeInfo orcSchema;
    +    private final Writer orcWriter;
    +    private final String hiveTableName;
    +    private final boolean hiveFieldNames;
    +
    +    public ORCHDFSRecordWriter(final Writer orcWriter, final Schema avroSchema, final String hiveTableName, final boolean hiveFieldNames) {
    +        this.avroSchema = avroSchema;
    +        this.orcWriter = orcWriter;
    +        this.hiveFieldNames = hiveFieldNames;
    +        this.orcSchema = NiFiOrcUtils.getOrcField(avroSchema, this.hiveFieldNames);
    +        this.hiveTableName = hiveTableName;
    +    }
    +
    +    @Override
    +    public void write(final Record record) throws IOException {
    +        List<Schema.Field> fields = avroSchema.getFields();
    --- End diff --
    
    Yes, will change.
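
    A rough sketch of what that change might look like (assuming the suggestion is to avoid calling avroSchema.getFields() on every write(); the cached field name is illustrative):

        // Hypothetical: cache the schema's field list at construction time and reuse it in write()
        private final List<Schema.Field> recordFields;

        public ORCHDFSRecordWriter(final Writer orcWriter, final Schema avroSchema,
                                   final String hiveTableName, final boolean hiveFieldNames) {
            this.avroSchema = avroSchema;
            this.orcWriter = orcWriter;
            this.hiveFieldNames = hiveFieldNames;
            this.orcSchema = NiFiOrcUtils.getOrcField(avroSchema, this.hiveFieldNames);
            this.hiveTableName = hiveTableName;
            this.recordFields = avroSchema.getFields();
        }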


---

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r193182416
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java ---
    @@ -0,0 +1,548 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hive;
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hive.streaming.ConnectionError;
    +import org.apache.hive.streaming.HiveStreamingConnection;
    +import org.apache.hive.streaming.InvalidTable;
    +import org.apache.hive.streaming.SerializationError;
    +import org.apache.hive.streaming.StreamingConnection;
    +import org.apache.hive.streaming.StreamingException;
    +import org.apache.hive.streaming.StreamingIOFailure;
    +import org.apache.hive.streaming.TransactionError;
    +import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.hadoop.SecurityUtil;
    +import org.apache.nifi.kerberos.KerberosCredentialsService;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processor.util.pattern.DiscontinuedException;
    +import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
    +import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.nifi.util.hive.AuthenticationFailedException;
    +import org.apache.nifi.util.hive.HiveConfigurator;
    +import org.apache.nifi.util.hive.HiveOptions;
    +import org.apache.hive.streaming.HiveRecordWriter;
    +import org.apache.nifi.util.hive.HiveUtils;
    +import org.apache.nifi.util.hive.ValidationResources;
    +
    +import java.io.BufferedInputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +
    +import static org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES;
    +
    +@Tags({"hive", "streaming", "put", "database", "store"})
    +@CapabilityDescription("This processor uses Hive Streaming to send flow file records to an Apache Hive 3.0+ table. "
    +        + "The partition values are expected to be the 'last' fields of each record, so if the table is partitioned on column A for example, then the last field in "
    +        + "each record should be field A.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "hivestreaming.record.count", description = "This attribute is written on the flow files routed to the 'success' "
    +                + "and 'failure' relationships, and contains the number of records from the incoming flow file. All records in a flow file are committed as a single transaction."),
    +        @WritesAttribute(attribute = "query.output.tables", description = "This attribute is written on the flow files routed to the 'success' "
    +                + "and 'failure' relationships, and contains the target table name in 'databaseName.tableName' format.")
    +})
    +@RequiresInstanceClassLoading
    +public class PutHive3Streaming extends AbstractProcessor {
    +    // Attributes
    +    public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
    +
    +    private static final String CLIENT_CACHE_DISABLED_PROPERTY = "hcatalog.hive.client.cache.disabled";
    +
    +    // Properties
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +            .name("record-reader")
    +            .displayName("Record Reader")
    +            .description("The service for reading records from incoming flow files.")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +    static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-metastore-uri")
    +            .displayName("Hive Metastore URI")
    +            .description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the "
    +                    + "Hive metastore is 9043.")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.URI_VALIDATOR)
    +            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
    +            .build();
    +
    +    static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
    +            .name("hive3-config-resources")
    +            .displayName("Hive Configuration Resources")
    +            .description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
    +                    + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
    +                    + "with Kerberos e.g., the appropriate properties must be set in the configuration files. Also note that if Max Concurrent Tasks is set "
    +                    + "to a number greater than one, the 'hcatalog.hive.client.cache.disabled' property will be forced to 'true' to avoid concurrency issues. "
    +                    + "Please see the Hive documentation for more details.")
    +            .required(false)
    +            .addValidator(HiveUtils.createMultipleFilesExistValidator())
    +            .build();
    +
    +    static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-database-name")
    +            .displayName("Database Name")
    +            .description("The name of the database in which to put the data.")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-table-name")
    +            .displayName("Table Name")
    +            .description("The name of the database table in which to put the data.")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor PARTITION_VALUES = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-part-vals")
    +            .displayName("Partition Values")
    +            .description("Specifies a comma-separated list of the values for the partition columns of the target table. If the incoming records all have the same values "
    +                    + "for the partition columns, those values can be entered here, resulting in a performance gain. If specified, this property will often contain "
    +                    + "Expression Language, for example if PartitionRecord is upstream and two partitions 'name' and 'age' are used, then this property can be set to "
    +                    + "${name},${age}.")
    +            .required(false)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor AUTOCREATE_PARTITIONS = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-autocreate-partition")
    +            .displayName("Auto-Create Partitions")
    +            .description("Flag indicating whether partitions should be automatically created")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    static final PropertyDescriptor CALL_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-call-timeout")
    +            .displayName("Call Timeout")
    +            .description("The number of seconds allowed for a Hive Streaming operation to complete. A value of 0 indicates the processor should wait indefinitely on operations. "
    +                    + "Note that although this property supports Expression Language, it will not be evaluated against incoming FlowFile attributes.")
    +            .defaultValue("0")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .build();
    +
    +    static final PropertyDescriptor DISABLE_STREAMING_OPTIMIZATIONS = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-disable-optimizations")
    +            .displayName("Disable Streaming Optimizations")
    +            .description("Whether to disable streaming optimizations. Disabling streaming optimizations will have significant impact to performance and memory consumption.")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .build();
    +
    +
    +    static final PropertyDescriptor ROLLBACK_ON_FAILURE = RollbackOnFailure.createRollbackOnFailureProperty(
    +            "NOTE: When an error occurred after a Hive streaming transaction which is derived from the same input FlowFile is already committed," +
    +                    " (i.e. a FlowFile contains more records than 'Records per Transaction' and a failure occurred at the 2nd transaction or later)" +
    +                    " then the succeeded records will be transferred to 'success' relationship while the original input FlowFile stays in incoming queue." +
    +                    " Duplicated records can be created for the succeeded ones when the same FlowFile is processed again.");
    +
    +    static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
    +            .name("kerberos-credentials-service")
    +            .displayName("Kerberos Credentials Service")
    +            .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
    +            .identifiesControllerService(KerberosCredentialsService.class)
    +            .required(false)
    +            .build();
    +
    +    // Relationships
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("A FlowFile containing Avro records routed to this relationship after the record has been successfully transmitted to Hive.")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("A FlowFile containing Avro records routed to this relationship if the record could not be transmitted to Hive.")
    +            .build();
    +
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description("The incoming FlowFile is routed to this relationship if its records cannot be transmitted to Hive. Note that "
    +                    + "some records may have been processed successfully, they will be routed (as Avro flow files) to the success relationship. "
    +                    + "The combination of the retry, success, and failure relationships indicate how many records succeeded and/or failed. This "
    +                    + "can be used to provide a retry capability since full rollback is not possible.")
    +            .build();
    +
    +    private List<PropertyDescriptor> propertyDescriptors;
    +    private Set<Relationship> relationships;
    +
    +    protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
    +    protected volatile UserGroupInformation ugi;
    +    protected volatile HiveConf hiveConfig;
    +
    +    protected volatile int callTimeout;
    +    protected ExecutorService callTimeoutPool;
    +    protected volatile boolean rollbackOnFailure;
    +
    +    // Holder of cached Configuration information so validation does not reload the same config over and over
    +    private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
    +
    +    @Override
    +    protected void init(ProcessorInitializationContext context) {
    +        List<PropertyDescriptor> props = new ArrayList<>();
    +        props.add(RECORD_READER);
    +        props.add(METASTORE_URI);
    +        props.add(HIVE_CONFIGURATION_RESOURCES);
    +        props.add(DB_NAME);
    +        props.add(TABLE_NAME);
    +        props.add(PARTITION_VALUES);
    +        props.add(AUTOCREATE_PARTITIONS);
    +        props.add(CALL_TIMEOUT);
    +        props.add(DISABLE_STREAMING_OPTIMIZATIONS);
    +        props.add(ROLLBACK_ON_FAILURE);
    +        props.add(KERBEROS_CREDENTIALS_SERVICE);
    +
    +        propertyDescriptors = Collections.unmodifiableList(props);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        _relationships.add(REL_RETRY);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
    +        boolean confFileProvided = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).isSet();
    +
    +        final List<ValidationResult> problems = new ArrayList<>();
    +
    +        final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
    +
    +        final String resolvedPrincipal = credentialsService != null ? credentialsService.getPrincipal() : null;
    +        final String resolvedKeytab = credentialsService != null ? credentialsService.getKeytab() : null;
    +        if (confFileProvided) {
    +            final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
    +            problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger()));
    +        }
    +
    +        return problems;
    +    }
    +
    +    @OnScheduled
    +    public void setup(final ProcessContext context) {
    +        ComponentLog log = getLogger();
    +        rollbackOnFailure = context.getProperty(ROLLBACK_ON_FAILURE).asBoolean();
    +
    +        final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
    +        hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
    +
    +        // If more than one concurrent task, force 'hcatalog.hive.client.cache.disabled' to true
    +        if (context.getMaxConcurrentTasks() > 1) {
    +            hiveConfig.setBoolean(CLIENT_CACHE_DISABLED_PROPERTY, true);
    +        }
    +
    +        // add any dynamic properties to the Hive configuration
    +        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
    +            final PropertyDescriptor descriptor = entry.getKey();
    +            if (descriptor.isDynamic()) {
    +                hiveConfig.set(descriptor.getName(), entry.getValue());
    +            }
    +        }
    +
    +        hiveConfigurator.preload(hiveConfig);
    +
    +        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
    +            final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
    +
    +            final String resolvedPrincipal = credentialsService.getPrincipal();
    +            final String resolvedKeytab = credentialsService.getKeytab();
    +
    +            log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
    +            try {
    +                ugi = hiveConfigurator.authenticate(hiveConfig, resolvedPrincipal, resolvedKeytab);
    +            } catch (AuthenticationFailedException ae) {
    +                throw new ProcessException("Kerberos authentication failed for Hive Streaming", ae);
    +            }
    +
    +            log.info("Successfully logged in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
    +        } else {
    +            ugi = null;
    +        }
    +
    +        callTimeout = context.getProperty(CALL_TIMEOUT).evaluateAttributeExpressions().asInteger() * 1000; // milliseconds
    +        String timeoutName = "put-hive3-streaming-%d";
    +        this.callTimeoutPool = Executors.newFixedThreadPool(1,
    +                new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
    +    }
    +
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +        final String dbName = context.getProperty(DB_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        final ComponentLog log = getLogger();
    +        final String metastoreUri = context.getProperty(METASTORE_URI).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        final String partitionValuesString = context.getProperty(PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue();
    +        final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
    +        final boolean disableStreamingOptimizations = context.getProperty(DISABLE_STREAMING_OPTIMIZATIONS).asBoolean();
    +
    +        HiveOptions o = new HiveOptions(metastoreUri, dbName, tableName)
    +                .withHiveConf(hiveConfig)
    +                .withAutoCreatePartitions(autoCreatePartitions)
    +                .withCallTimeout(callTimeout)
    +                .withStreamingOptimizations(!disableStreamingOptimizations);
    +
    +        if (!StringUtils.isEmpty(partitionValuesString)) {
    +            List<String> staticPartitionValues = Arrays.stream(partitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
    +            o = o.withStaticPartitionValues(staticPartitionValues);
    +        }
    +
    +        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
    +            final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
    +            o = o.withKerberosPrincipal(credentialsService.getPrincipal()).withKerberosKeytab(credentialsService.getKeytab());
    +        }
    +
    +        final HiveOptions options = o;
    +
    +        // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
    +        ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
    +        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
    +
    +        StreamingConnection hiveStreamingConnection = null;
    +
    +        try (final InputStream rawIn = session.read(flowFile)) {
    +            long processedRecords = 0L;
    +            final RecordReader reader;
    +
    +            try (final BufferedInputStream in = new BufferedInputStream(rawIn)) {
    +
    +                // if we fail to create the RecordReader then we want to route to failure, so we need to
    +                // handle this separately from the other IOExceptions which normally route to retry
    +                try {
    +                    reader = recordReaderFactory.createRecordReader(flowFile, in, getLogger());
    +                } catch (Exception e) {
    +                    throw new RecordReaderFactoryException("Unable to create RecordReader", e);
    +                }
    +
    +                hiveStreamingConnection = makeStreamingConnection(options, reader);
    +
    +                // Write records to Hive streaming, then commit and close
    +                hiveStreamingConnection.beginTransaction();
    +                hiveStreamingConnection.write(in);
    +                hiveStreamingConnection.commitTransaction();
    +                rawIn.close();
    +
    +                Map<String, String> updateAttributes = new HashMap<>();
    +                updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()));
    +                updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
    +                flowFile = session.putAllAttributes(flowFile, updateAttributes);
    +                session.getProvenanceReporter().send(flowFile, hiveStreamingConnection.getMetastoreUri());
    +                session.transfer(flowFile, REL_SUCCESS);
    +            } catch (TransactionError te) {
    +                if (rollbackOnFailure) {
    +                    throw new ProcessException(te.getLocalizedMessage(), te);
    +                } else {
    +                    throw new ShouldRetryException(te.getLocalizedMessage(), te);
    +                }
    +            } catch (RecordReaderFactoryException rrfe) {
    +                throw new ProcessException(rrfe);
    +            }
    +        } catch (InvalidTable | SerializationError | StreamingIOFailure | IOException e) {
    +            if (rollbackOnFailure) {
    +                abortConnection(hiveStreamingConnection);
    +                throw new ProcessException(e.getLocalizedMessage(), e);
    +            } else {
    +                Map<String, String> updateAttributes = new HashMap<>();
    +                updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()));
    +                updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
    +                flowFile = session.putAllAttributes(flowFile, updateAttributes);
    +                session.transfer(flowFile, REL_FAILURE);
    +            }
    +        } catch (DiscontinuedException e) {
    +            // The input FlowFile processing is discontinued. Keep it in the input queue.
    +            getLogger().warn("Discontinued processing for {} due to {}", new Object[]{flowFile, e}, e);
    +            session.transfer(flowFile, Relationship.SELF);
    +        } catch (ConnectionError ce) {
    +            // If we can't connect to the metastore, yield the processor
    +            context.yield();
    +            throw new ProcessException("A connection to metastore cannot be established", ce);
    +        } catch (ShouldRetryException e) {
    +            // This exception is already a result of adjusting an error, so simply transfer the FlowFile to retry. Still need to abort the txn
    +            getLogger().error(e.getLocalizedMessage(), e);
    +            abortConnection(hiveStreamingConnection);
    +            flowFile = session.penalize(flowFile);
    +            session.transfer(flowFile, REL_RETRY);
    +        } catch (StreamingException se) {
    +            // Handle all other exceptions. These are often record-based exceptions (since Hive will throw a subclass of the exception caught above)
    +            Throwable cause = se.getCause();
    +            if (cause == null) cause = se;
    +            // This is a failure on the incoming data, rollback on failure if specified; otherwise route to failure after penalizing (and abort txn in any case)
    +            if (rollbackOnFailure) {
    +                abortConnection(hiveStreamingConnection);
    +                throw new ProcessException(cause.getLocalizedMessage(), cause);
    +            } else {
    +                flowFile = session.penalize(flowFile);
    +                Map<String, String> updateAttributes = new HashMap<>();
    +                updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()));
    +                updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
    +                flowFile = session.putAllAttributes(flowFile, updateAttributes);
    +                session.transfer(flowFile, REL_FAILURE);
    +            }
    +
    +        } catch (Exception e) {
    +            abortConnection(hiveStreamingConnection);
    +            throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
    +        } finally {
    +            closeConnection(hiveStreamingConnection);
    +            // Restore original class loader, might not be necessary but is good practice since the processor task changed it
    +            Thread.currentThread().setContextClassLoader(originalClassloader);
    +        }
    +    }
    +
    +    StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader) throws StreamingException {
    +        return HiveStreamingConnection.newBuilder()
    +                .withDatabase(options.getDatabaseName())
    +                .withTable(options.getTableName())
    +                .withStaticPartitionValues(options.getStaticPartitionValues())
    +                .withHiveConf(options.getHiveConf())
    +                .withRecordWriter(new HiveRecordWriter(reader, getLogger()))
    +                .withAgentInfo("NiFi " + this.getClass().getSimpleName() + " [" + this.getIdentifier()
    +                        + "] thread " + Thread.currentThread().getId() + "[" + Thread.currentThread().getName() + "]")
    +                .connect();
    +    }
    +
    +    @OnStopped
    +    public void cleanup() {
    +        validationResourceHolder.set(null); // trigger re-validation of resources
    +
    +        ComponentLog log = getLogger();
    +
    +        if (callTimeoutPool != null) {
    +            callTimeoutPool.shutdown();
    +            try {
    +                while (!callTimeoutPool.isTerminated()) {
    +                    callTimeoutPool.awaitTermination(callTimeout, TimeUnit.MILLISECONDS);
    +                }
    +            } catch (Throwable t) {
    +                log.warn("shutdown interrupted on " + callTimeoutPool, t);
    +            }
    +            callTimeoutPool = null;
    +        }
    +
    +        ugi = null;
    +    }
    +
    +    private void abortAndCloseConnection(StreamingConnection connection) {
    +        try {
    +            abortConnection(connection);
    --- End diff --
    
    Can you describe how to use the interrupt handler? This is left over from the Storm Hive bolt using the old API; it only aborts the single transaction currently underway if there was an error. Apache NiFi will be built on Apache Hive 3.0.x, so we'd need HIVE-19772 to land in Hive 3.0.1 (and for that release to ship) before we could take advantage of it. Otherwise we'd have to keep this manual abort in the meantime.
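
    For reference, a minimal sketch of the manual per-transaction abort being discussed, assuming the Hive 3 StreamingConnection API's abortTransaction() as used elsewhere in this diff (illustrative only, not the exact PR code):

        private void abortConnection(StreamingConnection connection) {
            if (connection != null) {
                try {
                    // Abort only the single transaction currently underway
                    connection.abortTransaction();
                } catch (Exception e) {
                    getLogger().error("Error aborting Hive streaming transaction", e);
                }
            }
        }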


---

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

Posted by prasanthj <gi...@git.apache.org>.
Github user prasanthj commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r193173275
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java ---
    @@ -0,0 +1,247 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hive.streaming;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.serde.serdeConstants;
    +import org.apache.hadoop.hive.serde2.AbstractSerDe;
    +import org.apache.hadoop.hive.serde2.SerDeException;
    +import org.apache.hadoop.hive.serde2.SerDeStats;
    +import org.apache.hadoop.hive.serde2.SerDeUtils;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
    +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    +import org.apache.hadoop.io.ObjectWritable;
    +import org.apache.hadoop.io.Writable;
    +import org.apache.hive.common.util.HiveStringUtils;
    +import org.apache.hive.common.util.TimestampParser;
    +import org.apache.nifi.avro.AvroTypeUtil;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.util.DataTypeUtils;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class NiFiRecordSerDe extends AbstractSerDe {
    +
    +    protected RecordReader recordReader;
    +    protected ComponentLog log;
    +    protected List<String> columnNames;
    +    protected StructTypeInfo schema;
    +
    +    protected StandardStructObjectInspector cachedObjectInspector;
    +    protected TimestampParser tsParser;
    +
    +    private final static Pattern INTERNAL_PATTERN = Pattern.compile("_col([0-9]+)");
    +
    +    public NiFiRecordSerDe(RecordReader recordReader, ComponentLog log) {
    +        this.recordReader = recordReader;
    +        this.log = log;
    +    }
    +
    +    @Override
    +    public void initialize(Configuration conf, Properties tbl) {
    +        List<TypeInfo> columnTypes;
    +        StructTypeInfo rowTypeInfo;
    +
    +        log.debug("Initializing NiFiRecordSerDe: {}", tbl.entrySet().toArray());
    +
    +        // Get column names and types
    +        String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
    +        String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
    +        final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
    +                .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
    +        // all table column names
    +        if (columnNameProperty.isEmpty()) {
    +            columnNames = new ArrayList<>(0);
    +        } else {
    +            columnNames = new ArrayList<>(Arrays.asList(columnNameProperty.split(columnNameDelimiter)));
    +        }
    +
    +        // all column types
    +        if (columnTypeProperty.isEmpty()) {
    +            columnTypes = new ArrayList<>(0);
    +        } else {
    +            columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
    +        }
    +
    +        log.debug("columns: {}, {}", new Object[]{columnNameProperty, columnNames});
    +        log.debug("types: {}, {} ", new Object[]{columnTypeProperty, columnTypes});
    +
    +        assert (columnNames.size() == columnTypes.size());
    +
    +        rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
    +        schema = rowTypeInfo;
    +        log.debug("schema : {}", new Object[]{schema});
    +        cachedObjectInspector = (StandardStructObjectInspector) TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);
    +        tsParser = new TimestampParser(HiveStringUtils.splitAndUnEscape(tbl.getProperty(serdeConstants.TIMESTAMP_FORMATS)));
    +    }
    +
    +    @Override
    +    public Class<? extends Writable> getSerializedClass() {
    +        return null;
    +    }
    +
    +    @Override
    +    public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
    +        return null;
    +    }
    +
    +    @Override
    +    public SerDeStats getSerDeStats() {
    +        return null;
    +    }
    +
    +    @Override
    +    public Object deserialize(Writable writable) throws SerDeException {
    +        ObjectWritable t = (ObjectWritable) writable;
    +        Record record = (Record) t.get();
    +        List<Object> r = new ArrayList<>(Collections.nCopies(columnNames.size(), null));
    +        try {
    +            RecordSchema recordSchema = record.getSchema();
    +            for (RecordField field : recordSchema.getFields()) {
    +                String fieldName = field.getFieldName();
    +                int fpos = schema.getAllStructFieldNames().indexOf(fieldName.toLowerCase());
    --- End diff --
    
    Once the processor is started, can the record schema change for different records?
    If not, can this field-name-to-position mapping be moved to init?
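
    A hypothetical sketch of that change (names are illustrative; assumes the table schema handed to initialize() is fixed for the lifetime of the SerDe, and would need java.util.HashMap/java.util.Map added to the imports):

        // field on NiFiRecordSerDe
        private Map<String, Integer> fieldPositions;

        // at the end of initialize(), once columnNames is populated
        fieldPositions = new HashMap<>(columnNames.size());
        for (int i = 0; i < columnNames.size(); i++) {
            fieldPositions.put(columnNames.get(i).toLowerCase(), i);
        }

        // in deserialize(), replacing the per-record indexOf() lookup
        Integer fpos = fieldPositions.get(fieldName.toLowerCase());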


---

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

Posted by prasanthj <gi...@git.apache.org>.
Github user prasanthj commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r193178966
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/record/ORCHDFSRecordWriter.java ---
    @@ -0,0 +1,105 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.orc.record;
    +
    +import org.apache.avro.Schema;
    +import org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils;
    +import org.apache.hadoop.hive.ql.io.orc.Writer;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    +import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
    +import org.apache.nifi.serialization.WriteResult;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSet;
    +
    +import java.io.IOException;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import static org.apache.nifi.processors.orc.PutORC.HIVE_DDL_ATTRIBUTE;
    +
    +/**
    + * HDFSRecordWriter that writes ORC files using Avro as the schema representation.
    + */
    +
    +public class ORCHDFSRecordWriter implements HDFSRecordWriter {
    +
    +    private final Schema avroSchema;
    +    private final TypeInfo orcSchema;
    +    private final Writer orcWriter;
    +    private final String hiveTableName;
    +    private final boolean hiveFieldNames;
    +
    +    public ORCHDFSRecordWriter(final Writer orcWriter, final Schema avroSchema, final String hiveTableName, final boolean hiveFieldNames) {
    +        this.avroSchema = avroSchema;
    +        this.orcWriter = orcWriter;
    +        this.hiveFieldNames = hiveFieldNames;
    +        this.orcSchema = NiFiOrcUtils.getOrcField(avroSchema, this.hiveFieldNames);
    +        this.hiveTableName = hiveTableName;
    +    }
    +
    +    @Override
    +    public void write(final Record record) throws IOException {
    +        List<Schema.Field> fields = avroSchema.getFields();
    +        if (fields != null) {
    +            Object[] row = new Object[fields.size()];
    --- End diff --
    
    Same for this array. If the fields do not change, this can be a single allocation.
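
    A possible shape for that, assuming the Avro schema (and therefore the field count) never changes for the lifetime of this writer (hypothetical field name, not part of the PR):

        // reused across write() calls instead of allocating a new Object[] per record
        private Object[] reusableRow;

        // in the constructor, after avroSchema has been assigned
        this.reusableRow = new Object[avroSchema.getFields().size()];

        // in write(Record record)
        final Object[] row = reusableRow;
        // ... populate row from the record and hand it to the ORC writer as before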


---

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

Posted by prasanthj <gi...@git.apache.org>.
Github user prasanthj commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r193177877
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java ---
    @@ -0,0 +1,548 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hive;
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hive.streaming.ConnectionError;
    +import org.apache.hive.streaming.HiveStreamingConnection;
    +import org.apache.hive.streaming.InvalidTable;
    +import org.apache.hive.streaming.SerializationError;
    +import org.apache.hive.streaming.StreamingConnection;
    +import org.apache.hive.streaming.StreamingException;
    +import org.apache.hive.streaming.StreamingIOFailure;
    +import org.apache.hive.streaming.TransactionError;
    +import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.hadoop.SecurityUtil;
    +import org.apache.nifi.kerberos.KerberosCredentialsService;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processor.util.pattern.DiscontinuedException;
    +import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
    +import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.nifi.util.hive.AuthenticationFailedException;
    +import org.apache.nifi.util.hive.HiveConfigurator;
    +import org.apache.nifi.util.hive.HiveOptions;
    +import org.apache.hive.streaming.HiveRecordWriter;
    +import org.apache.nifi.util.hive.HiveUtils;
    +import org.apache.nifi.util.hive.ValidationResources;
    +
    +import java.io.BufferedInputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +
    +import static org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES;
    +
    +@Tags({"hive", "streaming", "put", "database", "store"})
    +@CapabilityDescription("This processor uses Hive Streaming to send flow file records to an Apache Hive 3.0+ table. "
    +        + "The partition values are expected to be the 'last' fields of each record, so if the table is partitioned on column A for example, then the last field in "
    +        + "each record should be field A.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "hivestreaming.record.count", description = "This attribute is written on the flow files routed to the 'success' "
    +                + "and 'failure' relationships, and contains the number of records from the incoming flow file. All records in a flow file are committed as a single transaction."),
    +        @WritesAttribute(attribute = "query.output.tables", description = "This attribute is written on the flow files routed to the 'success' "
    +                + "and 'failure' relationships, and contains the target table name in 'databaseName.tableName' format.")
    +})
    +@RequiresInstanceClassLoading
    +public class PutHive3Streaming extends AbstractProcessor {
    +    // Attributes
    +    public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
    +
    +    private static final String CLIENT_CACHE_DISABLED_PROPERTY = "hcatalog.hive.client.cache.disabled";
    +
    +    // Properties
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +            .name("record-reader")
    +            .displayName("Record Reader")
    +            .description("The service for reading records from incoming flow files.")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +    static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-metastore-uri")
    +            .displayName("Hive Metastore URI")
    +            .description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the "
    +                    + "Hive metastore is 9043.")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.URI_VALIDATOR)
    +            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
    +            .build();
    +
    +    static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
    +            .name("hive3-config-resources")
    +            .displayName("Hive Configuration Resources")
    +            .description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
    +                    + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
    +                    + "with Kerberos e.g., the appropriate properties must be set in the configuration files. Also note that if Max Concurrent Tasks is set "
    +                    + "to a number greater than one, the 'hcatalog.hive.client.cache.disabled' property will be forced to 'true' to avoid concurrency issues. "
    +                    + "Please see the Hive documentation for more details.")
    +            .required(false)
    +            .addValidator(HiveUtils.createMultipleFilesExistValidator())
    +            .build();
    +
    +    static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-database-name")
    +            .displayName("Database Name")
    +            .description("The name of the database in which to put the data.")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-table-name")
    +            .displayName("Table Name")
    +            .description("The name of the database table in which to put the data.")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor PARTITION_VALUES = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-part-vals")
    +            .displayName("Partition Values")
    +            .description("Specifies a comma-separated list of the values for the partition columns of the target table. If the incoming records all have the same values "
    +                    + "for the partition columns, those values can be entered here, resulting in a performance gain. If specified, this property will often contain "
    +                    + "Expression Language, for example if PartitionRecord is upstream and two partitions 'name' and 'age' are used, then this property can be set to "
    +                    + "${name},${age}.")
    +            .required(false)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor AUTOCREATE_PARTITIONS = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-autocreate-partition")
    +            .displayName("Auto-Create Partitions")
    +            .description("Flag indicating whether partitions should be automatically created")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    static final PropertyDescriptor CALL_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-call-timeout")
    +            .displayName("Call Timeout")
    +            .description("The number of seconds allowed for a Hive Streaming operation to complete. A value of 0 indicates the processor should wait indefinitely on operations. "
    +                    + "Note that although this property supports Expression Language, it will not be evaluated against incoming FlowFile attributes.")
    +            .defaultValue("0")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .build();
    +
    +    static final PropertyDescriptor DISABLE_STREAMING_OPTIMIZATIONS = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-disable-optimizations")
    +            .displayName("Disable Streaming Optimizations")
    +            .description("Whether to disable streaming optimizations. Disabling streaming optimizations will have significant impact to performance and memory consumption.")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .build();
    +
    +
    +    static final PropertyDescriptor ROLLBACK_ON_FAILURE = RollbackOnFailure.createRollbackOnFailureProperty(
    +            "NOTE: When an error occurred after a Hive streaming transaction which is derived from the same input FlowFile is already committed," +
    +                    " (i.e. a FlowFile contains more records than 'Records per Transaction' and a failure occurred at the 2nd transaction or later)" +
    +                    " then the succeeded records will be transferred to 'success' relationship while the original input FlowFile stays in incoming queue." +
    +                    " Duplicated records can be created for the succeeded ones when the same FlowFile is processed again.");
    +
    +    static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
    +            .name("kerberos-credentials-service")
    +            .displayName("Kerberos Credentials Service")
    +            .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
    +            .identifiesControllerService(KerberosCredentialsService.class)
    +            .required(false)
    +            .build();
    +
    +    // Relationships
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("A FlowFile containing Avro records routed to this relationship after the record has been successfully transmitted to Hive.")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("A FlowFile containing Avro records routed to this relationship if the record could not be transmitted to Hive.")
    +            .build();
    +
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description("The incoming FlowFile is routed to this relationship if its records cannot be transmitted to Hive. Note that "
    +                    + "some records may have been processed successfully, they will be routed (as Avro flow files) to the success relationship. "
    +                    + "The combination of the retry, success, and failure relationships indicate how many records succeeded and/or failed. This "
    +                    + "can be used to provide a retry capability since full rollback is not possible.")
    +            .build();
    +
    +    private List<PropertyDescriptor> propertyDescriptors;
    +    private Set<Relationship> relationships;
    +
    +    protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
    +    protected volatile UserGroupInformation ugi;
    +    protected volatile HiveConf hiveConfig;
    +
    +    protected volatile int callTimeout;
    +    protected ExecutorService callTimeoutPool;
    +    protected volatile boolean rollbackOnFailure;
    +
    +    // Holder of cached Configuration information so validation does not reload the same config over and over
    +    private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
    +
    +    @Override
    +    protected void init(ProcessorInitializationContext context) {
    +        List<PropertyDescriptor> props = new ArrayList<>();
    +        props.add(RECORD_READER);
    +        props.add(METASTORE_URI);
    +        props.add(HIVE_CONFIGURATION_RESOURCES);
    +        props.add(DB_NAME);
    +        props.add(TABLE_NAME);
    +        props.add(PARTITION_VALUES);
    +        props.add(AUTOCREATE_PARTITIONS);
    +        props.add(CALL_TIMEOUT);
    +        props.add(DISABLE_STREAMING_OPTIMIZATIONS);
    +        props.add(ROLLBACK_ON_FAILURE);
    +        props.add(KERBEROS_CREDENTIALS_SERVICE);
    +
    +        propertyDescriptors = Collections.unmodifiableList(props);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        _relationships.add(REL_RETRY);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
    +        boolean confFileProvided = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).isSet();
    +
    +        final List<ValidationResult> problems = new ArrayList<>();
    +
    +        final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
    +
    +        final String resolvedPrincipal = credentialsService != null ? credentialsService.getPrincipal() : null;
    +        final String resolvedKeytab = credentialsService != null ? credentialsService.getKeytab() : null;
    +        if (confFileProvided) {
    +            final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
    +            problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger()));
    +        }
    +
    +        return problems;
    +    }
    +
    +    @OnScheduled
    +    public void setup(final ProcessContext context) {
    +        ComponentLog log = getLogger();
    +        rollbackOnFailure = context.getProperty(ROLLBACK_ON_FAILURE).asBoolean();
    +
    +        final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
    +        hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
    +
    +        // If more than one concurrent task, force 'hcatalog.hive.client.cache.disabled' to true
    +        if (context.getMaxConcurrentTasks() > 1) {
    +            hiveConfig.setBoolean(CLIENT_CACHE_DISABLED_PROPERTY, true);
    +        }
    +
    +        // add any dynamic properties to the Hive configuration
    +        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
    +            final PropertyDescriptor descriptor = entry.getKey();
    +            if (descriptor.isDynamic()) {
    +                hiveConfig.set(descriptor.getName(), entry.getValue());
    +            }
    +        }
    +
    +        hiveConfigurator.preload(hiveConfig);
    +
    +        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
    +            final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
    +
    +            final String resolvedPrincipal = credentialsService.getPrincipal();
    +            final String resolvedKeytab = credentialsService.getKeytab();
    +
    +            log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
    +            try {
    +                ugi = hiveConfigurator.authenticate(hiveConfig, resolvedPrincipal, resolvedKeytab);
    +            } catch (AuthenticationFailedException ae) {
    +                throw new ProcessException("Kerberos authentication failed for Hive Streaming", ae);
    +            }
    +
    +            log.info("Successfully logged in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
    +        } else {
    +            ugi = null;
    +        }
    +
    +        callTimeout = context.getProperty(CALL_TIMEOUT).evaluateAttributeExpressions().asInteger() * 1000; // milliseconds
    +        String timeoutName = "put-hive3-streaming-%d";
    +        this.callTimeoutPool = Executors.newFixedThreadPool(1,
    +                new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
    +    }
    +
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +        final String dbName = context.getProperty(DB_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        final ComponentLog log = getLogger();
    +        final String metastoreUri = context.getProperty(METASTORE_URI).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        final String partitionValuesString = context.getProperty(PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue();
    +        final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
    +        final boolean disableStreamingOptimizations = context.getProperty(DISABLE_STREAMING_OPTIMIZATIONS).asBoolean();
    +
    +        HiveOptions o = new HiveOptions(metastoreUri, dbName, tableName)
    +                .withHiveConf(hiveConfig)
    +                .withAutoCreatePartitions(autoCreatePartitions)
    +                .withCallTimeout(callTimeout)
    +                .withStreamingOptimizations(!disableStreamingOptimizations);
    +
    +        if (!StringUtils.isEmpty(partitionValuesString)) {
    +            List<String> staticPartitionValues = Arrays.stream(partitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
    +            o = o.withStaticPartitionValues(staticPartitionValues);
    +        }
    +
    +        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
    +            final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
    +            o = o.withKerberosPrincipal(credentialsService.getPrincipal()).withKerberosKeytab(credentialsService.getKeytab());
    +        }
    +
    +        final HiveOptions options = o;
    +
    +        // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
    +        ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
    +        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
    +
    +        StreamingConnection hiveStreamingConnection = null;
    +
    +        try (final InputStream rawIn = session.read(flowFile)) {
    +            long processedRecords = 0L;
    +            final RecordReader reader;
    +
    +            try (final BufferedInputStream in = new BufferedInputStream(rawIn)) {
    +
    +                // if we fail to create the RecordReader then we want to route to failure, so we need to
    +                // handle this separately from the other IOExceptions which normally route to retry
    +                try {
    +                    reader = recordReaderFactory.createRecordReader(flowFile, in, getLogger());
    +                } catch (Exception e) {
    +                    throw new RecordReaderFactoryException("Unable to create RecordReader", e);
    +                }
    +
    +                hiveStreamingConnection = makeStreamingConnection(options, reader);
    +
    +                // Write records to Hive streaming, then commit and close
    +                hiveStreamingConnection.beginTransaction();
    +                hiveStreamingConnection.write(in);
    +                hiveStreamingConnection.commitTransaction();
    +                rawIn.close();
    +
    +                Map<String, String> updateAttributes = new HashMap<>();
    +                updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()));
    +                updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
    +                flowFile = session.putAllAttributes(flowFile, updateAttributes);
    +                session.getProvenanceReporter().send(flowFile, hiveStreamingConnection.getMetastoreUri());
    +                session.transfer(flowFile, REL_SUCCESS);
    +            } catch (TransactionError te) {
    +                if (rollbackOnFailure) {
    +                    throw new ProcessException(te.getLocalizedMessage(), te);
    +                } else {
    +                    throw new ShouldRetryException(te.getLocalizedMessage(), te);
    +                }
    +            } catch (RecordReaderFactoryException rrfe) {
    +                throw new ProcessException(rrfe);
    +            }
    +        } catch (InvalidTable | SerializationError | StreamingIOFailure | IOException e) {
    +            if (rollbackOnFailure) {
    +                abortConnection(hiveStreamingConnection);
    +                throw new ProcessException(e.getLocalizedMessage(), e);
    +            } else {
    +                Map<String, String> updateAttributes = new HashMap<>();
    +                updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()));
    +                updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
    +                flowFile = session.putAllAttributes(flowFile, updateAttributes);
    +                session.transfer(flowFile, REL_FAILURE);
    +            }
    +        } catch (DiscontinuedException e) {
    +            // The input FlowFile processing is discontinued. Keep it in the input queue.
    +            getLogger().warn("Discontinued processing for {} due to {}", new Object[]{flowFile, e}, e);
    +            session.transfer(flowFile, Relationship.SELF);
    +        } catch (ConnectionError ce) {
    +            // If we can't connect to the metastore, yield the processor
    +            context.yield();
    +            throw new ProcessException("A connection to metastore cannot be established", ce);
    +        } catch (ShouldRetryException e) {
    +            // This exception is already a result of adjusting an error, so simply transfer the FlowFile to retry. Still need to abort the txn
    +            getLogger().error(e.getLocalizedMessage(), e);
    +            abortConnection(hiveStreamingConnection);
    +            flowFile = session.penalize(flowFile);
    +            session.transfer(flowFile, REL_RETRY);
    +        } catch (StreamingException se) {
    +            // Handle all other exceptions. These are often record-based exceptions (since Hive will throw a subclass of the exception caught above)
    +            Throwable cause = se.getCause();
    +            if (cause == null) cause = se;
    +            // This is a failure on the incoming data, rollback on failure if specified; otherwise route to failure after penalizing (and abort txn in any case)
    +            if (rollbackOnFailure) {
    +                abortConnection(hiveStreamingConnection);
    +                throw new ProcessException(cause.getLocalizedMessage(), cause);
    +            } else {
    +                flowFile = session.penalize(flowFile);
    +                Map<String, String> updateAttributes = new HashMap<>();
    +                updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()));
    +                updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
    +                flowFile = session.putAllAttributes(flowFile, updateAttributes);
    +                session.transfer(flowFile, REL_FAILURE);
    +            }
    +
    +        } catch (Exception e) {
    +            abortConnection(hiveStreamingConnection);
    +            throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
    +        } finally {
    +            closeConnection(hiveStreamingConnection);
    +            // Restore original class loader, might not be necessary but is good practice since the processor task changed it
    +            Thread.currentThread().setContextClassLoader(originalClassloader);
    +        }
    +    }
    +
    +    StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader) throws StreamingException {
    +        return HiveStreamingConnection.newBuilder()
    +                .withDatabase(options.getDatabaseName())
    +                .withTable(options.getTableName())
    +                .withStaticPartitionValues(options.getStaticPartitionValues())
    +                .withHiveConf(options.getHiveConf())
    +                .withRecordWriter(new HiveRecordWriter(reader, getLogger()))
    +                .withAgentInfo("NiFi " + this.getClass().getSimpleName() + " [" + this.getIdentifier()
    +                        + "] thread " + Thread.currentThread().getId() + "[" + Thread.currentThread().getName() + "]")
    +                .connect();
    +    }
    +
    +    @OnStopped
    +    public void cleanup() {
    +        validationResourceHolder.set(null); // trigger re-validation of resources
    +
    +        ComponentLog log = getLogger();
    +
    +        if (callTimeoutPool != null) {
    +            callTimeoutPool.shutdown();
    +            try {
    +                while (!callTimeoutPool.isTerminated()) {
    +                    callTimeoutPool.awaitTermination(callTimeout, TimeUnit.MILLISECONDS);
    +                }
    +            } catch (Throwable t) {
    +                log.warn("shutdown interrupted on " + callTimeoutPool, t);
    +            }
    +            callTimeoutPool = null;
    +        }
    +
    +        ugi = null;
    +    }
    +
    +    private void abortAndCloseConnection(StreamingConnection connection) {
    +        try {
    +            abortConnection(connection);
    --- End diff --
    
    When does this get called? https://issues.apache.org/jira/browse/HIVE-19772 adds a shutdown handler and an uncaught-exception handler to auto-close on interrupts or runtime exceptions. To be on the safer side, NiFi could also add an interrupt handler that invokes abortAndCloseConnection.
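
    A rough sketch of the kind of handler being suggested (hypothetical, not part of this PR; assumes the processor tracks the in-flight connection in an AtomicReference field named activeConnection):

        // registered once, e.g. in the @OnScheduled setup() method
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            final StreamingConnection conn = activeConnection.get();
            if (conn != null) {
                // abort the open transaction and release the connection on interrupt/shutdown
                abortAndCloseConnection(conn);
            }
        }));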


---

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r193183490
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java ---
    @@ -0,0 +1,247 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hive.streaming;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.serde.serdeConstants;
    +import org.apache.hadoop.hive.serde2.AbstractSerDe;
    +import org.apache.hadoop.hive.serde2.SerDeException;
    +import org.apache.hadoop.hive.serde2.SerDeStats;
    +import org.apache.hadoop.hive.serde2.SerDeUtils;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
    +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    +import org.apache.hadoop.io.ObjectWritable;
    +import org.apache.hadoop.io.Writable;
    +import org.apache.hive.common.util.HiveStringUtils;
    +import org.apache.hive.common.util.TimestampParser;
    +import org.apache.nifi.avro.AvroTypeUtil;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.util.DataTypeUtils;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class NiFiRecordSerDe extends AbstractSerDe {
    +
    +    protected RecordReader recordReader;
    +    protected ComponentLog log;
    +    protected List<String> columnNames;
    +    protected StructTypeInfo schema;
    +
    +    protected StandardStructObjectInspector cachedObjectInspector;
    +    protected TimestampParser tsParser;
    +
    +    private final static Pattern INTERNAL_PATTERN = Pattern.compile("_col([0-9]+)");
    +
    +    public NiFiRecordSerDe(RecordReader recordReader, ComponentLog log) {
    +        this.recordReader = recordReader;
    +        this.log = log;
    +    }
    +
    +    @Override
    +    public void initialize(Configuration conf, Properties tbl) {
    +        List<TypeInfo> columnTypes;
    +        StructTypeInfo rowTypeInfo;
    +
    +        log.debug("Initializing NiFiRecordSerDe: {}", tbl.entrySet().toArray());
    +
    +        // Get column names and types
    +        String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
    +        String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
    +        final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
    +                .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
    +        // all table column names
    +        if (columnNameProperty.isEmpty()) {
    +            columnNames = new ArrayList<>(0);
    +        } else {
    +            columnNames = new ArrayList<>(Arrays.asList(columnNameProperty.split(columnNameDelimiter)));
    +        }
    +
    +        // all column types
    +        if (columnTypeProperty.isEmpty()) {
    +            columnTypes = new ArrayList<>(0);
    +        } else {
    +            columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
    +        }
    +
    +        log.debug("columns: {}, {}", new Object[]{columnNameProperty, columnNames});
    +        log.debug("types: {}, {} ", new Object[]{columnTypeProperty, columnTypes});
    +
    +        assert (columnNames.size() == columnTypes.size());
    +
    +        rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
    +        schema = rowTypeInfo;
    +        log.debug("schema : {}", new Object[]{schema});
    +        cachedObjectInspector = (StandardStructObjectInspector) TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);
    +        tsParser = new TimestampParser(HiveStringUtils.splitAndUnEscape(tbl.getProperty(serdeConstants.TIMESTAMP_FORMATS)));
    +    }
    +
    +    @Override
    +    public Class<? extends Writable> getSerializedClass() {
    +        return null;
    +    }
    +
    +    @Override
    +    public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
    +        return null;
    +    }
    +
    +    @Override
    +    public SerDeStats getSerDeStats() {
    +        return null;
    +    }
    +
    +    @Override
    +    public Object deserialize(Writable writable) throws SerDeException {
    +        ObjectWritable t = (ObjectWritable) writable;
    +        Record record = (Record) t.get();
    +        List<Object> r = new ArrayList<>(Collections.nCopies(columnNames.size(), null));
    +        try {
    +            RecordSchema recordSchema = record.getSchema();
    +            for (RecordField field : recordSchema.getFields()) {
    +                String fieldName = field.getFieldName();
    --- End diff --
    
    For static partitions, the fields and their values will still be in the record. Are they ignored/skipped in that case? 


---

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

Posted by prasanthj <gi...@git.apache.org>.
Github user prasanthj commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r193184650
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java ---
    @@ -0,0 +1,247 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hive.streaming;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.serde.serdeConstants;
    +import org.apache.hadoop.hive.serde2.AbstractSerDe;
    +import org.apache.hadoop.hive.serde2.SerDeException;
    +import org.apache.hadoop.hive.serde2.SerDeStats;
    +import org.apache.hadoop.hive.serde2.SerDeUtils;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
    +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    +import org.apache.hadoop.io.ObjectWritable;
    +import org.apache.hadoop.io.Writable;
    +import org.apache.hive.common.util.HiveStringUtils;
    +import org.apache.hive.common.util.TimestampParser;
    +import org.apache.nifi.avro.AvroTypeUtil;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.util.DataTypeUtils;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class NiFiRecordSerDe extends AbstractSerDe {
    +
    +    protected RecordReader recordReader;
    +    protected ComponentLog log;
    +    protected List<String> columnNames;
    +    protected StructTypeInfo schema;
    +
    +    protected StandardStructObjectInspector cachedObjectInspector;
    +    protected TimestampParser tsParser;
    +
    +    private final static Pattern INTERNAL_PATTERN = Pattern.compile("_col([0-9]+)");
    +
    +    public NiFiRecordSerDe(RecordReader recordReader, ComponentLog log) {
    +        this.recordReader = recordReader;
    +        this.log = log;
    +    }
    +
    +    @Override
    +    public void initialize(Configuration conf, Properties tbl) {
    +        List<TypeInfo> columnTypes;
    +        StructTypeInfo rowTypeInfo;
    +
    +        log.debug("Initializing NiFiRecordSerDe: {}", tbl.entrySet().toArray());
    +
    +        // Get column names and types
    +        String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
    +        String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
    +        final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
    +                .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
    +        // all table column names
    +        if (columnNameProperty.isEmpty()) {
    +            columnNames = new ArrayList<>(0);
    +        } else {
    +            columnNames = new ArrayList<>(Arrays.asList(columnNameProperty.split(columnNameDelimiter)));
    +        }
    +
    +        // all column types
    +        if (columnTypeProperty.isEmpty()) {
    +            columnTypes = new ArrayList<>(0);
    +        } else {
    +            columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
    +        }
    +
    +        log.debug("columns: {}, {}", new Object[]{columnNameProperty, columnNames});
    +        log.debug("types: {}, {} ", new Object[]{columnTypeProperty, columnTypes});
    +
    +        assert (columnNames.size() == columnTypes.size());
    +
    +        rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
    +        schema = rowTypeInfo;
    +        log.debug("schema : {}", new Object[]{schema});
    +        cachedObjectInspector = (StandardStructObjectInspector) TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);
    +        tsParser = new TimestampParser(HiveStringUtils.splitAndUnEscape(tbl.getProperty(serdeConstants.TIMESTAMP_FORMATS)));
    +    }
    +
    +    @Override
    +    public Class<? extends Writable> getSerializedClass() {
    +        return null;
    +    }
    +
    +    @Override
    +    public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
    +        return null;
    +    }
    +
    +    @Override
    +    public SerDeStats getSerDeStats() {
    +        return null;
    +    }
    +
    +    @Override
    +    public Object deserialize(Writable writable) throws SerDeException {
    +        ObjectWritable t = (ObjectWritable) writable;
    +        Record record = (Record) t.get();
    +        List<Object> r = new ArrayList<>(Collections.nCopies(columnNames.size(), null));
    +        try {
    +            RecordSchema recordSchema = record.getSchema();
    +            for (RecordField field : recordSchema.getFields()) {
    +                String fieldName = field.getFieldName();
    --- End diff --
    
    Ignored. Static partition values don't have to be in the record since they are already specified via the API.
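
    For reference, a minimal sketch of supplying static partition values via the connection builder rather than in each record (the database, table, and partition values below are placeholders):

        // Static partition values are declared on the connection itself, so the
        // incoming records only need to carry the non-partition columns.
        StreamingConnection conn = HiveStreamingConnection.newBuilder()
                .withDatabase("default")                                  // placeholder
                .withTable("my_table")                                    // placeholder
                .withStaticPartitionValues(Arrays.asList("2018", "06"))   // placeholder values
                .withHiveConf(hiveConf)
                .withRecordWriter(recordWriter)
                .connect();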


---

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

Posted by prasanthj <gi...@git.apache.org>.
Github user prasanthj commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r193178839
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/record/ORCHDFSRecordWriter.java ---
    @@ -0,0 +1,105 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.orc.record;
    +
    +import org.apache.avro.Schema;
    +import org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils;
    +import org.apache.hadoop.hive.ql.io.orc.Writer;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    +import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
    +import org.apache.nifi.serialization.WriteResult;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSet;
    +
    +import java.io.IOException;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import static org.apache.nifi.processors.orc.PutORC.HIVE_DDL_ATTRIBUTE;
    +
    +/**
    + * HDFSRecordWriter that writes ORC files using Avro as the schema representation.
    + */
    +
    +public class ORCHDFSRecordWriter implements HDFSRecordWriter {
    +
    +    private final Schema avroSchema;
    +    private final TypeInfo orcSchema;
    +    private final Writer orcWriter;
    +    private final String hiveTableName;
    +    private final boolean hiveFieldNames;
    +
    +    public ORCHDFSRecordWriter(final Writer orcWriter, final Schema avroSchema, final String hiveTableName, final boolean hiveFieldNames) {
    +        this.avroSchema = avroSchema;
    +        this.orcWriter = orcWriter;
    +        this.hiveFieldNames = hiveFieldNames;
    +        this.orcSchema = NiFiOrcUtils.getOrcField(avroSchema, this.hiveFieldNames);
    +        this.hiveTableName = hiveTableName;
    +    }
    +
    +    @Override
    +    public void write(final Record record) throws IOException {
    +        List<Schema.Field> fields = avroSchema.getFields();
    --- End diff --
    
    If the field list does not change, can this be moved outside the inner loop?
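
    As a sketch of that suggestion (assuming the rest of write() stays unchanged), the field list could be captured once in the constructor instead of per record:

        private final List<Schema.Field> avroFields;  // cached once, reused in write()

        public ORCHDFSRecordWriter(final Writer orcWriter, final Schema avroSchema,
                                   final String hiveTableName, final boolean hiveFieldNames) {
            this.avroSchema = avroSchema;
            this.orcWriter = orcWriter;
            this.hiveFieldNames = hiveFieldNames;
            this.orcSchema = NiFiOrcUtils.getOrcField(avroSchema, this.hiveFieldNames);
            this.hiveTableName = hiveTableName;
            this.avroFields = avroSchema.getFields();  // computed once here
        }

        @Override
        public void write(final Record record) throws IOException {
            final List<Schema.Field> fields = avroFields;  // no per-record getFields() call
            // ... remainder of write() unchanged ...
        }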


---

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

Posted by prasanthj <gi...@git.apache.org>.
Github user prasanthj commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r193174462
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java ---
    @@ -0,0 +1,247 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hive.streaming;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.serde.serdeConstants;
    +import org.apache.hadoop.hive.serde2.AbstractSerDe;
    +import org.apache.hadoop.hive.serde2.SerDeException;
    +import org.apache.hadoop.hive.serde2.SerDeStats;
    +import org.apache.hadoop.hive.serde2.SerDeUtils;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
    +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    +import org.apache.hadoop.io.ObjectWritable;
    +import org.apache.hadoop.io.Writable;
    +import org.apache.hive.common.util.HiveStringUtils;
    +import org.apache.hive.common.util.TimestampParser;
    +import org.apache.nifi.avro.AvroTypeUtil;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.util.DataTypeUtils;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class NiFiRecordSerDe extends AbstractSerDe {
    +
    +    protected RecordReader recordReader;
    +    protected ComponentLog log;
    +    protected List<String> columnNames;
    +    protected StructTypeInfo schema;
    +
    +    protected StandardStructObjectInspector cachedObjectInspector;
    +    protected TimestampParser tsParser;
    +
    +    private final static Pattern INTERNAL_PATTERN = Pattern.compile("_col([0-9]+)");
    +
    +    public NiFiRecordSerDe(RecordReader recordReader, ComponentLog log) {
    +        this.recordReader = recordReader;
    +        this.log = log;
    +    }
    +
    +    @Override
    +    public void initialize(Configuration conf, Properties tbl) {
    +        List<TypeInfo> columnTypes;
    +        StructTypeInfo rowTypeInfo;
    +
    +        log.debug("Initializing NiFiRecordSerDe: {}", tbl.entrySet().toArray());
    +
    +        // Get column names and types
    +        String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
    +        String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
    +        final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
    +                .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
    +        // all table column names
    +        if (columnNameProperty.isEmpty()) {
    +            columnNames = new ArrayList<>(0);
    +        } else {
    +            columnNames = new ArrayList<>(Arrays.asList(columnNameProperty.split(columnNameDelimiter)));
    +        }
    +
    +        // all column types
    +        if (columnTypeProperty.isEmpty()) {
    +            columnTypes = new ArrayList<>(0);
    +        } else {
    +            columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
    +        }
    +
    +        log.debug("columns: {}, {}", new Object[]{columnNameProperty, columnNames});
    +        log.debug("types: {}, {} ", new Object[]{columnTypeProperty, columnTypes});
    +
    +        assert (columnNames.size() == columnTypes.size());
    +
    +        rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
    +        schema = rowTypeInfo;
    +        log.debug("schema : {}", new Object[]{schema});
    +        cachedObjectInspector = (StandardStructObjectInspector) TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);
    +        tsParser = new TimestampParser(HiveStringUtils.splitAndUnEscape(tbl.getProperty(serdeConstants.TIMESTAMP_FORMATS)));
    +    }
    +
    +    @Override
    +    public Class<? extends Writable> getSerializedClass() {
    +        return null;
    +    }
    +
    +    @Override
    +    public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
    +        return null;
    +    }
    +
    +    @Override
    +    public SerDeStats getSerDeStats() {
    +        return null;
    +    }
    +
    +    @Override
    +    public Object deserialize(Writable writable) throws SerDeException {
    +        ObjectWritable t = (ObjectWritable) writable;
    +        Record record = (Record) t.get();
    +        List<Object> r = new ArrayList<>(Collections.nCopies(columnNames.size(), null));
    +        try {
    +            RecordSchema recordSchema = record.getSchema();
    +            for (RecordField field : recordSchema.getFields()) {
    +                String fieldName = field.getFieldName();
    --- End diff --
    
    Can the field name order be different from the table schema column names? Re-ordering fields to match the table schema is really expensive. Maybe we can have a fast path: if the field order exactly matches that of the table schema, we don't have to do all of this per-field object extraction.
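
    A sketch of what such a fast path might look like (assuming RecordSchema#getFieldNames() returns the field names in record order; this check is not part of the PR):

        // Decide once per file whether the record field order already matches the
        // table column order; if so, values can be copied positionally.
        boolean orderMatches = recordSchema.getFieldNames().equals(columnNames);
        if (orderMatches) {
            // fast path: positional copy, no per-field name lookup
        } else {
            // slow path: name-based extraction as in deserialize() above
        }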


---

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

Posted by prasanthj <gi...@git.apache.org>.
Github user prasanthj commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r193184727
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java ---
    @@ -0,0 +1,548 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hive;
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hive.streaming.ConnectionError;
    +import org.apache.hive.streaming.HiveStreamingConnection;
    +import org.apache.hive.streaming.InvalidTable;
    +import org.apache.hive.streaming.SerializationError;
    +import org.apache.hive.streaming.StreamingConnection;
    +import org.apache.hive.streaming.StreamingException;
    +import org.apache.hive.streaming.StreamingIOFailure;
    +import org.apache.hive.streaming.TransactionError;
    +import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.hadoop.SecurityUtil;
    +import org.apache.nifi.kerberos.KerberosCredentialsService;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processor.util.pattern.DiscontinuedException;
    +import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
    +import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.nifi.util.hive.AuthenticationFailedException;
    +import org.apache.nifi.util.hive.HiveConfigurator;
    +import org.apache.nifi.util.hive.HiveOptions;
    +import org.apache.hive.streaming.HiveRecordWriter;
    +import org.apache.nifi.util.hive.HiveUtils;
    +import org.apache.nifi.util.hive.ValidationResources;
    +
    +import java.io.BufferedInputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +
    +import static org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES;
    +
    +@Tags({"hive", "streaming", "put", "database", "store"})
    +@CapabilityDescription("This processor uses Hive Streaming to send flow file records to an Apache Hive 3.0+ table. "
    +        + "The partition values are expected to be the 'last' fields of each record, so if the table is partitioned on column A for example, then the last field in "
    +        + "each record should be field A.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "hivestreaming.record.count", description = "This attribute is written on the flow files routed to the 'success' "
    +                + "and 'failure' relationships, and contains the number of records from the incoming flow file. All records in a flow file are committed as a single transaction."),
    +        @WritesAttribute(attribute = "query.output.tables", description = "This attribute is written on the flow files routed to the 'success' "
    +                + "and 'failure' relationships, and contains the target table name in 'databaseName.tableName' format.")
    +})
    +@RequiresInstanceClassLoading
    +public class PutHive3Streaming extends AbstractProcessor {
    +    // Attributes
    +    public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
    +
    +    private static final String CLIENT_CACHE_DISABLED_PROPERTY = "hcatalog.hive.client.cache.disabled";
    +
    +    // Properties
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +            .name("record-reader")
    +            .displayName("Record Reader")
    +            .description("The service for reading records from incoming flow files.")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +    static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-metastore-uri")
    +            .displayName("Hive Metastore URI")
    +            .description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the "
    +                    + "Hive metastore is 9043.")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.URI_VALIDATOR)
    +            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
    +            .build();
    +
    +    static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
    +            .name("hive3-config-resources")
    +            .displayName("Hive Configuration Resources")
    +            .description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
    +                    + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
    +                    + "with Kerberos e.g., the appropriate properties must be set in the configuration files. Also note that if Max Concurrent Tasks is set "
    +                    + "to a number greater than one, the 'hcatalog.hive.client.cache.disabled' property will be forced to 'true' to avoid concurrency issues. "
    +                    + "Please see the Hive documentation for more details.")
    +            .required(false)
    +            .addValidator(HiveUtils.createMultipleFilesExistValidator())
    +            .build();
    +
    +    static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-database-name")
    +            .displayName("Database Name")
    +            .description("The name of the database in which to put the data.")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-table-name")
    +            .displayName("Table Name")
    +            .description("The name of the database table in which to put the data.")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor PARTITION_VALUES = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-part-vals")
    +            .displayName("Partition Values")
    +            .description("Specifies a comma-separated list of the values for the partition columns of the target table. If the incoming records all have the same values "
    +                    + "for the partition columns, those values can be entered here, resulting in a performance gain. If specified, this property will often contain "
    +                    + "Expression Language, for example if PartitionRecord is upstream and two partitions 'name' and 'age' are used, then this property can be set to "
    +                    + "${name},${age}.")
    +            .required(false)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor AUTOCREATE_PARTITIONS = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-autocreate-partition")
    +            .displayName("Auto-Create Partitions")
    +            .description("Flag indicating whether partitions should be automatically created")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    static final PropertyDescriptor CALL_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-call-timeout")
    +            .displayName("Call Timeout")
    +            .description("The number of seconds allowed for a Hive Streaming operation to complete. A value of 0 indicates the processor should wait indefinitely on operations. "
    +                    + "Note that although this property supports Expression Language, it will not be evaluated against incoming FlowFile attributes.")
    +            .defaultValue("0")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .build();
    +
    +    static final PropertyDescriptor DISABLE_STREAMING_OPTIMIZATIONS = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-disable-optimizations")
    +            .displayName("Disable Streaming Optimizations")
    +            .description("Whether to disable streaming optimizations. Disabling streaming optimizations will have significant impact to performance and memory consumption.")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .build();
    +
    +
    +    static final PropertyDescriptor ROLLBACK_ON_FAILURE = RollbackOnFailure.createRollbackOnFailureProperty(
    +            "NOTE: When an error occurred after a Hive streaming transaction which is derived from the same input FlowFile is already committed," +
    +                    " (i.e. a FlowFile contains more records than 'Records per Transaction' and a failure occurred at the 2nd transaction or later)" +
    +                    " then the succeeded records will be transferred to 'success' relationship while the original input FlowFile stays in incoming queue." +
    +                    " Duplicated records can be created for the succeeded ones when the same FlowFile is processed again.");
    +
    +    static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
    +            .name("kerberos-credentials-service")
    +            .displayName("Kerberos Credentials Service")
    +            .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
    +            .identifiesControllerService(KerberosCredentialsService.class)
    +            .required(false)
    +            .build();
    +
    +    // Relationships
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("A FlowFile containing Avro records routed to this relationship after the record has been successfully transmitted to Hive.")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("A FlowFile containing Avro records routed to this relationship if the record could not be transmitted to Hive.")
    +            .build();
    +
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description("The incoming FlowFile is routed to this relationship if its records cannot be transmitted to Hive. Note that "
    +                    + "some records may have been processed successfully, they will be routed (as Avro flow files) to the success relationship. "
    +                    + "The combination of the retry, success, and failure relationships indicate how many records succeeded and/or failed. This "
    +                    + "can be used to provide a retry capability since full rollback is not possible.")
    +            .build();
    +
    +    private List<PropertyDescriptor> propertyDescriptors;
    +    private Set<Relationship> relationships;
    +
    +    protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
    +    protected volatile UserGroupInformation ugi;
    +    protected volatile HiveConf hiveConfig;
    +
    +    protected volatile int callTimeout;
    +    protected ExecutorService callTimeoutPool;
    +    protected volatile boolean rollbackOnFailure;
    +
    +    // Holder of cached Configuration information so validation does not reload the same config over and over
    +    private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
    +
    +    @Override
    +    protected void init(ProcessorInitializationContext context) {
    +        List<PropertyDescriptor> props = new ArrayList<>();
    +        props.add(RECORD_READER);
    +        props.add(METASTORE_URI);
    +        props.add(HIVE_CONFIGURATION_RESOURCES);
    +        props.add(DB_NAME);
    +        props.add(TABLE_NAME);
    +        props.add(PARTITION_VALUES);
    +        props.add(AUTOCREATE_PARTITIONS);
    +        props.add(CALL_TIMEOUT);
    +        props.add(DISABLE_STREAMING_OPTIMIZATIONS);
    +        props.add(ROLLBACK_ON_FAILURE);
    +        props.add(KERBEROS_CREDENTIALS_SERVICE);
    +
    +        propertyDescriptors = Collections.unmodifiableList(props);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        _relationships.add(REL_RETRY);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
    +        boolean confFileProvided = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).isSet();
    +
    +        final List<ValidationResult> problems = new ArrayList<>();
    +
    +        final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
    +
    +        final String resolvedPrincipal = credentialsService != null ? credentialsService.getPrincipal() : null;
    +        final String resolvedKeytab = credentialsService != null ? credentialsService.getKeytab() : null;
    +        if (confFileProvided) {
    +            final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
    +            problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger()));
    +        }
    +
    +        return problems;
    +    }
    +
    +    @OnScheduled
    +    public void setup(final ProcessContext context) {
    +        ComponentLog log = getLogger();
    +        rollbackOnFailure = context.getProperty(ROLLBACK_ON_FAILURE).asBoolean();
    +
    +        final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
    +        hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
    +
    +        // If more than one concurrent task, force 'hcatalog.hive.client.cache.disabled' to true
    +        if (context.getMaxConcurrentTasks() > 1) {
    +            hiveConfig.setBoolean(CLIENT_CACHE_DISABLED_PROPERTY, true);
    +        }
    +
    +        // add any dynamic properties to the Hive configuration
    +        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
    +            final PropertyDescriptor descriptor = entry.getKey();
    +            if (descriptor.isDynamic()) {
    +                hiveConfig.set(descriptor.getName(), entry.getValue());
    +            }
    +        }
    +
    +        hiveConfigurator.preload(hiveConfig);
    +
    +        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
    +            final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
    +
    +            final String resolvedPrincipal = credentialsService.getPrincipal();
    +            final String resolvedKeytab = credentialsService.getKeytab();
    +
    +            log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
    +            try {
    +                ugi = hiveConfigurator.authenticate(hiveConfig, resolvedPrincipal, resolvedKeytab);
    +            } catch (AuthenticationFailedException ae) {
    +                throw new ProcessException("Kerberos authentication failed for Hive Streaming", ae);
    +            }
    +
    +            log.info("Successfully logged in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
    +        } else {
    +            ugi = null;
    +        }
    +
    +        callTimeout = context.getProperty(CALL_TIMEOUT).evaluateAttributeExpressions().asInteger() * 1000; // milliseconds
    +        String timeoutName = "put-hive3-streaming-%d";
    +        this.callTimeoutPool = Executors.newFixedThreadPool(1,
    +                new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
    +    }
    +
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +        final String dbName = context.getProperty(DB_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        final ComponentLog log = getLogger();
    +        final String metastoreUri = context.getProperty(METASTORE_URI).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        final String partitionValuesString = context.getProperty(PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue();
    +        final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
    +        final boolean disableStreamingOptimizations = context.getProperty(DISABLE_STREAMING_OPTIMIZATIONS).asBoolean();
    +
    +        HiveOptions o = new HiveOptions(metastoreUri, dbName, tableName)
    +                .withHiveConf(hiveConfig)
    +                .withAutoCreatePartitions(autoCreatePartitions)
    +                .withCallTimeout(callTimeout)
    +                .withStreamingOptimizations(!disableStreamingOptimizations);
    +
    +        if (!StringUtils.isEmpty(partitionValuesString)) {
    +            List<String> staticPartitionValues = Arrays.stream(partitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
    +            o = o.withStaticPartitionValues(staticPartitionValues);
    +        }
    +
    +        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
    +            final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
    +            o = o.withKerberosPrincipal(credentialsService.getPrincipal()).withKerberosKeytab(credentialsService.getKeytab());
    +        }
    +
    +        final HiveOptions options = o;
    +
    +        // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
    +        ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
    +        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
    +
    +        StreamingConnection hiveStreamingConnection = null;
    +
    +        try (final InputStream rawIn = session.read(flowFile)) {
    +            long processedRecords = 0L;
    +            final RecordReader reader;
    +
    +            try (final BufferedInputStream in = new BufferedInputStream(rawIn)) {
    +
    +                // if we fail to create the RecordReader then we want to route to failure, so we need to
    +                // handle this separately from the other IOExceptions which normally route to retry
    +                try {
    +                    reader = recordReaderFactory.createRecordReader(flowFile, in, getLogger());
    +                } catch (Exception e) {
    +                    throw new RecordReaderFactoryException("Unable to create RecordReader", e);
    +                }
    +
    +                hiveStreamingConnection = makeStreamingConnection(options, reader);
    +
    +                // Write records to Hive streaming, then commit and close
    +                hiveStreamingConnection.beginTransaction();
    +                hiveStreamingConnection.write(in);
    --- End diff --
    
    Sounds good. 


---

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r193181171
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java ---
    @@ -0,0 +1,247 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hive.streaming;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.serde.serdeConstants;
    +import org.apache.hadoop.hive.serde2.AbstractSerDe;
    +import org.apache.hadoop.hive.serde2.SerDeException;
    +import org.apache.hadoop.hive.serde2.SerDeStats;
    +import org.apache.hadoop.hive.serde2.SerDeUtils;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
    +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    +import org.apache.hadoop.io.ObjectWritable;
    +import org.apache.hadoop.io.Writable;
    +import org.apache.hive.common.util.HiveStringUtils;
    +import org.apache.hive.common.util.TimestampParser;
    +import org.apache.nifi.avro.AvroTypeUtil;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.util.DataTypeUtils;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class NiFiRecordSerDe extends AbstractSerDe {
    +
    +    protected RecordReader recordReader;
    +    protected ComponentLog log;
    +    protected List<String> columnNames;
    +    protected StructTypeInfo schema;
    +
    +    protected StandardStructObjectInspector cachedObjectInspector;
    +    protected TimestampParser tsParser;
    +
    +    private final static Pattern INTERNAL_PATTERN = Pattern.compile("_col([0-9]+)");
    +
    +    public NiFiRecordSerDe(RecordReader recordReader, ComponentLog log) {
    +        this.recordReader = recordReader;
    +        this.log = log;
    +    }
    +
    +    @Override
    +    public void initialize(Configuration conf, Properties tbl) {
    +        List<TypeInfo> columnTypes;
    +        StructTypeInfo rowTypeInfo;
    +
    +        log.debug("Initializing NiFiRecordSerDe: {}", tbl.entrySet().toArray());
    +
    +        // Get column names and types
    +        String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
    +        String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
    +        final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
    +                .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
    +        // all table column names
    +        if (columnNameProperty.isEmpty()) {
    +            columnNames = new ArrayList<>(0);
    +        } else {
    +            columnNames = new ArrayList<>(Arrays.asList(columnNameProperty.split(columnNameDelimiter)));
    +        }
    +
    +        // all column types
    +        if (columnTypeProperty.isEmpty()) {
    +            columnTypes = new ArrayList<>(0);
    +        } else {
    +            columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
    +        }
    +
    +        log.debug("columns: {}, {}", new Object[]{columnNameProperty, columnNames});
    +        log.debug("types: {}, {} ", new Object[]{columnTypeProperty, columnTypes});
    +
    +        assert (columnNames.size() == columnTypes.size());
    +
    +        rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
    +        schema = rowTypeInfo;
    +        log.debug("schema : {}", new Object[]{schema});
    +        cachedObjectInspector = (StandardStructObjectInspector) TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);
    +        tsParser = new TimestampParser(HiveStringUtils.splitAndUnEscape(tbl.getProperty(serdeConstants.TIMESTAMP_FORMATS)));
    +    }
    +
    +    @Override
    +    public Class<? extends Writable> getSerializedClass() {
    +        return null;
    +    }
    +
    +    @Override
    +    public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
    +        return null;
    +    }
    +
    +    @Override
    +    public SerDeStats getSerDeStats() {
    +        return null;
    +    }
    +
    +    @Override
    +    public Object deserialize(Writable writable) throws SerDeException {
    +        ObjectWritable t = (ObjectWritable) writable;
    +        Record record = (Record) t.get();
    +        List<Object> r = new ArrayList<>(Collections.nCopies(columnNames.size(), null));
    +        try {
    +            RecordSchema recordSchema = record.getSchema();
    +            for (RecordField field : recordSchema.getFields()) {
    +                String fieldName = field.getFieldName();
    --- End diff --
    
    We can require the incoming field names to be in the same order as the table schema column names, but historically we at least didn't care about the partition columns; they were removed from the object values since they had to be provided separately. If we require the partition columns to be the last columns in the record, then we'll have to pay that expense somewhere, either upstream in NiFi or here. If we move the field-name-to-position mapping into initialize(), then we should only have to pay the price once per file and not per record?
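
    A minimal sketch of that idea, using a hypothetical fieldPositions map built once in initialize() (would require java.util.HashMap/Map imports; the conversion helper name is illustrative):

        private Map<String, Integer> fieldPositions;  // hypothetical: column name -> position

        @Override
        public void initialize(Configuration conf, Properties tbl) {
            // ... existing column name/type handling as shown above ...
            fieldPositions = new HashMap<>(columnNames.size());
            for (int i = 0; i < columnNames.size(); i++) {
                fieldPositions.put(columnNames.get(i), i);
            }
        }

        // Then, per record in deserialize():
        //     Integer idx = fieldPositions.get(field.getFieldName());
        //     if (idx != null) {
        //         r.set(idx, convertFieldValue(record, field));  // convertFieldValue is illustrative
        //     }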


---

[GitHub] nifi issue #2755: NIFI-4963: Added Hive3 bundle

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on the issue:

    https://github.com/apache/nifi/pull/2755
  
    Tested this out and looks good, going to merge


---

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r194413976
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java ---
    @@ -397,6 +398,9 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
                     }
     
                     hiveStreamingConnection = makeStreamingConnection(options, reader);
    +                // Add shutdown handler with higher priority than FileSystem shutdown hook so that streaming connection gets closed first before
    +                // filesystem close (to avoid ClosedChannelException)
    +                ShutdownHookManager.addShutdownHook(hiveStreamingConnection::close,  FileSystem.SHUTDOWN_HOOK_PRIORITY + 1);
    --- End diff --
    
    We have full exception handling in the framework, but I still need to abort and close the connection on other uncaught exceptions, so I changed the catch (Exception e) to catch (Throwable t).
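
    Roughly, the shape of that change (an illustrative sketch, not the exact diff; the commit call is assumed from the "commit and close" comment earlier in the thread):

        try {
            hiveStreamingConnection.beginTransaction();
            hiveStreamingConnection.write(in);
            hiveStreamingConnection.commitTransaction();   // assumed commit step
        } catch (Throwable t) {                            // was: catch (Exception e)
            abortAndCloseConnection(hiveStreamingConnection);
            throw new ProcessException("Hive Streaming write failed", t);
        }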


---

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r193185676
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java ---
    @@ -0,0 +1,247 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hive.streaming;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.serde.serdeConstants;
    +import org.apache.hadoop.hive.serde2.AbstractSerDe;
    +import org.apache.hadoop.hive.serde2.SerDeException;
    +import org.apache.hadoop.hive.serde2.SerDeStats;
    +import org.apache.hadoop.hive.serde2.SerDeUtils;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
    +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    +import org.apache.hadoop.io.ObjectWritable;
    +import org.apache.hadoop.io.Writable;
    +import org.apache.hive.common.util.HiveStringUtils;
    +import org.apache.hive.common.util.TimestampParser;
    +import org.apache.nifi.avro.AvroTypeUtil;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.util.DataTypeUtils;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class NiFiRecordSerDe extends AbstractSerDe {
    +
    +    protected RecordReader recordReader;
    +    protected ComponentLog log;
    +    protected List<String> columnNames;
    +    protected StructTypeInfo schema;
    +
    +    protected StandardStructObjectInspector cachedObjectInspector;
    +    protected TimestampParser tsParser;
    +
    +    private final static Pattern INTERNAL_PATTERN = Pattern.compile("_col([0-9]+)");
    +
    +    public NiFiRecordSerDe(RecordReader recordReader, ComponentLog log) {
    +        this.recordReader = recordReader;
    +        this.log = log;
    +    }
    +
    +    @Override
    +    public void initialize(Configuration conf, Properties tbl) {
    +        List<TypeInfo> columnTypes;
    +        StructTypeInfo rowTypeInfo;
    +
    +        log.debug("Initializing NiFiRecordSerDe: {}", tbl.entrySet().toArray());
    +
    +        // Get column names and types
    +        String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
    +        String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
    +        final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
    +                .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
    +        // all table column names
    +        if (columnNameProperty.isEmpty()) {
    +            columnNames = new ArrayList<>(0);
    +        } else {
    +            columnNames = new ArrayList<>(Arrays.asList(columnNameProperty.split(columnNameDelimiter)));
    +        }
    +
    +        // all column types
    +        if (columnTypeProperty.isEmpty()) {
    +            columnTypes = new ArrayList<>(0);
    +        } else {
    +            columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
    +        }
    +
    +        log.debug("columns: {}, {}", new Object[]{columnNameProperty, columnNames});
    +        log.debug("types: {}, {} ", new Object[]{columnTypeProperty, columnTypes});
    +
    +        assert (columnNames.size() == columnTypes.size());
    +
    +        rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
    +        schema = rowTypeInfo;
    +        log.debug("schema : {}", new Object[]{schema});
    +        cachedObjectInspector = (StandardStructObjectInspector) TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);
    +        tsParser = new TimestampParser(HiveStringUtils.splitAndUnEscape(tbl.getProperty(serdeConstants.TIMESTAMP_FORMATS)));
    +    }
    +
    +    @Override
    +    public Class<? extends Writable> getSerializedClass() {
    +        return null;
    +    }
    +
    +    @Override
    +    public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
    +        return null;
    +    }
    +
    +    @Override
    +    public SerDeStats getSerDeStats() {
    +        return null;
    +    }
    +
    +    @Override
    +    public Object deserialize(Writable writable) throws SerDeException {
    +        ObjectWritable t = (ObjectWritable) writable;
    +        Record record = (Record) t.get();
    +        List<Object> r = new ArrayList<>(Collections.nCopies(columnNames.size(), null));
    +        try {
    +            RecordSchema recordSchema = record.getSchema();
    +            for (RecordField field : recordSchema.getFields()) {
    +                String fieldName = field.getFieldName();
    --- End diff --
    
    True, I'm just saying that in our Record API the partition columns will almost always be in the record. The generic use case is to use the PartitionRecord processor upstream, which is basically a GROUP BY: it takes one flow file in and sends out one flow file for each unique partition value, so all the records in a given flow file will have the same partition value (and that value is added as flow file metadata so you can use it later). In PutHive3Streaming you can specify the partition value as the value from that metadata, but the partition columns are still present in the records.
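
    As a concrete, made-up example: suppose the table is partitioned on (name, age) and PartitionRecord runs upstream. One outgoing flow file might then contain only these records, with matching attributes set on the flow file:

        records:    {"id": 1, "name": "alice", "age": 30}
                    {"id": 7, "name": "alice", "age": 30}
        attributes: name=alice, age=30

    With the Partition Values property set to ${name},${age}, the static partition values come from the attributes, while the name and age fields still ride along inside each record.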


---

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r193412766
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java ---
    @@ -0,0 +1,247 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hive.streaming;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.serde.serdeConstants;
    +import org.apache.hadoop.hive.serde2.AbstractSerDe;
    +import org.apache.hadoop.hive.serde2.SerDeException;
    +import org.apache.hadoop.hive.serde2.SerDeStats;
    +import org.apache.hadoop.hive.serde2.SerDeUtils;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
    +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    +import org.apache.hadoop.io.ObjectWritable;
    +import org.apache.hadoop.io.Writable;
    +import org.apache.hive.common.util.HiveStringUtils;
    +import org.apache.hive.common.util.TimestampParser;
    +import org.apache.nifi.avro.AvroTypeUtil;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.util.DataTypeUtils;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class NiFiRecordSerDe extends AbstractSerDe {
    +
    +    protected RecordReader recordReader;
    +    protected ComponentLog log;
    +    protected List<String> columnNames;
    +    protected StructTypeInfo schema;
    +
    +    protected StandardStructObjectInspector cachedObjectInspector;
    +    protected TimestampParser tsParser;
    +
    +    private final static Pattern INTERNAL_PATTERN = Pattern.compile("_col([0-9]+)");
    +
    +    public NiFiRecordSerDe(RecordReader recordReader, ComponentLog log) {
    +        this.recordReader = recordReader;
    +        this.log = log;
    +    }
    +
    +    @Override
    +    public void initialize(Configuration conf, Properties tbl) {
    +        List<TypeInfo> columnTypes;
    +        StructTypeInfo rowTypeInfo;
    +
    +        log.debug("Initializing NiFiRecordSerDe: {}", tbl.entrySet().toArray());
    +
    +        // Get column names and types
    +        String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
    +        String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
    +        final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
    +                .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
    +        // all table column names
    +        if (columnNameProperty.isEmpty()) {
    +            columnNames = new ArrayList<>(0);
    +        } else {
    +            columnNames = new ArrayList<>(Arrays.asList(columnNameProperty.split(columnNameDelimiter)));
    +        }
    +
    +        // all column types
    +        if (columnTypeProperty.isEmpty()) {
    +            columnTypes = new ArrayList<>(0);
    +        } else {
    +            columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
    +        }
    +
    +        log.debug("columns: {}, {}", new Object[]{columnNameProperty, columnNames});
    +        log.debug("types: {}, {} ", new Object[]{columnTypeProperty, columnTypes});
    +
    +        assert (columnNames.size() == columnTypes.size());
    +
    +        rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
    +        schema = rowTypeInfo;
    +        log.debug("schema : {}", new Object[]{schema});
    +        cachedObjectInspector = (StandardStructObjectInspector) TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);
    +        tsParser = new TimestampParser(HiveStringUtils.splitAndUnEscape(tbl.getProperty(serdeConstants.TIMESTAMP_FORMATS)));
    +    }
    +
    +    @Override
    +    public Class<? extends Writable> getSerializedClass() {
    +        return null;
    +    }
    +
    +    @Override
    +    public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
    +        return null;
    +    }
    +
    +    @Override
    +    public SerDeStats getSerDeStats() {
    +        return null;
    +    }
    +
    +    @Override
    +    public Object deserialize(Writable writable) throws SerDeException {
    +        ObjectWritable t = (ObjectWritable) writable;
    +        Record record = (Record) t.get();
    +        List<Object> r = new ArrayList<>(Collections.nCopies(columnNames.size(), null));
    +        try {
    +            RecordSchema recordSchema = record.getSchema();
    +            for (RecordField field : recordSchema.getFields()) {
    +                String fieldName = field.getFieldName();
    +                int fpos = schema.getAllStructFieldNames().indexOf(fieldName.toLowerCase());
    --- End diff --
    
    I took this code from the StrictJsonWriter; it tries to map internal column names (_colN) from the JSON field names. I wasn't sure what it was used for, so I kept it in to keep the behavior consistent. Happy to remove it if we don't need it.
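
    Roughly, the fallback does this (simplified sketch of the code above): if a record field name does not match a table column, it is tested against the internal _colN naming so the numeric suffix can be used as the column position.

        int fpos = schema.getAllStructFieldNames().indexOf(fieldName.toLowerCase());
        if (fpos == -1) {
            Matcher m = INTERNAL_PATTERN.matcher(fieldName); // INTERNAL_PATTERN = "_col([0-9]+)"
            fpos = m.matches() ? Integer.parseInt(m.group(1)) : -1;
        }
        if (fpos == -1) {
            continue; // field is not present in the table schema, skip it
        }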


---

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r193413841
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java ---
    @@ -0,0 +1,548 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hive;
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hive.streaming.ConnectionError;
    +import org.apache.hive.streaming.HiveStreamingConnection;
    +import org.apache.hive.streaming.InvalidTable;
    +import org.apache.hive.streaming.SerializationError;
    +import org.apache.hive.streaming.StreamingConnection;
    +import org.apache.hive.streaming.StreamingException;
    +import org.apache.hive.streaming.StreamingIOFailure;
    +import org.apache.hive.streaming.TransactionError;
    +import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.hadoop.SecurityUtil;
    +import org.apache.nifi.kerberos.KerberosCredentialsService;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processor.util.pattern.DiscontinuedException;
    +import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
    +import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.nifi.util.hive.AuthenticationFailedException;
    +import org.apache.nifi.util.hive.HiveConfigurator;
    +import org.apache.nifi.util.hive.HiveOptions;
    +import org.apache.hive.streaming.HiveRecordWriter;
    +import org.apache.nifi.util.hive.HiveUtils;
    +import org.apache.nifi.util.hive.ValidationResources;
    +
    +import java.io.BufferedInputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +
    +import static org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES;
    +
    +@Tags({"hive", "streaming", "put", "database", "store"})
    +@CapabilityDescription("This processor uses Hive Streaming to send flow file records to an Apache Hive 3.0+ table. "
    +        + "The partition values are expected to be the 'last' fields of each record, so if the table is partitioned on column A for example, then the last field in "
    +        + "each record should be field A.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "hivestreaming.record.count", description = "This attribute is written on the flow files routed to the 'success' "
    +                + "and 'failure' relationships, and contains the number of records from the incoming flow file. All records in a flow file are committed as a single transaction."),
    +        @WritesAttribute(attribute = "query.output.tables", description = "This attribute is written on the flow files routed to the 'success' "
    +                + "and 'failure' relationships, and contains the target table name in 'databaseName.tableName' format.")
    +})
    +@RequiresInstanceClassLoading
    +public class PutHive3Streaming extends AbstractProcessor {
    +    // Attributes
    +    public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
    +
    +    private static final String CLIENT_CACHE_DISABLED_PROPERTY = "hcatalog.hive.client.cache.disabled";
    +
    +    // Properties
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +            .name("record-reader")
    +            .displayName("Record Reader")
    +            .description("The service for reading records from incoming flow files.")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +    static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-metastore-uri")
    +            .displayName("Hive Metastore URI")
    +            .description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the "
    +                    + "Hive metastore is 9083.")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.URI_VALIDATOR)
    +            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
    +            .build();
    +
    +    static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
    +            .name("hive3-config-resources")
    +            .displayName("Hive Configuration Resources")
    +            .description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
    +                    + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
    +                    + "with Kerberos e.g., the appropriate properties must be set in the configuration files. Also note that if Max Concurrent Tasks is set "
    +                    + "to a number greater than one, the 'hcatalog.hive.client.cache.disabled' property will be forced to 'true' to avoid concurrency issues. "
    +                    + "Please see the Hive documentation for more details.")
    +            .required(false)
    +            .addValidator(HiveUtils.createMultipleFilesExistValidator())
    +            .build();
    +
    +    static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-database-name")
    +            .displayName("Database Name")
    +            .description("The name of the database in which to put the data.")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-table-name")
    +            .displayName("Table Name")
    +            .description("The name of the database table in which to put the data.")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor PARTITION_VALUES = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-part-vals")
    +            .displayName("Partition Values")
    +            .description("Specifies a comma-separated list of the values for the partition columns of the target table. If the incoming records all have the same values "
    +                    + "for the partition columns, those values can be entered here, resulting in a performance gain. If specified, this property will often contain "
    +                    + "Expression Language, for example if PartitionRecord is upstream and two partitions 'name' and 'age' are used, then this property can be set to "
    +                    + "${name},${age}.")
    +            .required(false)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor AUTOCREATE_PARTITIONS = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-autocreate-partition")
    +            .displayName("Auto-Create Partitions")
    +            .description("Flag indicating whether partitions should be automatically created")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    static final PropertyDescriptor CALL_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-call-timeout")
    +            .displayName("Call Timeout")
    +            .description("The number of seconds allowed for a Hive Streaming operation to complete. A value of 0 indicates the processor should wait indefinitely on operations. "
    +                    + "Note that although this property supports Expression Language, it will not be evaluated against incoming FlowFile attributes.")
    +            .defaultValue("0")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .build();
    +
    +    static final PropertyDescriptor DISABLE_STREAMING_OPTIMIZATIONS = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-disable-optimizations")
    +            .displayName("Disable Streaming Optimizations")
    +            .description("Whether to disable streaming optimizations. Disabling streaming optimizations will have significant impact to performance and memory consumption.")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .build();
    +
    +
    +    static final PropertyDescriptor ROLLBACK_ON_FAILURE = RollbackOnFailure.createRollbackOnFailureProperty(
    +            "NOTE: When an error occurred after a Hive streaming transaction which is derived from the same input FlowFile is already committed," +
    +                    " (i.e. a FlowFile contains more records than 'Records per Transaction' and a failure occurred at the 2nd transaction or later)" +
    +                    " then the succeeded records will be transferred to 'success' relationship while the original input FlowFile stays in incoming queue." +
    +                    " Duplicated records can be created for the succeeded ones when the same FlowFile is processed again.");
    +
    +    static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
    +            .name("kerberos-credentials-service")
    +            .displayName("Kerberos Credentials Service")
    +            .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
    +            .identifiesControllerService(KerberosCredentialsService.class)
    +            .required(false)
    +            .build();
    +
    +    // Relationships
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("A FlowFile containing Avro records routed to this relationship after the record has been successfully transmitted to Hive.")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("A FlowFile containing Avro records routed to this relationship if the record could not be transmitted to Hive.")
    +            .build();
    +
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description("The incoming FlowFile is routed to this relationship if its records cannot be transmitted to Hive. Note that "
    +                    + "some records may have been processed successfully, they will be routed (as Avro flow files) to the success relationship. "
    +                    + "The combination of the retry, success, and failure relationships indicate how many records succeeded and/or failed. This "
    +                    + "can be used to provide a retry capability since full rollback is not possible.")
    +            .build();
    +
    +    private List<PropertyDescriptor> propertyDescriptors;
    +    private Set<Relationship> relationships;
    +
    +    protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
    +    protected volatile UserGroupInformation ugi;
    +    protected volatile HiveConf hiveConfig;
    +
    +    protected volatile int callTimeout;
    +    protected ExecutorService callTimeoutPool;
    +    protected volatile boolean rollbackOnFailure;
    +
    +    // Holder of cached Configuration information so validation does not reload the same config over and over
    +    private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
    +
    +    @Override
    +    protected void init(ProcessorInitializationContext context) {
    +        List<PropertyDescriptor> props = new ArrayList<>();
    +        props.add(RECORD_READER);
    +        props.add(METASTORE_URI);
    +        props.add(HIVE_CONFIGURATION_RESOURCES);
    +        props.add(DB_NAME);
    +        props.add(TABLE_NAME);
    +        props.add(PARTITION_VALUES);
    +        props.add(AUTOCREATE_PARTITIONS);
    +        props.add(CALL_TIMEOUT);
    +        props.add(DISABLE_STREAMING_OPTIMIZATIONS);
    +        props.add(ROLLBACK_ON_FAILURE);
    +        props.add(KERBEROS_CREDENTIALS_SERVICE);
    +
    +        propertyDescriptors = Collections.unmodifiableList(props);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        _relationships.add(REL_RETRY);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
    +        boolean confFileProvided = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).isSet();
    +
    +        final List<ValidationResult> problems = new ArrayList<>();
    +
    +        final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
    +
    +        final String resolvedPrincipal = credentialsService != null ? credentialsService.getPrincipal() : null;
    +        final String resolvedKeytab = credentialsService != null ? credentialsService.getKeytab() : null;
    +        if (confFileProvided) {
    +            final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
    +            problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger()));
    +        }
    +
    +        return problems;
    +    }
    +
    +    @OnScheduled
    +    public void setup(final ProcessContext context) {
    +        ComponentLog log = getLogger();
    +        rollbackOnFailure = context.getProperty(ROLLBACK_ON_FAILURE).asBoolean();
    +
    +        final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
    +        hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
    +
    +        // If more than one concurrent task, force 'hcatalog.hive.client.cache.disabled' to true
    +        if (context.getMaxConcurrentTasks() > 1) {
    +            hiveConfig.setBoolean(CLIENT_CACHE_DISABLED_PROPERTY, true);
    +        }
    +
    +        // add any dynamic properties to the Hive configuration
    +        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
    +            final PropertyDescriptor descriptor = entry.getKey();
    +            if (descriptor.isDynamic()) {
    +                hiveConfig.set(descriptor.getName(), entry.getValue());
    +            }
    +        }
    +
    +        hiveConfigurator.preload(hiveConfig);
    +
    +        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
    +            final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
    +
    +            final String resolvedPrincipal = credentialsService.getPrincipal();
    +            final String resolvedKeytab = credentialsService.getKeytab();
    +
    +            log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
    +            try {
    +                ugi = hiveConfigurator.authenticate(hiveConfig, resolvedPrincipal, resolvedKeytab);
    +            } catch (AuthenticationFailedException ae) {
    +                throw new ProcessException("Kerberos authentication failed for Hive Streaming", ae);
    +            }
    +
    +            log.info("Successfully logged in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
    +        } else {
    +            ugi = null;
    +        }
    +
    +        callTimeout = context.getProperty(CALL_TIMEOUT).evaluateAttributeExpressions().asInteger() * 1000; // milliseconds
    +        String timeoutName = "put-hive3-streaming-%d";
    +        this.callTimeoutPool = Executors.newFixedThreadPool(1,
    +                new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
    +    }
    +
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +        final String dbName = context.getProperty(DB_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        final ComponentLog log = getLogger();
    +        final String metastoreUri = context.getProperty(METASTORE_URI).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        final String partitionValuesString = context.getProperty(PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue();
    +        final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
    +        final boolean disableStreamingOptimizations = context.getProperty(DISABLE_STREAMING_OPTIMIZATIONS).asBoolean();
    +
    +        HiveOptions o = new HiveOptions(metastoreUri, dbName, tableName)
    +                .withHiveConf(hiveConfig)
    +                .withAutoCreatePartitions(autoCreatePartitions)
    +                .withCallTimeout(callTimeout)
    +                .withStreamingOptimizations(!disableStreamingOptimizations);
    +
    +        if (!StringUtils.isEmpty(partitionValuesString)) {
    +            List<String> staticPartitionValues = Arrays.stream(partitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
    +            o = o.withStaticPartitionValues(staticPartitionValues);
    +        }
    +
    +        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
    +            final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
    +            o = o.withKerberosPrincipal(credentialsService.getPrincipal()).withKerberosKeytab(credentialsService.getKeytab());
    +        }
    +
    +        final HiveOptions options = o;
    +
    +        // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
    +        ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
    +        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
    +
    +        StreamingConnection hiveStreamingConnection = null;
    +
    +        try (final InputStream rawIn = session.read(flowFile)) {
    +            long processedRecords = 0L;
    +            final RecordReader reader;
    +
    +            try (final BufferedInputStream in = new BufferedInputStream(rawIn)) {
    +
    +                // if we fail to create the RecordReader then we want to route to failure, so we need to
    +                // handle this separately from the other IOExceptions which normally route to retry
    +                try {
    +                    reader = recordReaderFactory.createRecordReader(flowFile, in, getLogger());
    +                } catch (Exception e) {
    +                    throw new RecordReaderFactoryException("Unable to create RecordReader", e);
    +                }
    +
    +                hiveStreamingConnection = makeStreamingConnection(options, reader);
    +
    +                // Write records to Hive streaming, then commit and close
    +                hiveStreamingConnection.beginTransaction();
    +                hiveStreamingConnection.write(in);
    +                hiveStreamingConnection.commitTransaction();
    +                rawIn.close();
    +
    +                Map<String, String> updateAttributes = new HashMap<>();
    +                updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()));
    +                updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
    +                flowFile = session.putAllAttributes(flowFile, updateAttributes);
    +                session.getProvenanceReporter().send(flowFile, hiveStreamingConnection.getMetastoreUri());
    +                session.transfer(flowFile, REL_SUCCESS);
    +            } catch (TransactionError te) {
    +                if (rollbackOnFailure) {
    +                    throw new ProcessException(te.getLocalizedMessage(), te);
    +                } else {
    +                    throw new ShouldRetryException(te.getLocalizedMessage(), te);
    +                }
    +            } catch (RecordReaderFactoryException rrfe) {
    +                throw new ProcessException(rrfe);
    +            }
    +        } catch (InvalidTable | SerializationError | StreamingIOFailure | IOException e) {
    +            if (rollbackOnFailure) {
    +                abortConnection(hiveStreamingConnection);
    +                throw new ProcessException(e.getLocalizedMessage(), e);
    +            } else {
    +                Map<String, String> updateAttributes = new HashMap<>();
    +                updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()));
    +                updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
    +                flowFile = session.putAllAttributes(flowFile, updateAttributes);
    +                session.transfer(flowFile, REL_FAILURE);
    +            }
    +        } catch (DiscontinuedException e) {
    +            // The input FlowFile processing is discontinued. Keep it in the input queue.
    +            getLogger().warn("Discontinued processing for {} due to {}", new Object[]{flowFile, e}, e);
    +            session.transfer(flowFile, Relationship.SELF);
    +        } catch (ConnectionError ce) {
    +            // If we can't connect to the metastore, yield the processor
    +            context.yield();
    +            throw new ProcessException("A connection to metastore cannot be established", ce);
    +        } catch (ShouldRetryException e) {
    +            // This exception is already a result of adjusting an error, so simply transfer the FlowFile to retry. Still need to abort the txn
    +            getLogger().error(e.getLocalizedMessage(), e);
    +            abortConnection(hiveStreamingConnection);
    +            flowFile = session.penalize(flowFile);
    +            session.transfer(flowFile, REL_RETRY);
    +        } catch (StreamingException se) {
    +            // Handle all other exceptions. These are often record-based exceptions (since Hive will throw a subclass of the exception caught above)
    +            Throwable cause = se.getCause();
    +            if (cause == null) cause = se;
    +            // This is a failure on the incoming data, rollback on failure if specified; otherwise route to failure after penalizing (and abort txn in any case)
    +            if (rollbackOnFailure) {
    +                abortConnection(hiveStreamingConnection);
    +                throw new ProcessException(cause.getLocalizedMessage(), cause);
    +            } else {
    +                flowFile = session.penalize(flowFile);
    +                Map<String, String> updateAttributes = new HashMap<>();
    +                updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()));
    +                updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
    +                flowFile = session.putAllAttributes(flowFile, updateAttributes);
    +                session.transfer(flowFile, REL_FAILURE);
    +            }
    +
    +        } catch (Exception e) {
    +            abortConnection(hiveStreamingConnection);
    +            throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
    +        } finally {
    +            closeConnection(hiveStreamingConnection);
    +            // Restore original class loader, might not be necessary but is good practice since the processor task changed it
    +            Thread.currentThread().setContextClassLoader(originalClassloader);
    +        }
    +    }
    +
    +    StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader) throws StreamingException {
    +        return HiveStreamingConnection.newBuilder()
    +                .withDatabase(options.getDatabaseName())
    +                .withTable(options.getTableName())
    +                .withStaticPartitionValues(options.getStaticPartitionValues())
    +                .withHiveConf(options.getHiveConf())
    +                .withRecordWriter(new HiveRecordWriter(reader, getLogger()))
    +                .withAgentInfo("NiFi " + this.getClass().getSimpleName() + " [" + this.getIdentifier()
    +                        + "] thread " + Thread.currentThread().getId() + "[" + Thread.currentThread().getName() + "]")
    +                .connect();
    +    }
    +
    +    @OnStopped
    +    public void cleanup() {
    +        validationResourceHolder.set(null); // trigger re-validation of resources
    +
    +        ComponentLog log = getLogger();
    +
    +        if (callTimeoutPool != null) {
    +            callTimeoutPool.shutdown();
    +            try {
    +                while (!callTimeoutPool.isTerminated()) {
    +                    callTimeoutPool.awaitTermination(callTimeout, TimeUnit.MILLISECONDS);
    +                }
    +            } catch (Throwable t) {
    +                log.warn("shutdown interrupted on " + callTimeoutPool, t);
    +            }
    +            callTimeoutPool = null;
    +        }
    +
    +        ugi = null;
    +    }
    +
    +    private void abortAndCloseConnection(StreamingConnection connection) {
    +        try {
    +            abortConnection(connection);
    --- End diff --
    
    Will add the handler. However, what happens when an error occurs while writing rows? Currently we catch various exceptions, and if we're in a state where we shouldn't commit, we abort the transaction and send the flow file on (it can be retried in whole if the user routes it back to the input).
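
    To make the current behavior concrete, an illustrative sketch of the helper being discussed (not necessarily the exact body in the PR): abort the open transaction first so the failed batch is never committed, then always close the connection; onTrigger then penalizes the flow file and routes it to retry/failure as shown above.

        private void abortAndCloseConnection(StreamingConnection connection) {
            try {
                abortConnection(connection); // rolls back whatever was written in the open transaction
            } catch (Exception e) {
                getLogger().warn("Error aborting the Hive streaming transaction", e);
            } finally {
                closeConnection(connection); // always release the connection and its resources
            }
        }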


---

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

Posted by prasanthj <gi...@git.apache.org>.
Github user prasanthj commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r193182827
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java ---
    @@ -0,0 +1,247 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hive.streaming;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.serde.serdeConstants;
    +import org.apache.hadoop.hive.serde2.AbstractSerDe;
    +import org.apache.hadoop.hive.serde2.SerDeException;
    +import org.apache.hadoop.hive.serde2.SerDeStats;
    +import org.apache.hadoop.hive.serde2.SerDeUtils;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
    +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    +import org.apache.hadoop.io.ObjectWritable;
    +import org.apache.hadoop.io.Writable;
    +import org.apache.hive.common.util.HiveStringUtils;
    +import org.apache.hive.common.util.TimestampParser;
    +import org.apache.nifi.avro.AvroTypeUtil;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.util.DataTypeUtils;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class NiFiRecordSerDe extends AbstractSerDe {
    +
    +    protected RecordReader recordReader;
    +    protected ComponentLog log;
    +    protected List<String> columnNames;
    +    protected StructTypeInfo schema;
    +
    +    protected StandardStructObjectInspector cachedObjectInspector;
    +    protected TimestampParser tsParser;
    +
    +    private final static Pattern INTERNAL_PATTERN = Pattern.compile("_col([0-9]+)");
    +
    +    public NiFiRecordSerDe(RecordReader recordReader, ComponentLog log) {
    +        this.recordReader = recordReader;
    +        this.log = log;
    +    }
    +
    +    @Override
    +    public void initialize(Configuration conf, Properties tbl) {
    +        List<TypeInfo> columnTypes;
    +        StructTypeInfo rowTypeInfo;
    +
    +        log.debug("Initializing NiFiRecordSerDe: {}", tbl.entrySet().toArray());
    +
    +        // Get column names and types
    +        String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
    +        String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
    +        final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
    +                .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
    +        // all table column names
    +        if (columnNameProperty.isEmpty()) {
    +            columnNames = new ArrayList<>(0);
    +        } else {
    +            columnNames = new ArrayList<>(Arrays.asList(columnNameProperty.split(columnNameDelimiter)));
    +        }
    +
    +        // all column types
    +        if (columnTypeProperty.isEmpty()) {
    +            columnTypes = new ArrayList<>(0);
    +        } else {
    +            columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
    +        }
    +
    +        log.debug("columns: {}, {}", new Object[]{columnNameProperty, columnNames});
    +        log.debug("types: {}, {} ", new Object[]{columnTypeProperty, columnTypes});
    +
    +        assert (columnNames.size() == columnTypes.size());
    +
    +        rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
    +        schema = rowTypeInfo;
    +        log.debug("schema : {}", new Object[]{schema});
    +        cachedObjectInspector = (StandardStructObjectInspector) TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);
    +        tsParser = new TimestampParser(HiveStringUtils.splitAndUnEscape(tbl.getProperty(serdeConstants.TIMESTAMP_FORMATS)));
    +    }
    +
    +    @Override
    +    public Class<? extends Writable> getSerializedClass() {
    +        return null;
    +    }
    +
    +    @Override
    +    public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
    +        return null;
    +    }
    +
    +    @Override
    +    public SerDeStats getSerDeStats() {
    +        return null;
    +    }
    +
    +    @Override
    +    public Object deserialize(Writable writable) throws SerDeException {
    +        ObjectWritable t = (ObjectWritable) writable;
    +        Record record = (Record) t.get();
    +        List<Object> r = new ArrayList<>(Collections.nCopies(columnNames.size(), null));
    +        try {
    +            RecordSchema recordSchema = record.getSchema();
    +            for (RecordField field : recordSchema.getFields()) {
    +                String fieldName = field.getFieldName();
    --- End diff --
    
    Yes. At the very least this should happen once per file, not once per record.
    For the unpartitioned or statically partitioned case it should not happen at all, since there are no partition columns to extract (static partition values go in through the API), so that case should take a fast path without these checks and allocations.
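
    A rough sketch of that fast path (the useFastPath flag and the positional copy are hypothetical, decided once in initialize() when the table is unpartitioned or the partition values are static):

        if (useFastPath) {
            // record fields are assumed to line up with the table columns, so copy positionally
            Object[] values = record.getValues(); // NiFi Record API: values in schema order
            int n = Math.min(values.length, columnNames.size());
            for (int i = 0; i < n; i++) {
                r.set(i, values[i]); // the real code would still apply per-type conversion here
            }
            return r;
        }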


---

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r192763045
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/HiveRecordWriter.java ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hive.streaming;
    +
    +import com.google.common.base.Joiner;
    +import org.apache.hadoop.hive.serde.serdeConstants;
    +import org.apache.hadoop.hive.serde2.AbstractSerDe;
    +import org.apache.hadoop.hive.serde2.SerDeException;
    +import org.apache.hadoop.hive.serde2.SerDeUtils;
    +import org.apache.hadoop.io.ObjectWritable;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.List;
    +import java.util.Properties;
    +
    +public class HiveRecordWriter extends AbstractRecordWriter {
    --- End diff --
    
    @prasanthj Do you mind taking a look at HiveRecordWriter and NiFiRecordSerDe (and PutHive3Streaming, which uses them when creating the connection and passing in the options)? Those are the custom impls for the new Hive Streaming API classes; I'm hoping for suggestions on improving performance, etc. Thanks in advance!


---

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

Posted by prasanthj <gi...@git.apache.org>.
Github user prasanthj commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r193180621
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java ---
    @@ -0,0 +1,247 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hive.streaming;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.serde.serdeConstants;
    +import org.apache.hadoop.hive.serde2.AbstractSerDe;
    +import org.apache.hadoop.hive.serde2.SerDeException;
    +import org.apache.hadoop.hive.serde2.SerDeStats;
    +import org.apache.hadoop.hive.serde2.SerDeUtils;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
    +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    +import org.apache.hadoop.io.ObjectWritable;
    +import org.apache.hadoop.io.Writable;
    +import org.apache.hive.common.util.HiveStringUtils;
    +import org.apache.hive.common.util.TimestampParser;
    +import org.apache.nifi.avro.AvroTypeUtil;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.util.DataTypeUtils;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class NiFiRecordSerDe extends AbstractSerDe {
    +
    +    protected RecordReader recordReader;
    +    protected ComponentLog log;
    +    protected List<String> columnNames;
    +    protected StructTypeInfo schema;
    +
    +    protected StandardStructObjectInspector cachedObjectInspector;
    +    protected TimestampParser tsParser;
    +
    +    private final static Pattern INTERNAL_PATTERN = Pattern.compile("_col([0-9]+)");
    +
    +    public NiFiRecordSerDe(RecordReader recordReader, ComponentLog log) {
    +        this.recordReader = recordReader;
    +        this.log = log;
    +    }
    +
    +    @Override
    +    public void initialize(Configuration conf, Properties tbl) {
    +        List<TypeInfo> columnTypes;
    +        StructTypeInfo rowTypeInfo;
    +
    +        log.debug("Initializing NiFiRecordSerDe: {}", tbl.entrySet().toArray());
    +
    +        // Get column names and types
    +        String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
    +        String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
    +        final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
    +                .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
    +        // all table column names
    +        if (columnNameProperty.isEmpty()) {
    +            columnNames = new ArrayList<>(0);
    +        } else {
    +            columnNames = new ArrayList<>(Arrays.asList(columnNameProperty.split(columnNameDelimiter)));
    +        }
    +
    +        // all column types
    +        if (columnTypeProperty.isEmpty()) {
    +            columnTypes = new ArrayList<>(0);
    +        } else {
    +            columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
    +        }
    +
    +        log.debug("columns: {}, {}", new Object[]{columnNameProperty, columnNames});
    +        log.debug("types: {}, {} ", new Object[]{columnTypeProperty, columnTypes});
    +
    +        assert (columnNames.size() == columnTypes.size());
    +
    +        rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
    +        schema = rowTypeInfo;
    +        log.debug("schema : {}", new Object[]{schema});
    +        cachedObjectInspector = (StandardStructObjectInspector) TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);
    +        tsParser = new TimestampParser(HiveStringUtils.splitAndUnEscape(tbl.getProperty(serdeConstants.TIMESTAMP_FORMATS)));
    +    }
    +
    +    @Override
    +    public Class<? extends Writable> getSerializedClass() {
    +        return null;
    +    }
    +
    +    @Override
    +    public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
    +        return null;
    +    }
    +
    +    @Override
    +    public SerDeStats getSerDeStats() {
    +        return null;
    +    }
    +
    +    @Override
    +    public Object deserialize(Writable writable) throws SerDeException {
    +        ObjectWritable t = (ObjectWritable) writable;
    +        Record record = (Record) t.get();
    +        List<Object> r = new ArrayList<>(Collections.nCopies(columnNames.size(), null));
    +        try {
    +            RecordSchema recordSchema = record.getSchema();
    +            for (RecordField field : recordSchema.getFields()) {
    +                String fieldName = field.getFieldName();
    +                int fpos = schema.getAllStructFieldNames().indexOf(fieldName.toLowerCase());
    --- End diff --
    
    I don't understand the part about the internal columns. The destination table will never have internal column names. What would be the use case for special-casing the internal columns?
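    
    For context, a minimal sketch of what such an internal-column fallback typically looks like (a hypothetical helper for illustration, not the PR's exact code): if a record field name is not found among the table's column names, a name of the form _colN is treated as a positional reference.
    
        // Hypothetical sketch of the "_colN" fallback discussed above; not the code under review.
        import java.util.List;
        import java.util.regex.Matcher;
        import java.util.regex.Pattern;
    
        class InternalColumnLookup {
            private static final Pattern INTERNAL_PATTERN = Pattern.compile("_col([0-9]+)");
    
            // Returns the struct field position for a record field name, falling back to the
            // position encoded in internal names such as "_col3"; returns -1 if unresolved.
            static int resolvePosition(String fieldName, List<String> structFieldNames) {
                int fpos = structFieldNames.indexOf(fieldName.toLowerCase());
                if (fpos < 0) {
                    Matcher m = INTERNAL_PATTERN.matcher(fieldName);
                    if (m.matches()) {
                        fpos = Integer.parseInt(m.group(1));
                    }
                }
                return fpos;
            }
        }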


---

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

Posted by prasanthj <gi...@git.apache.org>.
Github user prasanthj commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r193184086
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java ---
    @@ -0,0 +1,548 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hive;
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hive.streaming.ConnectionError;
    +import org.apache.hive.streaming.HiveStreamingConnection;
    +import org.apache.hive.streaming.InvalidTable;
    +import org.apache.hive.streaming.SerializationError;
    +import org.apache.hive.streaming.StreamingConnection;
    +import org.apache.hive.streaming.StreamingException;
    +import org.apache.hive.streaming.StreamingIOFailure;
    +import org.apache.hive.streaming.TransactionError;
    +import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.hadoop.SecurityUtil;
    +import org.apache.nifi.kerberos.KerberosCredentialsService;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processor.util.pattern.DiscontinuedException;
    +import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
    +import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.nifi.util.hive.AuthenticationFailedException;
    +import org.apache.nifi.util.hive.HiveConfigurator;
    +import org.apache.nifi.util.hive.HiveOptions;
    +import org.apache.hive.streaming.HiveRecordWriter;
    +import org.apache.nifi.util.hive.HiveUtils;
    +import org.apache.nifi.util.hive.ValidationResources;
    +
    +import java.io.BufferedInputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +
    +import static org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES;
    +
    +@Tags({"hive", "streaming", "put", "database", "store"})
    +@CapabilityDescription("This processor uses Hive Streaming to send flow file records to an Apache Hive 3.0+ table. "
    +        + "The partition values are expected to be the 'last' fields of each record, so if the table is partitioned on column A for example, then the last field in "
    +        + "each record should be field A.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "hivestreaming.record.count", description = "This attribute is written on the flow files routed to the 'success' "
    +                + "and 'failure' relationships, and contains the number of records from the incoming flow file. All records in a flow file are committed as a single transaction."),
    +        @WritesAttribute(attribute = "query.output.tables", description = "This attribute is written on the flow files routed to the 'success' "
    +                + "and 'failure' relationships, and contains the target table name in 'databaseName.tableName' format.")
    +})
    +@RequiresInstanceClassLoading
    +public class PutHive3Streaming extends AbstractProcessor {
    +    // Attributes
    +    public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
    +
    +    private static final String CLIENT_CACHE_DISABLED_PROPERTY = "hcatalog.hive.client.cache.disabled";
    +
    +    // Properties
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +            .name("record-reader")
    +            .displayName("Record Reader")
    +            .description("The service for reading records from incoming flow files.")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +    static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-metastore-uri")
    +            .displayName("Hive Metastore URI")
    +            .description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the "
    +                    + "Hive metastore is 9043.")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.URI_VALIDATOR)
    +            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
    +            .build();
    +
    +    static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
    +            .name("hive3-config-resources")
    +            .displayName("Hive Configuration Resources")
    +            .description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
    +                    + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
    +                    + "with Kerberos e.g., the appropriate properties must be set in the configuration files. Also note that if Max Concurrent Tasks is set "
    +                    + "to a number greater than one, the 'hcatalog.hive.client.cache.disabled' property will be forced to 'true' to avoid concurrency issues. "
    +                    + "Please see the Hive documentation for more details.")
    +            .required(false)
    +            .addValidator(HiveUtils.createMultipleFilesExistValidator())
    +            .build();
    +
    +    static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-database-name")
    +            .displayName("Database Name")
    +            .description("The name of the database in which to put the data.")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-table-name")
    +            .displayName("Table Name")
    +            .description("The name of the database table in which to put the data.")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor PARTITION_VALUES = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-part-vals")
    +            .displayName("Partition Values")
    +            .description("Specifies a comma-separated list of the values for the partition columns of the target table. If the incoming records all have the same values "
    +                    + "for the partition columns, those values can be entered here, resulting in a performance gain. If specified, this property will often contain "
    +                    + "Expression Language, for example if PartitionRecord is upstream and two partitions 'name' and 'age' are used, then this property can be set to "
    +                    + "${name},${age}.")
    +            .required(false)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor AUTOCREATE_PARTITIONS = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-autocreate-partition")
    +            .displayName("Auto-Create Partitions")
    +            .description("Flag indicating whether partitions should be automatically created")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    static final PropertyDescriptor CALL_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-call-timeout")
    +            .displayName("Call Timeout")
    +            .description("The number of seconds allowed for a Hive Streaming operation to complete. A value of 0 indicates the processor should wait indefinitely on operations. "
    +                    + "Note that although this property supports Expression Language, it will not be evaluated against incoming FlowFile attributes.")
    +            .defaultValue("0")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .build();
    +
    +    static final PropertyDescriptor DISABLE_STREAMING_OPTIMIZATIONS = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-disable-optimizations")
    +            .displayName("Disable Streaming Optimizations")
    +            .description("Whether to disable streaming optimizations. Disabling streaming optimizations will have significant impact to performance and memory consumption.")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .build();
    +
    +
    +    static final PropertyDescriptor ROLLBACK_ON_FAILURE = RollbackOnFailure.createRollbackOnFailureProperty(
    +            "NOTE: When an error occurred after a Hive streaming transaction which is derived from the same input FlowFile is already committed," +
    +                    " (i.e. a FlowFile contains more records than 'Records per Transaction' and a failure occurred at the 2nd transaction or later)" +
    +                    " then the succeeded records will be transferred to 'success' relationship while the original input FlowFile stays in incoming queue." +
    +                    " Duplicated records can be created for the succeeded ones when the same FlowFile is processed again.");
    +
    +    static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
    +            .name("kerberos-credentials-service")
    +            .displayName("Kerberos Credentials Service")
    +            .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
    +            .identifiesControllerService(KerberosCredentialsService.class)
    +            .required(false)
    +            .build();
    +
    +    // Relationships
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("A FlowFile containing Avro records routed to this relationship after the record has been successfully transmitted to Hive.")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("A FlowFile containing Avro records routed to this relationship if the record could not be transmitted to Hive.")
    +            .build();
    +
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description("The incoming FlowFile is routed to this relationship if its records cannot be transmitted to Hive. Note that "
    +                    + "some records may have been processed successfully, they will be routed (as Avro flow files) to the success relationship. "
    +                    + "The combination of the retry, success, and failure relationships indicate how many records succeeded and/or failed. This "
    +                    + "can be used to provide a retry capability since full rollback is not possible.")
    +            .build();
    +
    +    private List<PropertyDescriptor> propertyDescriptors;
    +    private Set<Relationship> relationships;
    +
    +    protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
    +    protected volatile UserGroupInformation ugi;
    +    protected volatile HiveConf hiveConfig;
    +
    +    protected volatile int callTimeout;
    +    protected ExecutorService callTimeoutPool;
    +    protected volatile boolean rollbackOnFailure;
    +
    +    // Holder of cached Configuration information so validation does not reload the same config over and over
    +    private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
    +
    +    @Override
    +    protected void init(ProcessorInitializationContext context) {
    +        List<PropertyDescriptor> props = new ArrayList<>();
    +        props.add(RECORD_READER);
    +        props.add(METASTORE_URI);
    +        props.add(HIVE_CONFIGURATION_RESOURCES);
    +        props.add(DB_NAME);
    +        props.add(TABLE_NAME);
    +        props.add(PARTITION_VALUES);
    +        props.add(AUTOCREATE_PARTITIONS);
    +        props.add(CALL_TIMEOUT);
    +        props.add(DISABLE_STREAMING_OPTIMIZATIONS);
    +        props.add(ROLLBACK_ON_FAILURE);
    +        props.add(KERBEROS_CREDENTIALS_SERVICE);
    +
    +        propertyDescriptors = Collections.unmodifiableList(props);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        _relationships.add(REL_RETRY);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
    +        boolean confFileProvided = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).isSet();
    +
    +        final List<ValidationResult> problems = new ArrayList<>();
    +
    +        final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
    +
    +        final String resolvedPrincipal = credentialsService != null ? credentialsService.getPrincipal() : null;
    +        final String resolvedKeytab = credentialsService != null ? credentialsService.getKeytab() : null;
    +        if (confFileProvided) {
    +            final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
    +            problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger()));
    +        }
    +
    +        return problems;
    +    }
    +
    +    @OnScheduled
    +    public void setup(final ProcessContext context) {
    +        ComponentLog log = getLogger();
    +        rollbackOnFailure = context.getProperty(ROLLBACK_ON_FAILURE).asBoolean();
    +
    +        final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
    +        hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
    +
    +        // If more than one concurrent task, force 'hcatalog.hive.client.cache.disabled' to true
    +        if (context.getMaxConcurrentTasks() > 1) {
    +            hiveConfig.setBoolean(CLIENT_CACHE_DISABLED_PROPERTY, true);
    +        }
    +
    +        // add any dynamic properties to the Hive configuration
    +        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
    +            final PropertyDescriptor descriptor = entry.getKey();
    +            if (descriptor.isDynamic()) {
    +                hiveConfig.set(descriptor.getName(), entry.getValue());
    +            }
    +        }
    +
    +        hiveConfigurator.preload(hiveConfig);
    +
    +        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
    +            final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
    +
    +            final String resolvedPrincipal = credentialsService.getPrincipal();
    +            final String resolvedKeytab = credentialsService.getKeytab();
    +
    +            log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
    +            try {
    +                ugi = hiveConfigurator.authenticate(hiveConfig, resolvedPrincipal, resolvedKeytab);
    +            } catch (AuthenticationFailedException ae) {
    +                throw new ProcessException("Kerberos authentication failed for Hive Streaming", ae);
    +            }
    +
    +            log.info("Successfully logged in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
    +        } else {
    +            ugi = null;
    +        }
    +
    +        callTimeout = context.getProperty(CALL_TIMEOUT).evaluateAttributeExpressions().asInteger() * 1000; // milliseconds
    +        String timeoutName = "put-hive3-streaming-%d";
    +        this.callTimeoutPool = Executors.newFixedThreadPool(1,
    +                new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
    +    }
    +
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +        final String dbName = context.getProperty(DB_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        final ComponentLog log = getLogger();
    +        final String metastoreUri = context.getProperty(METASTORE_URI).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        final String partitionValuesString = context.getProperty(PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue();
    +        final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
    +        final boolean disableStreamingOptimizations = context.getProperty(DISABLE_STREAMING_OPTIMIZATIONS).asBoolean();
    +
    +        HiveOptions o = new HiveOptions(metastoreUri, dbName, tableName)
    +                .withHiveConf(hiveConfig)
    +                .withAutoCreatePartitions(autoCreatePartitions)
    +                .withCallTimeout(callTimeout)
    +                .withStreamingOptimizations(!disableStreamingOptimizations);
    +
    +        if (!StringUtils.isEmpty(partitionValuesString)) {
    +            List<String> staticPartitionValues = Arrays.stream(partitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
    +            o = o.withStaticPartitionValues(staticPartitionValues);
    +        }
    +
    +        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
    +            final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
    +            o = o.withKerberosPrincipal(credentialsService.getPrincipal()).withKerberosKeytab(credentialsService.getKeytab());
    +        }
    +
    +        final HiveOptions options = o;
    +
    +        // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
    +        ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
    +        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
    +
    +        StreamingConnection hiveStreamingConnection = null;
    +
    +        try (final InputStream rawIn = session.read(flowFile)) {
    +            long processedRecords = 0L;
    +            final RecordReader reader;
    +
    +            try (final BufferedInputStream in = new BufferedInputStream(rawIn)) {
    +
    +                // if we fail to create the RecordReader then we want to route to failure, so we need to
    +                // handle this separately from the other IOExceptions which normally route to retry
    +                try {
    +                    reader = recordReaderFactory.createRecordReader(flowFile, in, getLogger());
    +                } catch (Exception e) {
    +                    throw new RecordReaderFactoryException("Unable to create RecordReader", e);
    +                }
    +
    +                hiveStreamingConnection = makeStreamingConnection(options, reader);
    +
    +                // Write records to Hive streaming, then commit and close
    +                hiveStreamingConnection.beginTransaction();
    +                hiveStreamingConnection.write(in);
    +                hiveStreamingConnection.commitTransaction();
    +                rawIn.close();
    +
    +                Map<String, String> updateAttributes = new HashMap<>();
    +                updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()));
    +                updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
    +                flowFile = session.putAllAttributes(flowFile, updateAttributes);
    +                session.getProvenanceReporter().send(flowFile, hiveStreamingConnection.getMetastoreUri());
    +                session.transfer(flowFile, REL_SUCCESS);
    +            } catch (TransactionError te) {
    +                if (rollbackOnFailure) {
    +                    throw new ProcessException(te.getLocalizedMessage(), te);
    +                } else {
    +                    throw new ShouldRetryException(te.getLocalizedMessage(), te);
    +                }
    +            } catch (RecordReaderFactoryException rrfe) {
    +                throw new ProcessException(rrfe);
    +            }
    +        } catch (InvalidTable | SerializationError | StreamingIOFailure | IOException e) {
    +            if (rollbackOnFailure) {
    +                abortConnection(hiveStreamingConnection);
    +                throw new ProcessException(e.getLocalizedMessage(), e);
    +            } else {
    +                Map<String, String> updateAttributes = new HashMap<>();
    +                updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()));
    +                updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
    +                flowFile = session.putAllAttributes(flowFile, updateAttributes);
    +                session.transfer(flowFile, REL_FAILURE);
    +            }
    +        } catch (DiscontinuedException e) {
    +            // The input FlowFile processing is discontinued. Keep it in the input queue.
    +            getLogger().warn("Discontinued processing for {} due to {}", new Object[]{flowFile, e}, e);
    +            session.transfer(flowFile, Relationship.SELF);
    +        } catch (ConnectionError ce) {
    +            // If we can't connect to the metastore, yield the processor
    +            context.yield();
    +            throw new ProcessException("A connection to metastore cannot be established", ce);
    +        } catch (ShouldRetryException e) {
    +            // This exception is already a result of adjusting an error, so simply transfer the FlowFile to retry. Still need to abort the txn
    +            getLogger().error(e.getLocalizedMessage(), e);
    +            abortConnection(hiveStreamingConnection);
    +            flowFile = session.penalize(flowFile);
    +            session.transfer(flowFile, REL_RETRY);
    +        } catch (StreamingException se) {
    +            // Handle all other exceptions. These are often record-based exceptions (since Hive will throw a subclass of the exception caught above)
    +            Throwable cause = se.getCause();
    +            if (cause == null) cause = se;
    +            // This is a failure on the incoming data, rollback on failure if specified; otherwise route to failure after penalizing (and abort txn in any case)
    +            if (rollbackOnFailure) {
    +                abortConnection(hiveStreamingConnection);
    +                throw new ProcessException(cause.getLocalizedMessage(), cause);
    +            } else {
    +                flowFile = session.penalize(flowFile);
    +                Map<String, String> updateAttributes = new HashMap<>();
    +                updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()));
    +                updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
    +                flowFile = session.putAllAttributes(flowFile, updateAttributes);
    +                session.transfer(flowFile, REL_FAILURE);
    +            }
    +
    +        } catch (Exception e) {
    +            abortConnection(hiveStreamingConnection);
    +            throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
    +        } finally {
    +            closeConnection(hiveStreamingConnection);
    +            // Restore original class loader, might not be necessary but is good practice since the processor task changed it
    +            Thread.currentThread().setContextClassLoader(originalClassloader);
    +        }
    +    }
    +
    +    StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader) throws StreamingException {
    +        return HiveStreamingConnection.newBuilder()
    +                .withDatabase(options.getDatabaseName())
    +                .withTable(options.getTableName())
    +                .withStaticPartitionValues(options.getStaticPartitionValues())
    +                .withHiveConf(options.getHiveConf())
    +                .withRecordWriter(new HiveRecordWriter(reader, getLogger()))
    +                .withAgentInfo("NiFi " + this.getClass().getSimpleName() + " [" + this.getIdentifier()
    +                        + "] thread " + Thread.currentThread().getId() + "[" + Thread.currentThread().getName() + "]")
    +                .connect();
    +    }
    +
    +    @OnStopped
    +    public void cleanup() {
    +        validationResourceHolder.set(null); // trigger re-validation of resources
    +
    +        ComponentLog log = getLogger();
    +
    +        if (callTimeoutPool != null) {
    +            callTimeoutPool.shutdown();
    +            try {
    +                while (!callTimeoutPool.isTerminated()) {
    +                    callTimeoutPool.awaitTermination(callTimeout, TimeUnit.MILLISECONDS);
    +                }
    +            } catch (Throwable t) {
    +                log.warn("shutdown interrupted on " + callTimeoutPool, t);
    +            }
    +            callTimeoutPool = null;
    +        }
    +
    +        ugi = null;
    +    }
    +
    +    private void abortAndCloseConnection(StreamingConnection connection) {
    +        try {
    +            abortConnection(connection);
    --- End diff --
    
    Add a shutdown handler that invokes abortAndClose; the shutdown handler gets invoked on JVM shutdown. HIVE-19772 has an example of using both a shutdown handler and an uncaught exception handler, and both can be added to the NiFi processor. Basically, the idea is to abort or commit properly when there is an interrupt or a runtime exception/error. Without it, we may end up generating broken ORC files that will not be cleaned up (the compactor currently does not handle this gracefully).
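    
    As a rough sketch of that suggestion in this processor's terms (the activeConnection field and the hook-related method names below are assumptions for illustration; only abortConnection/closeConnection come from the PR):
    
        // Illustrative only: register a JVM shutdown hook that aborts and closes any open connection.
        private final AtomicReference<StreamingConnection> activeConnection = new AtomicReference<>();
        private volatile Thread shutdownHook;
    
        @OnScheduled
        public void registerShutdownHook(final ProcessContext context) {
            shutdownHook = new Thread(() -> {
                // Abort (and close) any in-flight transaction so no broken ORC files are left behind
                final StreamingConnection connection = activeConnection.get();
                if (connection != null) {
                    abortConnection(connection);
                    closeConnection(connection);
                }
            }, "put-hive3-streaming-shutdown");
            Runtime.getRuntime().addShutdownHook(shutdownHook);
        }
    
        @OnStopped
        public void removeShutdownHook() {
            if (shutdownHook != null) {
                try {
                    Runtime.getRuntime().removeShutdownHook(shutdownHook);
                } catch (IllegalStateException ignored) {
                    // JVM is already shutting down; the hook will run on its own
                }
            }
        }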


---

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

Posted by prasanthj <gi...@git.apache.org>.
Github user prasanthj commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r193176667
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java ---
    @@ -0,0 +1,548 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hive;
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hive.streaming.ConnectionError;
    +import org.apache.hive.streaming.HiveStreamingConnection;
    +import org.apache.hive.streaming.InvalidTable;
    +import org.apache.hive.streaming.SerializationError;
    +import org.apache.hive.streaming.StreamingConnection;
    +import org.apache.hive.streaming.StreamingException;
    +import org.apache.hive.streaming.StreamingIOFailure;
    +import org.apache.hive.streaming.TransactionError;
    +import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.hadoop.SecurityUtil;
    +import org.apache.nifi.kerberos.KerberosCredentialsService;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processor.util.pattern.DiscontinuedException;
    +import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
    +import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.nifi.util.hive.AuthenticationFailedException;
    +import org.apache.nifi.util.hive.HiveConfigurator;
    +import org.apache.nifi.util.hive.HiveOptions;
    +import org.apache.hive.streaming.HiveRecordWriter;
    +import org.apache.nifi.util.hive.HiveUtils;
    +import org.apache.nifi.util.hive.ValidationResources;
    +
    +import java.io.BufferedInputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +
    +import static org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES;
    +
    +@Tags({"hive", "streaming", "put", "database", "store"})
    +@CapabilityDescription("This processor uses Hive Streaming to send flow file records to an Apache Hive 3.0+ table. "
    +        + "The partition values are expected to be the 'last' fields of each record, so if the table is partitioned on column A for example, then the last field in "
    +        + "each record should be field A.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "hivestreaming.record.count", description = "This attribute is written on the flow files routed to the 'success' "
    +                + "and 'failure' relationships, and contains the number of records from the incoming flow file. All records in a flow file are committed as a single transaction."),
    +        @WritesAttribute(attribute = "query.output.tables", description = "This attribute is written on the flow files routed to the 'success' "
    +                + "and 'failure' relationships, and contains the target table name in 'databaseName.tableName' format.")
    +})
    +@RequiresInstanceClassLoading
    +public class PutHive3Streaming extends AbstractProcessor {
    +    // Attributes
    +    public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
    +
    +    private static final String CLIENT_CACHE_DISABLED_PROPERTY = "hcatalog.hive.client.cache.disabled";
    +
    +    // Properties
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +            .name("record-reader")
    +            .displayName("Record Reader")
    +            .description("The service for reading records from incoming flow files.")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +    static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-metastore-uri")
    +            .displayName("Hive Metastore URI")
    +            .description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the "
    +                    + "Hive metastore is 9043.")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.URI_VALIDATOR)
    +            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
    +            .build();
    +
    +    static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
    +            .name("hive3-config-resources")
    +            .displayName("Hive Configuration Resources")
    +            .description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
    +                    + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
    +                    + "with Kerberos e.g., the appropriate properties must be set in the configuration files. Also note that if Max Concurrent Tasks is set "
    +                    + "to a number greater than one, the 'hcatalog.hive.client.cache.disabled' property will be forced to 'true' to avoid concurrency issues. "
    +                    + "Please see the Hive documentation for more details.")
    +            .required(false)
    +            .addValidator(HiveUtils.createMultipleFilesExistValidator())
    +            .build();
    +
    +    static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-database-name")
    +            .displayName("Database Name")
    +            .description("The name of the database in which to put the data.")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-table-name")
    +            .displayName("Table Name")
    +            .description("The name of the database table in which to put the data.")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor PARTITION_VALUES = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-part-vals")
    +            .displayName("Partition Values")
    +            .description("Specifies a comma-separated list of the values for the partition columns of the target table. If the incoming records all have the same values "
    +                    + "for the partition columns, those values can be entered here, resulting in a performance gain. If specified, this property will often contain "
    +                    + "Expression Language, for example if PartitionRecord is upstream and two partitions 'name' and 'age' are used, then this property can be set to "
    +                    + "${name},${age}.")
    +            .required(false)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor AUTOCREATE_PARTITIONS = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-autocreate-partition")
    +            .displayName("Auto-Create Partitions")
    +            .description("Flag indicating whether partitions should be automatically created")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    static final PropertyDescriptor CALL_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-call-timeout")
    +            .displayName("Call Timeout")
    +            .description("The number of seconds allowed for a Hive Streaming operation to complete. A value of 0 indicates the processor should wait indefinitely on operations. "
    +                    + "Note that although this property supports Expression Language, it will not be evaluated against incoming FlowFile attributes.")
    +            .defaultValue("0")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .build();
    +
    +    static final PropertyDescriptor DISABLE_STREAMING_OPTIMIZATIONS = new PropertyDescriptor.Builder()
    +            .name("hive3-stream-disable-optimizations")
    +            .displayName("Disable Streaming Optimizations")
    +            .description("Whether to disable streaming optimizations. Disabling streaming optimizations will have significant impact to performance and memory consumption.")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .build();
    +
    +
    +    static final PropertyDescriptor ROLLBACK_ON_FAILURE = RollbackOnFailure.createRollbackOnFailureProperty(
    +            "NOTE: When an error occurred after a Hive streaming transaction which is derived from the same input FlowFile is already committed," +
    +                    " (i.e. a FlowFile contains more records than 'Records per Transaction' and a failure occurred at the 2nd transaction or later)" +
    +                    " then the succeeded records will be transferred to 'success' relationship while the original input FlowFile stays in incoming queue." +
    +                    " Duplicated records can be created for the succeeded ones when the same FlowFile is processed again.");
    +
    +    static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
    +            .name("kerberos-credentials-service")
    +            .displayName("Kerberos Credentials Service")
    +            .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
    +            .identifiesControllerService(KerberosCredentialsService.class)
    +            .required(false)
    +            .build();
    +
    +    // Relationships
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("A FlowFile containing Avro records routed to this relationship after the record has been successfully transmitted to Hive.")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("A FlowFile containing Avro records routed to this relationship if the record could not be transmitted to Hive.")
    +            .build();
    +
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description("The incoming FlowFile is routed to this relationship if its records cannot be transmitted to Hive. Note that "
    +                    + "some records may have been processed successfully, they will be routed (as Avro flow files) to the success relationship. "
    +                    + "The combination of the retry, success, and failure relationships indicate how many records succeeded and/or failed. This "
    +                    + "can be used to provide a retry capability since full rollback is not possible.")
    +            .build();
    +
    +    private List<PropertyDescriptor> propertyDescriptors;
    +    private Set<Relationship> relationships;
    +
    +    protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
    +    protected volatile UserGroupInformation ugi;
    +    protected volatile HiveConf hiveConfig;
    +
    +    protected volatile int callTimeout;
    +    protected ExecutorService callTimeoutPool;
    +    protected volatile boolean rollbackOnFailure;
    +
    +    // Holder of cached Configuration information so validation does not reload the same config over and over
    +    private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
    +
    +    @Override
    +    protected void init(ProcessorInitializationContext context) {
    +        List<PropertyDescriptor> props = new ArrayList<>();
    +        props.add(RECORD_READER);
    +        props.add(METASTORE_URI);
    +        props.add(HIVE_CONFIGURATION_RESOURCES);
    +        props.add(DB_NAME);
    +        props.add(TABLE_NAME);
    +        props.add(PARTITION_VALUES);
    +        props.add(AUTOCREATE_PARTITIONS);
    +        props.add(CALL_TIMEOUT);
    +        props.add(DISABLE_STREAMING_OPTIMIZATIONS);
    +        props.add(ROLLBACK_ON_FAILURE);
    +        props.add(KERBEROS_CREDENTIALS_SERVICE);
    +
    +        propertyDescriptors = Collections.unmodifiableList(props);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        _relationships.add(REL_RETRY);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
    +        boolean confFileProvided = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).isSet();
    +
    +        final List<ValidationResult> problems = new ArrayList<>();
    +
    +        final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
    +
    +        final String resolvedPrincipal = credentialsService != null ? credentialsService.getPrincipal() : null;
    +        final String resolvedKeytab = credentialsService != null ? credentialsService.getKeytab() : null;
    +        if (confFileProvided) {
    +            final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
    +            problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger()));
    +        }
    +
    +        return problems;
    +    }
    +
    +    @OnScheduled
    +    public void setup(final ProcessContext context) {
    +        ComponentLog log = getLogger();
    +        rollbackOnFailure = context.getProperty(ROLLBACK_ON_FAILURE).asBoolean();
    +
    +        final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
    +        hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
    +
    +        // If more than one concurrent task, force 'hcatalog.hive.client.cache.disabled' to true
    +        if (context.getMaxConcurrentTasks() > 1) {
    +            hiveConfig.setBoolean(CLIENT_CACHE_DISABLED_PROPERTY, true);
    +        }
    +
    +        // add any dynamic properties to the Hive configuration
    +        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
    +            final PropertyDescriptor descriptor = entry.getKey();
    +            if (descriptor.isDynamic()) {
    +                hiveConfig.set(descriptor.getName(), entry.getValue());
    +            }
    +        }
    +
    +        hiveConfigurator.preload(hiveConfig);
    +
    +        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
    +            final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
    +
    +            final String resolvedPrincipal = credentialsService.getPrincipal();
    +            final String resolvedKeytab = credentialsService.getKeytab();
    +
    +            log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
    +            try {
    +                ugi = hiveConfigurator.authenticate(hiveConfig, resolvedPrincipal, resolvedKeytab);
    +            } catch (AuthenticationFailedException ae) {
    +                throw new ProcessException("Kerberos authentication failed for Hive Streaming", ae);
    +            }
    +
    +            log.info("Successfully logged in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
    +        } else {
    +            ugi = null;
    +        }
    +
    +        callTimeout = context.getProperty(CALL_TIMEOUT).evaluateAttributeExpressions().asInteger() * 1000; // milliseconds
    +        String timeoutName = "put-hive3-streaming-%d";
    +        this.callTimeoutPool = Executors.newFixedThreadPool(1,
    +                new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
    +    }
    +
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +        final String dbName = context.getProperty(DB_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        final ComponentLog log = getLogger();
    +        final String metastoreUri = context.getProperty(METASTORE_URI).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        final String partitionValuesString = context.getProperty(PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue();
    +        final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
    +        final boolean disableStreamingOptimizations = context.getProperty(DISABLE_STREAMING_OPTIMIZATIONS).asBoolean();
    +
    +        HiveOptions o = new HiveOptions(metastoreUri, dbName, tableName)
    +                .withHiveConf(hiveConfig)
    +                .withAutoCreatePartitions(autoCreatePartitions)
    +                .withCallTimeout(callTimeout)
    +                .withStreamingOptimizations(!disableStreamingOptimizations);
    +
    +        if (!StringUtils.isEmpty(partitionValuesString)) {
    +            List<String> staticPartitionValues = Arrays.stream(partitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
    +            o = o.withStaticPartitionValues(staticPartitionValues);
    +        }
    +
    +        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
    +            final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
    +            o = o.withKerberosPrincipal(credentialsService.getPrincipal()).withKerberosKeytab(credentialsService.getKeytab());
    +        }
    +
    +        final HiveOptions options = o;
    +
    +        // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
    +        ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
    +        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
    +
    +        StreamingConnection hiveStreamingConnection = null;
    +
    +        try (final InputStream rawIn = session.read(flowFile)) {
    +            long processedRecords = 0L;
    +            final RecordReader reader;
    +
    +            try (final BufferedInputStream in = new BufferedInputStream(rawIn)) {
    +
    +                // if we fail to create the RecordReader then we want to route to failure, so we need to
    +                // handle this separately from the other IOExceptions which normally route to retry
    +                try {
    +                    reader = recordReaderFactory.createRecordReader(flowFile, in, getLogger());
    +                } catch (Exception e) {
    +                    throw new RecordReaderFactoryException("Unable to create RecordReader", e);
    +                }
    +
    +                hiveStreamingConnection = makeStreamingConnection(options, reader);
    +
    +                // Write records to Hive streaming, then commit and close
    +                hiveStreamingConnection.beginTransaction();
    +                hiveStreamingConnection.write(in);
    --- End diff --
    
    Looks like we are creating one connection per flow file. I'm not familiar with flow files; how many rows does a flow file typically contain? The recommendation is to avoid committing too frequently: committing only after roughly 100,000 to 1,000,000 rows gives the best performance.
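    
    To illustrate the idea, here is only a minimal sketch, not code from this PR; the `rows` iterator, the helper name, the batch size, and the error handling are assumptions on my part, while `beginTransaction`/`write`/`commitTransaction` are the streaming calls already used above:
    
        import java.util.Iterator;
    
        import org.apache.hive.streaming.StreamingConnection;
        import org.apache.hive.streaming.StreamingException;
    
        // Hypothetical helper: amortize commit cost by writing many rows per transaction
        // instead of opening one transaction per flow file.
        static void writeInLargeBatches(StreamingConnection connection, Iterator<byte[]> rows) throws StreamingException {
            final int rowsPerCommit = 100_000; // order of magnitude suggested above; would need tuning
            int pending = 0;
            connection.beginTransaction();
            while (rows.hasNext()) {
                connection.write(rows.next());
                if (++pending >= rowsPerCommit) {
                    connection.commitTransaction(); // one commit covers the whole batch
                    connection.beginTransaction();  // start the next batch
                    pending = 0;
                }
            }
            connection.commitTransaction();         // commit whatever is left over
        }
    
    The exact batch size would need tuning, but the point is that many rows share a single commit rather than paying the commit cost once per flow file.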


---

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/2755


---