Posted to issues@nifi.apache.org by mattyb149 <gi...@git.apache.org> on 2016/07/22 15:17:47 UTC

[GitHub] nifi pull request #706: NIFI-1868: Add PutHiveStreaming processor

GitHub user mattyb149 opened a pull request:

    https://github.com/apache/nifi/pull/706

    NIFI-1868: Add PutHiveStreaming processor

    The second commit temporarily removes ConvertAvroToORC (and all ORC references). The Hive processors (for 1.0) must work with Hive 1.2.1, which predates ORC being split out into its own Apache project, so to let PutHiveStreaming (and the rest of the bundle) compile against Hive 1.2.1, the Hive version was downgraded and the ORC references were removed.
    
    NIFI-1663 was reopened to refactor ConvertAvroToORC to use hive-orc in Hive 1.2.1 rather than Apache ORC. That Jira will restore the ConvertAvroToORC processor.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mattyb149/nifi NIFI-1868_1.0

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/706.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #706
    
----
commit 21ff63b29594e32361ad5ceb28885263f75957c8
Author: Matt Burgess <ma...@apache.org>
Date:   2016-07-21T15:59:41Z

    NIFI-1868: Add PutHiveStreaming processor

commit 4328d4dea1a81f342fa6e521235621ba526c7301
Author: Matt Burgess <ma...@apache.org>
Date:   2016-07-22T15:14:16Z

    NIFI-1868: Downgrade to Hive 1.2.1 and remove ConvertAvroToORC

----



[GitHub] nifi pull request #706: NIFI-1868: Add PutHiveStreaming processor

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/706#discussion_r72871651
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java ---
    @@ -0,0 +1,657 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hive;
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder;
    +import org.apache.avro.Schema;
    +import org.apache.avro.file.DataFileStream;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hive.hcatalog.streaming.ConnectionError;
    +import org.apache.hive.hcatalog.streaming.HiveEndPoint;
    +import org.apache.hive.hcatalog.streaming.SerializationError;
    +import org.apache.hive.hcatalog.streaming.StreamingException;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.hadoop.KerberosProperties;
    +import org.apache.nifi.hadoop.SecurityUtil;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.util.NiFiProperties;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.nifi.util.hive.AuthenticationFailedException;
    +import org.apache.nifi.util.hive.HiveConfigurator;
    +import org.apache.nifi.util.hive.HiveOptions;
    +import org.apache.nifi.util.hive.HiveUtils;
    +import org.apache.nifi.util.hive.HiveWriter;
    +import org.json.JSONException;
    +import org.json.JSONObject;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.regex.Pattern;
    +
    +/**
    + * This processor utilizes the Hive Streaming capability to insert data from the flow into a Hive database table.
    + */
    +@Tags({"hive", "streaming", "put", "database", "store"})
    +@CapabilityDescription("This processor uses Hive Streaming to send flow file data to an Apache Hive table. The incoming flow file is expected to be in "
    +        + "Avro format and the table must exist in Hive. Please see the Hive documentation for requirements on the Hive table (format, partitions, etc.). "
    +        + "The partition values are extracted from the Avro record based on the names of the partition columns as specified in the processor. ")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "hivestreaming.record.count", description = "The number of records from this flow file written using Hive Streaming.")
    +})
    +public class PutHiveStreaming extends AbstractProcessor {
    +
    +    // Attributes
    +    public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
    +
    +    // Validators
    +    private static final Validator GREATER_THAN_ONE_VALIDATOR = (subject, value, context) -> {
    +        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
    +            return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
    +        }
    +
    +        String reason = null;
    +        try {
    +            final int intVal = Integer.parseInt(value);
    +
    +            if (intVal < 2) {
    +                reason = "value is less than 2";
    +            }
    +        } catch (final NumberFormatException e) {
    +            reason = "value is not a valid integer";
    +        }
    +
    +        return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
    +    };
    +
    +    // Properties
    +    public static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
    +            .name("hive-stream-metastore-uri")
    +            .displayName("Hive Metastore URI")
    +            .description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the "
    +                    + "Hive metastore is 9083.")
    +            .required(true)
    +            .addValidator(StandardValidators.URI_VALIDATOR)
    +            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // must not start or end with '/'
    +            .build();
    +
    +    public static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
    +            .name("hive-config-resources")
    +            .displayName("Hive Configuration Resources")
    +            .description("A file, or a comma-separated list of files, containing the Hive configuration (e.g., hive-site.xml). Without this, Hadoop "
    +                    + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable Kerberos "
    +                    + "authentication, for example, the appropriate properties must be set in the configuration files. Please see the Hive documentation for more details.")
    +            .required(false)
    +            .addValidator(HiveUtils.createMultipleFilesExistValidator())
    +            .build();
    +
    +    public static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
    +            .name("hive-stream-database-name")
    +            .displayName("Database Name")
    +            .description("The name of the database in which to put the data.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .name("hive-stream-table-name")
    +            .displayName("Table Name")
    +            .description("The name of the database table in which to put the data.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PARTITION_COLUMNS = new PropertyDescriptor.Builder()
    +            .name("hive-stream-partition-cols")
    +            .displayName("Partition Columns")
    +            .description("A comma-delimited list of column names on which the table has been partitioned. The order of values in this list must "
    +                    + "correspond exactly to the order of partition columns specified during the table creation.")
    +            .required(false)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("[^,]+(,[^,]+)*"))) // comma-separated list with non-empty entries
    +            .build();
    +
    +    public static final PropertyDescriptor AUTOCREATE_PARTITIONS = new PropertyDescriptor.Builder()
    +            .name("hive-stream-autocreate-partition")
    +            .displayName("Auto-Create Partitions")
    +            .description("Flag indicating whether partitions should be automatically created")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_OPEN_CONNECTIONS = new PropertyDescriptor.Builder()
    +            .name("hive-stream-max-open-connections")
    +            .displayName("Max Open Connections")
    +            .description("The maximum number of open connections that can be allocated from this pool at the same time, "
    +                    + "or negative for no limit.")
    +            .defaultValue("8")
    +            .required(true)
    +            .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +            .sensitive(false)
    +            .build();
    +
    +    public static final PropertyDescriptor HEARTBEAT_INTERVAL = new PropertyDescriptor.Builder()
    +            .name("hive-stream-heartbeat-interval")
    +            .displayName("Heartbeat Interval")
    +            .description("Indicates that a heartbeat should be sent when the specified number of seconds has elapsed. "
    +                    + "A value of 0 indicates that no heartbeat should be sent.")
    +            .defaultValue("60")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .sensitive(false)
    +            .build();
    +
    +    public static final PropertyDescriptor TXNS_PER_BATCH = new PropertyDescriptor.Builder()
    +            .name("hive-stream-transactions-per-batch")
    +            .displayName("Transactions per Batch")
    +            .description("A hint to Hive Streaming indicating how many transactions the processor task will need. This value must be greater than 1.")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(GREATER_THAN_ONE_VALIDATOR)
    +            .defaultValue("100")
    +            .build();
    +
    +    // Relationships
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("A FlowFile is routed to this relationship after the database is successfully updated")
    +            .build();
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
    +            .build();
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail.")
    +            .build();
    +
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +    private final static Set<Relationship> relationships;
    +
    +    private static final long TICKET_RENEWAL_PERIOD = 60000;
    +
    +    protected KerberosProperties kerberosProperties;
    +
    +    protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
    +    protected volatile UserGroupInformation ugi;
    +
    +    protected final AtomicBoolean isInitialized = new AtomicBoolean(false);
    +
    +    protected HiveOptions options;
    +    protected ExecutorService callTimeoutPool;
    +    protected transient Timer heartBeatTimer;
    +    protected AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
    +    protected Map<HiveEndPoint, HiveWriter> allWriters;
    +
    +
    +    /*
    +     * Ensures that the list of property descriptors is built only once.
    +     * Also creates the Set of relationships.
    +     */
    +    static {
    +        propertyDescriptors = new ArrayList<>();
    +        propertyDescriptors.add(METASTORE_URI);
    +        propertyDescriptors.add(HIVE_CONFIGURATION_RESOURCES);
    +        propertyDescriptors.add(DB_NAME);
    +        propertyDescriptors.add(TABLE_NAME);
    +        propertyDescriptors.add(PARTITION_COLUMNS);
    +        propertyDescriptors.add(AUTOCREATE_PARTITIONS);
    +        propertyDescriptors.add(MAX_OPEN_CONNECTIONS);
    +        propertyDescriptors.add(HEARTBEAT_INTERVAL);
    +        propertyDescriptors.add(TXNS_PER_BATCH);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        _relationships.add(REL_RETRY);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    protected void init(ProcessorInitializationContext context) {
    +        kerberosProperties = getKerberosProperties();
    +        propertyDescriptors.add(kerberosProperties.getKerberosPrincipal());
    +        propertyDescriptors.add(kerberosProperties.getKerberosKeytab());
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +
    +    @OnScheduled
    +    public void setup(final ProcessContext context) {
    +        ComponentLog log = getLogger();
    +
    +        final String metastoreUri = context.getProperty(METASTORE_URI).getValue();
    +        final String dbName = context.getProperty(DB_NAME).getValue();
    --- End diff --
    
    Sounds like a good Improvement Jira if folks really want the flexibility? I didn't hear much clamor for streaming to multiple DBs/tables from a single processor, and the HiveWriters are currently cached based on the partition values, so the cache would grow with every distinct combination of options. Definitely worth investigating, though.
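    For context, a minimal sketch (not the actual implementation) of why that cache grows: writers are keyed by the full HiveEndPoint, so every distinct database/table/partition-value combination adds another cached writer. The makeHiveWriter helper below is purely illustrative; the real code also wires in timeouts, the call-timeout pool, and UGI.
    
    ```
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    import org.apache.hive.hcatalog.streaming.HiveEndPoint;
    
    class WriterCacheSketch {
        // One writer per distinct endpoint; the map grows with every new
        // (database, table, partition values) combination that is seen.
        private final Map<HiveEndPoint, Object /* HiveWriter */> allWriters = new ConcurrentHashMap<>();
    
        Object getOrCreateWriter(String metastoreUri, String db, String table, List<String> partitionValues) {
            HiveEndPoint endPoint = new HiveEndPoint(metastoreUri, db, table, partitionValues);
            return allWriters.computeIfAbsent(endPoint, this::makeHiveWriter);
        }
    
        private Object makeHiveWriter(HiveEndPoint endPoint) {
            // Hypothetical stand-in for constructing a HiveWriter for this endpoint.
            return new Object();
        }
    }
    ```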



[GitHub] nifi pull request #706: NIFI-1868: Add PutHiveStreaming processor

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/706#discussion_r72677845
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java ---
    +    @OnScheduled
    +    public void setup(final ProcessContext context) {
    +        ComponentLog log = getLogger();
    +
    +        final String metastoreUri = context.getProperty(METASTORE_URI).getValue();
    +        final String dbName = context.getProperty(DB_NAME).getValue();
    --- End diff --
    
    Would it make sense to support expression language for DB Name and Table Name? I suspect it would add a bit of complexity, since you wouldn't be able to create the HiveOptions and log in up front; you'd have to do that each time through onTrigger and maintain some kind of cache of options. Just throwing it out there as an idea.
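    A rough sketch of what that could look like, assuming expression language were enabled on those two properties: evaluate them per flow file in onTrigger and memoize the resulting HiveOptions so the connection/login work is not redone on every trigger. The optionsCache field and the "db.table" key below are illustrative only.
    
    ```
    // Hypothetical field on the processor: memoized options keyed by "db.table"
    private final java.util.concurrent.ConcurrentMap<String, HiveOptions> optionsCache =
            new java.util.concurrent.ConcurrentHashMap<>();
    
    // Inside onTrigger, per flow file (sketch):
    final String dbName = context.getProperty(DB_NAME).evaluateAttributeExpressions(flowFile).getValue();
    final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
    
    final HiveOptions flowFileOptions = optionsCache.computeIfAbsent(dbName + "." + tableName,
            key -> new HiveOptions(metastoreUri, dbName, tableName)
                    .withTxnsPerBatch(txnsPerBatch)
                    .withAutoCreatePartitions(autoCreatePartitions)
                    .withMaxOpenConnections(maxConnections)
                    .withHeartBeatInterval(heartbeatInterval));
    ```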



[GitHub] nifi issue #706: NIFI-1868: Add PutHiveStreaming processor

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on the issue:

    https://github.com/apache/nifi/pull/706
  
    Tested it out with the processor triggered serially and it appears to be working as expected. I am a +1 to merge this in; do you want to squash the commits first?



[GitHub] nifi issue #706: NIFI-1868: Add PutHiveStreaming processor

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on the issue:

    https://github.com/apache/nifi/pull/706
  
    TriggerSerially sounds fair, knowing that we should be able to remove it in the future.
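    For reference, the constraint is just an annotation on the processor class, so it can be dropped later without any other changes; a sketch:
    
    ```
    import org.apache.nifi.annotation.behavior.TriggerSerially;
    
    @TriggerSerially // only one onTrigger invocation at a time, until concurrent access to the writers is made safe
    @Tags({"hive", "streaming", "put", "database", "store"})
    public class PutHiveStreaming extends AbstractProcessor {
        // ...
    }
    ```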



[GitHub] nifi issue #706: NIFI-1868: Add PutHiveStreaming processor

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on the issue:

    https://github.com/apache/nifi/pull/706
  
    Per an offline discussion with @bbende, we decided a good way to handle the outgoing flow files is to have "success", "failure", and "retry" relationships. If a connection/environment error occurs, the incoming flow file will be routed to retry. Otherwise, all records successfully written via Hive Streaming will go into an Avro flow file on the "success" relationship, and any records that failed to be written will go into an Avro flow file on the "failure" relationship.
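    A simplified sketch of that routing scheme (the writeRecords helper, the successfulRecords/failedRecords collections, and the Avro re-serialization are illustrative placeholders):
    
    ```
    final List<HiveStreamingRecord> successfulRecords = new ArrayList<>();
    final List<HiveStreamingRecord> failedRecords = new ArrayList<>();
    try {
        // Hypothetical helper: reads the incoming Avro records, writes each one via
        // Hive Streaming, and sorts them into successfulRecords / failedRecords.
        writeRecords(flowFile, successfulRecords, failedRecords);
    } catch (ConnectionError ce) {
        // Connection / environment problem: the same input may succeed later.
        session.transfer(session.penalize(flowFile), REL_RETRY);
        return;
    }
    
    // Records that were written go out as an Avro flow file on "success".
    FlowFile successFlowFile = session.create(flowFile);
    successFlowFile = session.putAttribute(successFlowFile,
            HIVE_STREAMING_RECORD_COUNT_ATTR, String.valueOf(successfulRecords.size()));
    session.transfer(successFlowFile, REL_SUCCESS);
    
    // Records that could not be written go out as an Avro flow file on "failure".
    if (!failedRecords.isEmpty()) {
        session.transfer(session.create(flowFile), REL_FAILURE);
    }
    session.remove(flowFile); // the original has been split into the outputs above
    ```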



[GitHub] nifi pull request #706: NIFI-1868: Add PutHiveStreaming processor

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/706#discussion_r72797764
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java ---
    +        final String tableName = context.getProperty(TABLE_NAME).getValue();
    +        final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
    +        final Integer maxConnections = context.getProperty(MAX_OPEN_CONNECTIONS).asInteger();
    +        final Integer heartbeatInterval = context.getProperty(HEARTBEAT_INTERVAL).asInteger();
    +        final Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).asInteger();
    +        final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
    +        final Configuration hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
    +
    +        // add any dynamic properties to the Hive configuration
    +        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
    +            final PropertyDescriptor descriptor = entry.getKey();
    +            if (descriptor.isDynamic()) {
    +                hiveConfig.set(descriptor.getName(), entry.getValue());
    +            }
    +        }
    +
    +        options = new HiveOptions(metastoreUri, dbName, tableName)
    +                .withTxnsPerBatch(txnsPerBatch)
    +                .withAutoCreatePartitions(autoCreatePartitions)
    +                .withMaxOpenConnections(maxConnections)
    +                .withHeartBeatInterval(heartbeatInterval);
    +
    +        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
    +            final String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
    +            final String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
    +
    +            log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{principal, keyTab});
    +            try {
    +                ugi = hiveConfigurator.authenticate(hiveConfig, principal, keyTab, TICKET_RENEWAL_PERIOD, log);
    +            } catch (AuthenticationFailedException ae) {
    +                throw new ProcessException("Kerberos authentication failed for Hive Streaming", ae);
    +            }
    +            log.info("Successfully logged in as principal {} with keytab {}", new Object[]{principal, keyTab});
    +            options = options.withKerberosPrincipal(principal).withKerberosKeytab(keyTab);
    +        }
    +
    +        allWriters = new ConcurrentHashMap<>();
    +        String timeoutName = "put-hive-streaming-%d";
    +        this.callTimeoutPool = Executors.newFixedThreadPool(1,
    +                new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
    +
    +        sendHeartBeat.set(true);
    +        heartBeatTimer = new Timer();
    +        setupHeartBeatTimer();
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final ComponentLog log = getLogger();
    +        try {
    +            final List<String> partitionColumnList;
    +            String partitionColumns = context.getProperty(PARTITION_COLUMNS).getValue();
    +            if (StringUtils.isEmpty(partitionColumns)) {
    +                partitionColumnList = Collections.emptyList();
    +            } else {
    +                String[] partitionCols = partitionColumns.split(",");
    +                partitionColumnList = new ArrayList<>(partitionCols.length);
    +                for (String col : partitionCols) {
    +                    partitionColumnList.add(col.trim());
    +                }
    +            }
    +
    +            // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
    +            ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
    +            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
    +
    +            int recordCount = 0;
    +            final List<HiveStreamingRecord> records = new LinkedList<>();
    +
    +            session.read(flowFile, in -> {
    +
    +                try (final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
    +
    +                    GenericRecord currRecord;
    +                    while (reader.hasNext()) {
    +                        currRecord = reader.next();
    +                        List<String> partitionValues = new ArrayList<>();
    +
    +                        for (String partition : partitionColumnList) {
    +                            Object partitionValue = currRecord.get(partition);
    +                            if (partitionValue == null) {
    +                                throw new IOException("Partition column '" + partition + "' not found in Avro record");
    +                            }
    +                            partitionValues.add(partitionValue.toString());
    +                        }
    +
    +                        List<Schema.Field> fields = currRecord.getSchema().getFields();
    +                        if (fields != null) {
    +                            JSONObject obj = new JSONObject();
    +                            for (Schema.Field field : fields) {
    +                                String fieldName = field.name();
    +                                // Skip fields that are partition columns, we extracted those values above to create an EndPoint
    +                                if (!partitionColumnList.contains(fieldName)) {
    +                                    Object value = currRecord.get(fieldName);
    +                                    try {
    +                                        obj.put(fieldName, value);
    +                                    } catch (JSONException je) {
    +                                        throw new IOException(je);
    +                                    }
    +                                }
    +                            }
    +                            records.add(new HiveStreamingRecord(partitionValues, obj));
    +                        }
    +                    }
    +                }
    +            });
    +
    +            // Write all records to Hive Streaming
    +            for (HiveStreamingRecord record : records) {
    --- End diff --
    
    HiveWriter will batch up a number of records for efficiency, but you're right: here I'm waiting until all the records are in memory. There was some reason (probably incorrect) why I had the write outside the loop, but if that reason was only to keep the recordCount (which would need to be effectively final for the callback), I can just use an AtomicInteger instead.
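
    For illustration, roughly something like this (a sketch, not the exact code in the PR; it assumes the surrounding onTrigger context and an extra java.util.concurrent.atomic.AtomicInteger import):

    ```java
    // Sketch only: a local int can't be captured by the read callback (it would have
    // to be effectively final), so the running count goes in an AtomicInteger instead
    // of being tracked outside the loop.
    final AtomicInteger recordCount = new AtomicInteger(0);

    session.read(flowFile, in -> {
        try (final DataFileStream<GenericRecord> reader =
                new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
            while (reader.hasNext()) {
                final GenericRecord currRecord = reader.next();
                // ... build and collect the HiveStreamingRecord as in the current code ...
                recordCount.incrementAndGet();
            }
        }
    });

    flowFile = session.putAttribute(flowFile, HIVE_STREAMING_RECORD_COUNT_ATTR,
            String.valueOf(recordCount.get()));
    ```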


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #706: NIFI-1868: Add PutHiveStreaming processor

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/706


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #706: NIFI-1868: Add PutHiveStreaming processor

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on the issue:

    https://github.com/apache/nifi/pull/706
  
    Error handling on the relationships is looking good, thanks for making those updates.
    
    One more thing I ran into: when I set concurrent tasks to 2 on the PutHiveStreaming processor and generated a flow file every 500 ms, I would occasionally get the following error:
    
    ```
    2016-08-02 20:50:17,860 ERROR [Timer-Driven Process Thread-1] o.a.n.processors.hive.PutHiveStreaming
    java.lang.NullPointerException: null
    	at org.apache.hive.hcatalog.streaming.StrictJsonWriter.write(StrictJsonWriter.java:79) ~[na:na]
    	at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:632) ~[na:na]
    	at org.apache.nifi.util.hive.HiveWriter$1.call(HiveWriter.java:113) ~[na:na]
    	at org.apache.nifi.util.hive.HiveWriter$1.call(HiveWriter.java:110) ~[na:na]
    	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_102]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_102]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_102]
    	at java.lang.Thread.run(Thread.java:745) [na:1.8.0_102] 
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #706: NIFI-1868: Add PutHiveStreaming processor

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/706#discussion_r72673853
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java ---
    @@ -0,0 +1,657 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hive;
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder;
    +import org.apache.avro.Schema;
    +import org.apache.avro.file.DataFileStream;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hive.hcatalog.streaming.ConnectionError;
    +import org.apache.hive.hcatalog.streaming.HiveEndPoint;
    +import org.apache.hive.hcatalog.streaming.SerializationError;
    +import org.apache.hive.hcatalog.streaming.StreamingException;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.hadoop.KerberosProperties;
    +import org.apache.nifi.hadoop.SecurityUtil;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.util.NiFiProperties;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.nifi.util.hive.AuthenticationFailedException;
    +import org.apache.nifi.util.hive.HiveConfigurator;
    +import org.apache.nifi.util.hive.HiveOptions;
    +import org.apache.nifi.util.hive.HiveUtils;
    +import org.apache.nifi.util.hive.HiveWriter;
    +import org.json.JSONException;
    +import org.json.JSONObject;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.regex.Pattern;
    +
    +/**
    + * This processor utilizes the Hive Streaming capability to insert data from the flow into a Hive database table.
    + */
    +@Tags({"hive", "streaming", "put", "database", "store"})
    +@CapabilityDescription("This processor uses Hive Streaming to send flow file data to an Apache Hive table. The incoming flow file is expected to be in "
    +        + "Avro format and the table must exist in Hive. Please see the Hive documentation for requirements on the Hive table (format, partitions, etc.). "
    +        + "The partition values are extracted from the Avro record based on the names of the partition columns as specified in the processor. ")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "hivestreaming.record.count", description = "The number of records from this flow file written using Hive Streaming.")
    +})
    +public class PutHiveStreaming extends AbstractProcessor {
    +
    +    // Attributes
    +    public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
    +
    +    // Validators
    +    private static final Validator GREATER_THAN_ONE_VALIDATOR = (subject, value, context) -> {
    +        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
    +            return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
    +        }
    +
    +        String reason = null;
    +        try {
    +            final int intVal = Integer.parseInt(value);
    +
    +            if (intVal < 2) {
    +                reason = "value is less than 2";
    +            }
    +        } catch (final NumberFormatException e) {
    +            reason = "value is not a valid integer";
    +        }
    +
    +        return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
    +    };
    +
    +    // Properties
    +    public static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
    +            .name("hive-stream-metastore-uri")
    +            .displayName("Hive Metastore URI")
    +            .description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the "
    +                    + "Hive metastore is 9083.")
    +            .required(true)
    +            .addValidator(StandardValidators.URI_VALIDATOR)
    +            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // must not start or end with /
    +            .build();
    +
    +    public static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
    +            .name("hive-config-resources")
    +            .displayName("Hive Configuration Resources")
    +            .description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
    +                    + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
    +                    + "with Kerberos e.g., the appropriate properties must be set in the configuration files. Please see the Hive documentation for more details.")
    +            .required(false)
    +            .addValidator(HiveUtils.createMultipleFilesExistValidator())
    +            .build();
    +
    +    public static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
    +            .name("hive-stream-database-name")
    +            .displayName("Database Name")
    +            .description("The name of the database in which to put the data.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .name("hive-stream-table-name")
    +            .displayName("Table Name")
    +            .description("The name of the database table in which to put the data.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PARTITION_COLUMNS = new PropertyDescriptor.Builder()
    +            .name("hive-stream-partition-cols")
    +            .displayName("Partition Columns")
    +            .description("A comma-delimited list of column names on which the table has been partitioned. The order of values in this list must "
    +                    + "correspond exactly to the order of partition columns specified during the table creation.")
    +            .required(false)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("[^,]+(,[^,]+)*"))) // comma-separated list with non-empty entries
    +            .build();
    +
    +    public static final PropertyDescriptor AUTOCREATE_PARTITIONS = new PropertyDescriptor.Builder()
    +            .name("hive-stream-autocreate-partition")
    +            .displayName("Auto-Create Partitions")
    +            .description("Flag indicating whether partitions should be automatically created")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_OPEN_CONNECTIONS = new PropertyDescriptor.Builder()
    +            .name("hive-stream-max-open-connections")
    +            .displayName("Max Open Connections")
    +            .description("The maximum number of open connections that can be allocated from this pool at the same time, "
    +                    + "or negative for no limit.")
    +            .defaultValue("8")
    +            .required(true)
    +            .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +            .sensitive(false)
    +            .build();
    +
    +    public static final PropertyDescriptor HEARTBEAT_INTERVAL = new PropertyDescriptor.Builder()
    +            .name("hive-stream-heartbeat-interval")
    +            .displayName("Heartbeat Interval")
    +            .description("Indicates that a heartbeat should be sent when the specified number of seconds has elapsed. "
    +                    + "A value of 0 indicates that no heartbeat should be sent.")
    +            .defaultValue("60")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .sensitive(false)
    +            .build();
    +
    +    public static final PropertyDescriptor TXNS_PER_BATCH = new PropertyDescriptor.Builder()
    +            .name("hive-stream-transactions-per-batch")
    +            .displayName("Transactions per Batch")
    +            .description("A hint to Hive Streaming indicating how many transactions the processor task will need. This value must be greater than 1.")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(GREATER_THAN_ONE_VALIDATOR)
    +            .defaultValue("100")
    +            .build();
    +
    +    // Relationships
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("A FlowFile is routed to this relationship after the database is successfully updated")
    +            .build();
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
    +            .build();
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail.")
    +            .build();
    +
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +    private final static Set<Relationship> relationships;
    +
    +    private static final long TICKET_RENEWAL_PERIOD = 60000;
    +
    +    protected KerberosProperties kerberosProperties;
    +
    +    protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
    +    protected volatile UserGroupInformation ugi;
    +
    +    protected final AtomicBoolean isInitialized = new AtomicBoolean(false);
    +
    +    protected HiveOptions options;
    +    protected ExecutorService callTimeoutPool;
    +    protected transient Timer heartBeatTimer;
    +    protected AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
    +    protected Map<HiveEndPoint, HiveWriter> allWriters;
    +
    +
    +    /*
    +     * Will ensure that the list of property descriptors is built only once.
    +     * Will also create a Set of relationships
    +     */
    +    static {
    +        propertyDescriptors = new ArrayList<>();
    +        propertyDescriptors.add(METASTORE_URI);
    +        propertyDescriptors.add(HIVE_CONFIGURATION_RESOURCES);
    +        propertyDescriptors.add(DB_NAME);
    +        propertyDescriptors.add(TABLE_NAME);
    +        propertyDescriptors.add(PARTITION_COLUMNS);
    +        propertyDescriptors.add(AUTOCREATE_PARTITIONS);
    +        propertyDescriptors.add(MAX_OPEN_CONNECTIONS);
    +        propertyDescriptors.add(HEARTBEAT_INTERVAL);
    +        propertyDescriptors.add(TXNS_PER_BATCH);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        _relationships.add(REL_RETRY);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    protected void init(ProcessorInitializationContext context) {
    +        kerberosProperties = getKerberosProperties();
    +        propertyDescriptors.add(kerberosProperties.getKerberosPrincipal());
    +        propertyDescriptors.add(kerberosProperties.getKerberosKeytab());
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +
    +    @OnScheduled
    +    public void setup(final ProcessContext context) {
    +        ComponentLog log = getLogger();
    +
    +        final String metastoreUri = context.getProperty(METASTORE_URI).getValue();
    +        final String dbName = context.getProperty(DB_NAME).getValue();
    +        final String tableName = context.getProperty(TABLE_NAME).getValue();
    +        final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
    +        final Integer maxConnections = context.getProperty(MAX_OPEN_CONNECTIONS).asInteger();
    +        final Integer heartbeatInterval = context.getProperty(HEARTBEAT_INTERVAL).asInteger();
    +        final Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).asInteger();
    +        final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
    +        final Configuration hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
    +
    +        // add any dynamic properties to the Hive configuration
    +        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
    +            final PropertyDescriptor descriptor = entry.getKey();
    +            if (descriptor.isDynamic()) {
    +                hiveConfig.set(descriptor.getName(), entry.getValue());
    +            }
    +        }
    +
    +        options = new HiveOptions(metastoreUri, dbName, tableName)
    +                .withTxnsPerBatch(txnsPerBatch)
    +                .withAutoCreatePartitions(autoCreatePartitions)
    +                .withMaxOpenConnections(maxConnections)
    +                .withHeartBeatInterval(heartbeatInterval);
    +
    +        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
    +            final String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
    +            final String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
    +
    +            log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{principal, keyTab});
    +            try {
    +                ugi = hiveConfigurator.authenticate(hiveConfig, principal, keyTab, TICKET_RENEWAL_PERIOD, log);
    +            } catch (AuthenticationFailedException ae) {
    +                throw new ProcessException("Kerberos authentication failed for Hive Streaming", ae);
    +            }
    +            log.info("Successfully logged in as principal {} with keytab {}", new Object[]{principal, keyTab});
    +            options = options.withKerberosPrincipal(principal).withKerberosKeytab(keyTab);
    +        }
    +
    +        allWriters = new ConcurrentHashMap<>();
    +        String timeoutName = "put-hive-streaming-%d";
    +        this.callTimeoutPool = Executors.newFixedThreadPool(1,
    +                new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
    +
    +        sendHeartBeat.set(true);
    +        heartBeatTimer = new Timer();
    +        setupHeartBeatTimer();
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final ComponentLog log = getLogger();
    +        try {
    +            final List<String> partitionColumnList;
    +            String partitionColumns = context.getProperty(PARTITION_COLUMNS).getValue();
    +            if (StringUtils.isEmpty(partitionColumns)) {
    +                partitionColumnList = Collections.emptyList();
    +            } else {
    +                String[] partitionCols = partitionColumns.split(",");
    +                partitionColumnList = new ArrayList<>(partitionCols.length);
    +                for (String col : partitionCols) {
    +                    partitionColumnList.add(col.trim());
    +                }
    +            }
    +
    +            // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
    +            ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
    +            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
    +
    +            int recordCount = 0;
    +            final List<HiveStreamingRecord> records = new LinkedList<>();
    +
    +            session.read(flowFile, in -> {
    +
    +                try (final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
    +
    +                    GenericRecord currRecord;
    +                    while (reader.hasNext()) {
    +                        currRecord = reader.next();
    +                        List<String> partitionValues = new ArrayList<>();
    +
    +                        for (String partition : partitionColumnList) {
    +                            Object partitionValue = currRecord.get(partition);
    +                            if (partitionValue == null) {
    +                                throw new IOException("Partition column '" + partition + "' not found in Avro record");
    +                            }
    +                            partitionValues.add(partitionValue.toString());
    +                        }
    +
    +                        List<Schema.Field> fields = currRecord.getSchema().getFields();
    +                        if (fields != null) {
    +                            JSONObject obj = new JSONObject();
    +                            for (Schema.Field field : fields) {
    +                                String fieldName = field.name();
    +                                // Skip fields that are partition columns, we extracted those values above to create an EndPoint
    +                                if (!partitionColumnList.contains(fieldName)) {
    +                                    Object value = currRecord.get(fieldName);
    +                                    try {
    +                                        obj.put(fieldName, value);
    +                                    } catch (JSONException je) {
    +                                        throw new IOException(je);
    +                                    }
    +                                }
    +                            }
    +                            records.add(new HiveStreamingRecord(partitionValues, obj));
    +                        }
    +                    }
    +                }
    +            });
    +
    +            // Write all records to Hive Streaming
    +            for (HiveStreamingRecord record : records) {
    --- End diff --
    
    I don't know much about how the HiveWriter works, but would there be any benefit to writing the records as you go in the above loop, rather than reading them all into memory and then writing them? 
    
    If we leave it as is, maybe we can just put a note in the processor description mentioning that the full contents of the incoming Avro file will be read into memory; or, if this processor/Hive wouldn't perform well with large Avro files, maybe we should mention that instead.
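
    Just to make the suggestion concrete, a rough sketch of the streaming alternative (buildRecord and writeToHive are hypothetical helpers standing in for the record construction and HiveWriter call the PR already performs, not real method names from it):

    ```java
    // Sketch only: write each record as it is read, instead of collecting everything
    // into a List first and writing afterwards.
    session.read(flowFile, in -> {
        try (final DataFileStream<GenericRecord> reader =
                new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
            while (reader.hasNext()) {
                final GenericRecord currRecord = reader.next();
                final HiveStreamingRecord record = buildRecord(currRecord); // hypothetical helper
                writeToHive(record);                                        // hypothetical helper wrapping the HiveWriter
            }
        }
    });
    ```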


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #706: NIFI-1868: Add PutHiveStreaming processor

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/706#discussion_r72672215
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java ---
    @@ -0,0 +1,657 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hive;
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder;
    +import org.apache.avro.Schema;
    +import org.apache.avro.file.DataFileStream;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hive.hcatalog.streaming.ConnectionError;
    +import org.apache.hive.hcatalog.streaming.HiveEndPoint;
    +import org.apache.hive.hcatalog.streaming.SerializationError;
    +import org.apache.hive.hcatalog.streaming.StreamingException;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.hadoop.KerberosProperties;
    +import org.apache.nifi.hadoop.SecurityUtil;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.util.NiFiProperties;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.nifi.util.hive.AuthenticationFailedException;
    +import org.apache.nifi.util.hive.HiveConfigurator;
    +import org.apache.nifi.util.hive.HiveOptions;
    +import org.apache.nifi.util.hive.HiveUtils;
    +import org.apache.nifi.util.hive.HiveWriter;
    +import org.json.JSONException;
    +import org.json.JSONObject;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.regex.Pattern;
    +
    +/**
    + * This processor utilizes the Hive Streaming capability to insert data from the flow into a Hive database table.
    + */
    +@Tags({"hive", "streaming", "put", "database", "store"})
    +@CapabilityDescription("This processor uses Hive Streaming to send flow file data to an Apache Hive table. The incoming flow file is expected to be in "
    +        + "Avro format and the table must exist in Hive. Please see the Hive documentation for requirements on the Hive table (format, partitions, etc.). "
    +        + "The partition values are extracted from the Avro record based on the names of the partition columns as specified in the processor. ")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "hivestreaming.record.count", description = "The number of records from this flow file written using Hive Streaming.")
    +})
    +public class PutHiveStreaming extends AbstractProcessor {
    +
    +    // Attributes
    +    public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
    +
    +    // Validators
    +    private static final Validator GREATER_THAN_ONE_VALIDATOR = (subject, value, context) -> {
    +        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
    +            return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
    +        }
    +
    +        String reason = null;
    +        try {
    +            final int intVal = Integer.parseInt(value);
    +
    +            if (intVal < 2) {
    +                reason = "value is less than 2";
    +            }
    +        } catch (final NumberFormatException e) {
    +            reason = "value is not a valid integer";
    +        }
    +
    +        return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
    +    };
    +
    +    // Properties
    +    public static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
    +            .name("hive-stream-metastore-uri")
    +            .displayName("Hive Metastore URI")
    +            .description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the "
    +                    + "Hive metastore is 9083.")
    +            .required(true)
    +            .addValidator(StandardValidators.URI_VALIDATOR)
    +            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // must not start or end with /
    +            .build();
    +
    +    public static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
    +            .name("hive-config-resources")
    +            .displayName("Hive Configuration Resources")
    +            .description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
    +                    + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
    +                    + "with Kerberos e.g., the appropriate properties must be set in the configuration files. Please see the Hive documentation for more details.")
    +            .required(false)
    +            .addValidator(HiveUtils.createMultipleFilesExistValidator())
    +            .build();
    +
    +    public static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
    +            .name("hive-stream-database-name")
    +            .displayName("Database Name")
    +            .description("The name of the database in which to put the data.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .name("hive-stream-table-name")
    +            .displayName("Table Name")
    +            .description("The name of the database table in which to put the data.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PARTITION_COLUMNS = new PropertyDescriptor.Builder()
    +            .name("hive-stream-partition-cols")
    +            .displayName("Partition Columns")
    +            .description("A comma-delimited list of column names on which the table has been partitioned. The order of values in this list must "
    +                    + "correspond exactly to the order of partition columns specified during the table creation.")
    +            .required(false)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("[^,]+(,[^,]+)*"))) // comma-separated list with non-empty entries
    +            .build();
    +
    +    public static final PropertyDescriptor AUTOCREATE_PARTITIONS = new PropertyDescriptor.Builder()
    +            .name("hive-stream-autocreate-partition")
    +            .displayName("Auto-Create Partitions")
    +            .description("Flag indicating whether partitions should be automatically created")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_OPEN_CONNECTIONS = new PropertyDescriptor.Builder()
    +            .name("hive-stream-max-open-connections")
    +            .displayName("Max Open Connections")
    +            .description("The maximum number of open connections that can be allocated from this pool at the same time, "
    +                    + "or negative for no limit.")
    +            .defaultValue("8")
    +            .required(true)
    +            .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +            .sensitive(false)
    +            .build();
    +
    +    public static final PropertyDescriptor HEARTBEAT_INTERVAL = new PropertyDescriptor.Builder()
    +            .name("hive-stream-heartbeat-interval")
    +            .displayName("Heartbeat Interval")
    +            .description("Indicates that a heartbeat should be sent when the specified number of seconds has elapsed. "
    +                    + "A value of 0 indicates that no heartbeat should be sent.")
    +            .defaultValue("60")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .sensitive(false)
    +            .build();
    +
    +    public static final PropertyDescriptor TXNS_PER_BATCH = new PropertyDescriptor.Builder()
    +            .name("hive-stream-transactions-per-batch")
    +            .displayName("Transactions per Batch")
    +            .description("A hint to Hive Streaming indicating how many transactions the processor task will need. This value must be greater than 1.")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(GREATER_THAN_ONE_VALIDATOR)
    +            .defaultValue("100")
    +            .build();
    +
    +    // Relationships
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("A FlowFile is routed to this relationship after the database is successfully updated")
    +            .build();
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
    +            .build();
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail.")
    +            .build();
    +
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +    private final static Set<Relationship> relationships;
    +
    +    private static final long TICKET_RENEWAL_PERIOD = 60000;
    +
    +    protected KerberosProperties kerberosProperties;
    +
    +    protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
    +    protected volatile UserGroupInformation ugi;
    +
    +    protected final AtomicBoolean isInitialized = new AtomicBoolean(false);
    +
    +    protected HiveOptions options;
    +    protected ExecutorService callTimeoutPool;
    +    protected transient Timer heartBeatTimer;
    +    protected AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
    +    protected Map<HiveEndPoint, HiveWriter> allWriters;
    +
    +
    +    /*
    +     * Will ensure that the list of property descriptors is built only once.
    +     * Will also create a Set of relationships
    +     */
    +    static {
    +        propertyDescriptors = new ArrayList<>();
    +        propertyDescriptors.add(METASTORE_URI);
    +        propertyDescriptors.add(HIVE_CONFIGURATION_RESOURCES);
    +        propertyDescriptors.add(DB_NAME);
    +        propertyDescriptors.add(TABLE_NAME);
    +        propertyDescriptors.add(PARTITION_COLUMNS);
    +        propertyDescriptors.add(AUTOCREATE_PARTITIONS);
    +        propertyDescriptors.add(MAX_OPEN_CONNECTIONS);
    +        propertyDescriptors.add(HEARTBEAT_INTERVAL);
    +        propertyDescriptors.add(TXNS_PER_BATCH);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        _relationships.add(REL_RETRY);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    protected void init(ProcessorInitializationContext context) {
    +        kerberosProperties = getKerberosProperties();
    +        propertyDescriptors.add(kerberosProperties.getKerberosPrincipal());
    +        propertyDescriptors.add(kerberosProperties.getKerberosKeytab());
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +
    +    @OnScheduled
    +    public void setup(final ProcessContext context) {
    +        ComponentLog log = getLogger();
    +
    +        final String metastoreUri = context.getProperty(METASTORE_URI).getValue();
    +        final String dbName = context.getProperty(DB_NAME).getValue();
    +        final String tableName = context.getProperty(TABLE_NAME).getValue();
    +        final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
    +        final Integer maxConnections = context.getProperty(MAX_OPEN_CONNECTIONS).asInteger();
    +        final Integer heartbeatInterval = context.getProperty(HEARTBEAT_INTERVAL).asInteger();
    +        final Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).asInteger();
    +        final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
    +        final Configuration hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
    +
    +        // add any dynamic properties to the Hive configuration
    +        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
    +            final PropertyDescriptor descriptor = entry.getKey();
    +            if (descriptor.isDynamic()) {
    +                hiveConfig.set(descriptor.getName(), entry.getValue());
    +            }
    +        }
    +
    +        options = new HiveOptions(metastoreUri, dbName, tableName)
    +                .withTxnsPerBatch(txnsPerBatch)
    +                .withAutoCreatePartitions(autoCreatePartitions)
    +                .withMaxOpenConnections(maxConnections)
    +                .withHeartBeatInterval(heartbeatInterval);
    +
    +        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
    +            final String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
    +            final String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
    +
    +            log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{principal, keyTab});
    +            try {
    +                ugi = hiveConfigurator.authenticate(hiveConfig, principal, keyTab, TICKET_RENEWAL_PERIOD, log);
    +            } catch (AuthenticationFailedException ae) {
    +                throw new ProcessException("Kerberos authentication failed for Hive Streaming", ae);
    +            }
    +            log.info("Successfully logged in as principal {} with keytab {}", new Object[]{principal, keyTab});
    +            options = options.withKerberosPrincipal(principal).withKerberosKeytab(keyTab);
    +        }
    +
    +        allWriters = new ConcurrentHashMap<>();
    +        String timeoutName = "put-hive-streaming-%d";
    +        this.callTimeoutPool = Executors.newFixedThreadPool(1,
    +                new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
    +
    +        sendHeartBeat.set(true);
    +        heartBeatTimer = new Timer();
    +        setupHeartBeatTimer();
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final ComponentLog log = getLogger();
    +        try {
    +            final List<String> partitionColumnList;
    +            String partitionColumns = context.getProperty(PARTITION_COLUMNS).getValue();
    +            if (StringUtils.isEmpty(partitionColumns)) {
    +                partitionColumnList = Collections.emptyList();
    +            } else {
    +                String[] partitionCols = partitionColumns.split(",");
    +                partitionColumnList = new ArrayList<>(partitionCols.length);
    +                for (String col : partitionCols) {
    +                    partitionColumnList.add(col.trim());
    +                }
    +            }
    +
    +            // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
    +            ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
    +            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
    +
    +            int recordCount = 0;
    +            final List<HiveStreamingRecord> records = new LinkedList<>();
    +
    +            session.read(flowFile, in -> {
    +
    +                try (final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
    +
    +                    GenericRecord currRecord;
    +                    while (reader.hasNext()) {
    +                        currRecord = reader.next();
    --- End diff --
    
    Minor point: I think there's a call to reader.next() that takes in a record instance to reuse, which could save some object creation for a large number of records.
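
    Something like this, assuming the standard Avro reuse overload:

    ```java
    // Sketch of the reuse variant: DataFileStream.next(D reuse) fills in the passed
    // instance where possible instead of allocating a new GenericRecord per row.
    GenericRecord currRecord = null;
    while (reader.hasNext()) {
        currRecord = reader.next(currRecord);
        // ... process currRecord as before ...
    }
    ```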


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #706: NIFI-1868: Add PutHiveStreaming processor

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on the issue:

    https://github.com/apache/nifi/pull/706
  
    Yes that's a good idea, will update, thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #706: NIFI-1868: Add PutHiveStreaming processor

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on the issue:

    https://github.com/apache/nifi/pull/706
  
    Latest update is looking good... one thing I noticed: if you send in an Avro file that does not have the partition columns of the table, it throws an IOException around line 435 when trying to extract the partition fields from the Avro schema, but that then gets wrapped in a ProcessException and thrown out of onTrigger, so the flow file sits in the incoming queue and can never be processed.
    
    Could we look for a ProcessException with a cause of IOException and route it to failure (similar to the connection error handling)? Or maybe create a specific exception type to look for, since there could be other IOExceptions that we want to bounce out of onTrigger?
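
    As a rough sketch of the first option (structure is illustrative only, not taken from the PR):

    ```java
    // Sketch only: treat a ProcessException whose cause is an IOException as bad input
    // data and route the flow file to failure instead of letting it escape onTrigger.
    try {
        // ... read the Avro records and write them via Hive Streaming ...
    } catch (ProcessException pe) {
        if (pe.getCause() instanceof IOException) {
            log.error("Failed to convert {} for Hive Streaming; routing to failure", new Object[]{flowFile}, pe);
            session.transfer(flowFile, REL_FAILURE);
        } else {
            throw pe;
        }
    }
    ```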


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #706: NIFI-1868: Add PutHiveStreaming processor

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on the issue:

    https://github.com/apache/nifi/pull/706
  
    Yeah I think that's a bug in this version of Hive: https://issues.apache.org/jira/browse/HIVE-13725
    
    For now I'd like to make the processor @TriggerSerially (unless there are any objections); then, when we upgrade the version of Hive, we can revisit and hopefully remove this limitation.
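
    For reference, that would just be the class-level annotation, roughly (no other changes implied):

    ```java
    import org.apache.nifi.annotation.behavior.TriggerSerially;

    // Stop-gap for HIVE-13725: the framework will never invoke onTrigger concurrently,
    // so only one task writes to Hive Streaming at a time.
    @TriggerSerially
    @Tags({"hive", "streaming", "put", "database", "store"})
    public class PutHiveStreaming extends AbstractProcessor {
        // existing implementation unchanged
    }
    ```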


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---