Posted to commits@nifi.apache.org by bb...@apache.org on 2018/06/13 18:33:25 UTC

[5/6] nifi git commit: NIFI-4963: Added Hive3 bundle - Incorporated review comments - Added more defensive code for PutHive3Streaming error handling

http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive3ConnectionPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive3ConnectionPool.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive3ConnectionPool.java
new file mode 100644
index 0000000..b0662b8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive3ConnectionPool.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.dbcp.hive;
+
+import java.io.File;
+
+import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.jdbc.HiveDriver;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.hive.AuthenticationFailedException;
+import org.apache.nifi.util.hive.HiveConfigurator;
+import org.apache.nifi.util.hive.HiveUtils;
+import org.apache.nifi.util.hive.ValidationResources;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+
+/**
+ * Implementation for Database Connection Pooling Service used for Apache Hive
+ * connections. Apache DBCP is used for connection pooling functionality.
+ */
+@RequiresInstanceClassLoading
+@Tags({"hive", "dbcp", "jdbc", "database", "connection", "pooling", "store"})
+@CapabilityDescription("Provides Database Connection Pooling Service for Apache Hive 3.x. Connections can be asked from pool and returned after usage.")
+public class Hive3ConnectionPool extends AbstractControllerService implements Hive3DBCPService {
+    private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";
+
+    static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder()
+            .name("hive-db-connect-url")
+            .displayName("Database Connection URL")
+            .description("A database connection URL used to connect to a database. May contain database system name, host, port, database name and some parameters."
+                    + " The exact syntax of a database connection URL is specified by the Hive documentation. For example, the server principal is often included "
+                    + "as a connection parameter when connecting to a secure Hive server.")
+            .defaultValue(null)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
+            .name("hive-config-resources")
+            .displayName("Hive Configuration Resources")
+            .description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
+                    + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
+                    + "with Kerberos e.g., the appropriate properties must be set in the configuration files. Please see the Hive documentation for more details.")
+            .required(false)
+            .addValidator(HiveUtils.createMultipleFilesExistValidator())
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder()
+            .name("hive-db-user")
+            .displayName("Database User")
+            .description("Database user name")
+            .defaultValue(null)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder()
+            .name("hive-db-password")
+            .displayName("Password")
+            .description("The password for the database user")
+            .defaultValue(null)
+            .required(false)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor MAX_WAIT_TIME = new PropertyDescriptor.Builder()
+            .name("hive-max-wait-time")
+            .displayName("Max Wait Time")
+            .description("The maximum amount of time that the pool will wait (when there are no available connections) "
+                    + " for a connection to be returned before failing, or -1 to wait indefinitely. ")
+            .defaultValue("500 millis")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor MAX_TOTAL_CONNECTIONS = new PropertyDescriptor.Builder()
+            .name("hive-max-total-connections")
+            .displayName("Max Total Connections")
+            .description("The maximum number of active connections that can be allocated from this pool at the same time, "
+                    + "or negative for no limit.")
+            .defaultValue("8")
+            .required(true)
+            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor VALIDATION_QUERY = new PropertyDescriptor.Builder()
+            .name("Validation-query")
+            .displayName("Validation query")
+            .description("Validation query used to validate connections before returning them. "
+                    + "When a borrowed connection is invalid, it gets dropped and a new valid connection will be returned. "
+                    + "NOTE: Using validation may have a performance penalty.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    private static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
+            .name("kerberos-credentials-service")
+            .displayName("Kerberos Credentials Service")
+            .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
+            .identifiesControllerService(KerberosCredentialsService.class)
+            .required(false)
+            .build();
+
+
+    private List<PropertyDescriptor> properties;
+
+    private String connectionUrl = "unknown";
+
+    // Holder of cached Configuration information so validation does not reload the same config over and over
+    private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
+
+    private volatile BasicDataSource dataSource;
+
+    private volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
+    private volatile UserGroupInformation ugi;
+    private volatile File kerberosConfigFile = null;
+    private volatile KerberosProperties kerberosProperties;
+
+    @Override
+    protected void init(final ControllerServiceInitializationContext context) {
+        List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(DATABASE_URL);
+        props.add(HIVE_CONFIGURATION_RESOURCES);
+        props.add(DB_USER);
+        props.add(DB_PASSWORD);
+        props.add(MAX_WAIT_TIME);
+        props.add(MAX_TOTAL_CONNECTIONS);
+        props.add(VALIDATION_QUERY);
+        props.add(KERBEROS_CREDENTIALS_SERVICE);
+
+        kerberosConfigFile = context.getKerberosConfigurationFile();
+        kerberosProperties = new KerberosProperties(kerberosConfigFile);
+        props.add(kerberosProperties.getKerberosPrincipal());
+        props.add(kerberosProperties.getKerberosKeytab());
+        properties = props;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        boolean confFileProvided = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).isSet();
+
+        final List<ValidationResult> problems = new ArrayList<>();
+
+        if (confFileProvided) {
+            final String explicitPrincipal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
+            final String explicitKeytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
+            final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+            final String resolvedPrincipal;
+            final String resolvedKeytab;
+            if (credentialsService == null) {
+                resolvedPrincipal = explicitPrincipal;
+                resolvedKeytab = explicitKeytab;
+            } else {
+                resolvedPrincipal = credentialsService.getPrincipal();
+                resolvedKeytab = credentialsService.getKeytab();
+            }
+
+
+            final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
+            problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger()));
+
+            if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null)) {
+                problems.add(new ValidationResult.Builder()
+                        .subject("Kerberos Credentials")
+                        .valid(false)
+                        .explanation("Cannot specify both a Kerberos Credentials Service and a principal/keytab")
+                        .build());
+            }
+
+            final String allowExplicitKeytabVariable = System.getenv(ALLOW_EXPLICIT_KEYTAB);
+            if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && (explicitPrincipal != null || explicitKeytab != null)) {
+                problems.add(new ValidationResult.Builder()
+                        .subject("Kerberos Credentials")
+                        .valid(false)
+                        .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring principal/keytab in processors. "
+                                + "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.")
+                        .build());
+            }
+        }
+
+        return problems;
+    }
+
+    /**
+     * Configures connection pool by creating an instance of the
+     * {@link BasicDataSource} based on configuration provided with
+     * {@link ConfigurationContext}.
+     * <p>
+     * This operation makes no guarantees that the actual connection could be
+     * made since the underlying system may still go off-line during normal
+     * operation of the connection pool.
+     * <p/>
+     * As of Apache NiFi 1.5.0, due to changes made to
+     * {@link SecurityUtil#loginKerberos(Configuration, String, String)}, which is used by this class invoking
+     * {@link HiveConfigurator#authenticate(Configuration, String, String)}
+     * to authenticate a principal with Kerberos, Hive controller services no longer
+     * attempt relogins explicitly.  For more information, please read the documentation for
+     * {@link SecurityUtil#loginKerberos(Configuration, String, String)}.
+     * <p/>
+     * In previous versions of NiFi, a {@link org.apache.nifi.hadoop.KerberosTicketRenewer} was started by
+     * {@link HiveConfigurator#authenticate(Configuration, String, String, long)} when the Hive
+     * controller service was enabled.  The use of a separate thread to explicitly relogin could cause race conditions
+     * with the implicit relogin attempts made by hadoop/Hive code on a thread that references the same
+     * {@link UserGroupInformation} instance.  One of these threads could leave the
+     * {@link javax.security.auth.Subject} in {@link UserGroupInformation} to be cleared or in an unexpected state
+     * while the other thread is attempting to use the {@link javax.security.auth.Subject}, resulting in failed
+     * authentication attempts that would leave the Hive controller service in an unrecoverable state.
+     *
+     * @see SecurityUtil#loginKerberos(Configuration, String, String)
+     * @see HiveConfigurator#authenticate(Configuration, String, String)
+     * @see HiveConfigurator#authenticate(Configuration, String, String, long)
+     * @param context the configuration context
+     * @throws InitializationException if unable to create a database connection
+     */
+    @OnEnabled
+    public void onConfigured(final ConfigurationContext context) throws InitializationException {
+
+        ComponentLog log = getLogger();
+
+        final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
+        final Configuration hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
+        final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue();
+
+        // add any dynamic properties to the Hive configuration
+        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+            final PropertyDescriptor descriptor = entry.getKey();
+            if (descriptor.isDynamic()) {
+                hiveConfig.set(descriptor.getName(), context.getProperty(descriptor).evaluateAttributeExpressions().getValue());
+            }
+        }
+
+        final String drv = HiveDriver.class.getName();
+        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
+            final String explicitPrincipal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
+            final String explicitKeytab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
+            final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+            final String resolvedPrincipal;
+            final String resolvedKeytab;
+            if (credentialsService == null) {
+                resolvedPrincipal = explicitPrincipal;
+                resolvedKeytab = explicitKeytab;
+            } else {
+                resolvedPrincipal = credentialsService.getPrincipal();
+                resolvedKeytab = credentialsService.getKeytab();
+            }
+
+            log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
+            try {
+                ugi = hiveConfigurator.authenticate(hiveConfig, resolvedPrincipal, resolvedKeytab);
+            } catch (AuthenticationFailedException ae) {
+                log.error(ae.getMessage(), ae);
+                // Fail enablement rather than continuing (and reporting success) with an unauthenticated UGI
+                throw new InitializationException(ae);
+            }
+
+            log.info("Successfully logged in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
+        }
+
+        final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
+        final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
+        final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+        final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();
+
+        dataSource = new BasicDataSource();
+        dataSource.setDriverClassName(drv);
+
+        connectionUrl = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
+
+        dataSource.setMaxWait(maxWaitMillis);
+        dataSource.setMaxActive(maxTotal);
+
+        if (validationQuery != null && !validationQuery.isEmpty()) {
+            dataSource.setValidationQuery(validationQuery);
+            dataSource.setTestOnBorrow(true);
+        }
+
+        dataSource.setUrl(connectionUrl);
+        dataSource.setUsername(user);
+        dataSource.setPassword(passw);
+    }
+
+    /**
+     * Shutdown pool, close all open connections.
+     */
+    @OnDisabled
+    public void shutdown() {
+        try {
+            dataSource.close();
+        } catch (final SQLException e) {
+            throw new ProcessException(e);
+        }
+    }
+
+    @Override
+    public Connection getConnection() throws ProcessException {
+        try {
+            if (ugi != null) {
+                try {
+                    return ugi.doAs((PrivilegedExceptionAction<Connection>) () -> dataSource.getConnection());
+                } catch (UndeclaredThrowableException e) {
+                    Throwable cause = e.getCause();
+                    if (cause instanceof SQLException) {
+                        throw (SQLException) cause;
+                    } else {
+                        throw e;
+                    }
+                }
+            } else {
+                getLogger().info("Simple Authentication");
+                return dataSource.getConnection();
+            }
+        } catch (SQLException | IOException | InterruptedException e) {
+            getLogger().error("Error getting Hive connection", e);
+            throw new ProcessException(e);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "Hive3ConnectionPool[id=" + getIdentifier() + "]";
+    }
+
+    @Override
+    public String getConnectionURL() {
+        return connectionUrl;
+    }
+
+}
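
For reference, a minimal usage sketch (not part of this commit) of how a processor might borrow a connection from this pool service. It assumes a property such as HIVE_DBCP_SERVICE that identifies a Hive3DBCPService (AbstractHive3QLProcessor below defines one) and the standard java.sql classes:

    // Illustrative sketch only: borrowing a connection from the pool inside a processor.
    final Hive3DBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE)
            .asControllerService(Hive3DBCPService.class);
    try (final Connection conn = dbcpService.getConnection();
         final Statement stmt = conn.createStatement();
         final ResultSet rs = stmt.executeQuery("SHOW TABLES")) {
        // Closing the Connection returns it to the underlying DBCP pool rather than
        // closing the physical Hive connection.
        while (rs.next()) {
            getLogger().info("Found table: {}", new Object[]{rs.getString(1)});
        }
    } catch (final SQLException e) {
        getLogger().error("Failed to query Hive", e);
    }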

http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHive3QLProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHive3QLProcessor.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHive3QLProcessor.java
new file mode 100644
index 0000000..4fcce19
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHive3QLProcessor.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import org.antlr.runtime.tree.CommonTree;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.ParseDriver;
+import org.apache.hadoop.hive.ql.parse.ParseException;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dbcp.hive.Hive3DBCPService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import java.math.BigDecimal;
+import java.nio.charset.Charset;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.SQLDataException;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * An abstract base class for HiveQL processors to share common data, methods, etc.
+ */
+public abstract class AbstractHive3QLProcessor extends AbstractSessionFactoryProcessor {
+
+    protected static final Pattern HIVEQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("hiveql\\.args\\.(\\d+)\\.type");
+    protected static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
+    static String ATTR_INPUT_TABLES = "query.input.tables";
+    static String ATTR_OUTPUT_TABLES = "query.output.tables";
+
+
+    public static final PropertyDescriptor HIVE_DBCP_SERVICE = new PropertyDescriptor.Builder()
+            .name("hive3-dbcp-service")
+            .displayName("Hive Database Connection Pooling Service")
+            .description("The Hive Controller Service that is used to obtain connection(s) to the Hive database")
+            .required(true)
+            .identifiesControllerService(Hive3DBCPService.class)
+            .build();
+
+    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("hive3-charset")
+            .displayName("Character Set")
+            .description("Specifies the character set of the record data.")
+            .required(true)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("hive3-query-timeout")
+            .displayName("Query timeout")
+            .description("Sets the number of seconds the driver will wait for a query to execute. "
+                    + "A value of 0 means no timeout. NOTE: Non-zero values may not be supported by the driver.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    /**
+     * Determines the HiveQL statement that should be executed for the given FlowFile
+     *
+     * @param session  the session that can be used to access the given FlowFile
+     * @param flowFile the FlowFile whose HiveQL statement should be executed
+     * @return the HiveQL that is associated with the given FlowFile
+     */
+    protected String getHiveQL(final ProcessSession session, final FlowFile flowFile, final Charset charset) {
+        // Read the HiveQL from the FlowFile's content
+        final byte[] buffer = new byte[(int) flowFile.getSize()];
+        session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer));
+
+        // Create the PreparedStatement to use for this FlowFile.
+        return new String(buffer, charset);
+    }
+
+    private class ParameterHolder {
+        String attributeName;
+        int jdbcType;
+        String value;
+    }
+
+    /**
+     * Sets all of the appropriate parameters on the given PreparedStatement, based on the given FlowFile attributes.
+     *
+     * @param stmt       the statement to set the parameters on
+     * @param attributes the attributes from which to derive parameter indices, values, and types
+     * @throws SQLException if the PreparedStatement throws a SQLException when the appropriate setter is called
+     */
+    protected int setParameters(int base, final PreparedStatement stmt, int paramCount, final Map<String, String> attributes) throws SQLException {
+
+        Map<Integer, ParameterHolder> parmMap = new TreeMap<Integer, ParameterHolder>();
+
+        for (final Map.Entry<String, String> entry : attributes.entrySet()) {
+                final String key = entry.getKey();
+                final Matcher matcher = HIVEQL_TYPE_ATTRIBUTE_PATTERN.matcher(key);
+                if (matcher.matches()) {
+                    final int parameterIndex = Integer.parseInt(matcher.group(1));
+                    if (parameterIndex >= base && parameterIndex < base + paramCount) {
+                        final boolean isNumeric = NUMBER_PATTERN.matcher(entry.getValue()).matches();
+                        if (!isNumeric) {
+                            throw new SQLDataException("Value of the " + key + " attribute is '" + entry.getValue() + "', which is not a valid JDBC numeral jdbcType");
+                        }
+
+                        final String valueAttrName = "hiveql.args." + parameterIndex + ".value";
+
+                        ParameterHolder ph = new ParameterHolder();
+                        int realIndexLoc = parameterIndex - base + 1;
+
+                        ph.jdbcType = Integer.parseInt(entry.getValue());
+                        ph.value = attributes.get(valueAttrName);
+                        ph.attributeName = valueAttrName;
+
+                        parmMap.put(realIndexLoc, ph);
+
+                    }
+                }
+        }
+
+
+        // Now that we've retrieved the correct number of parameters and they're sorted, let's set them.
+        for (final Map.Entry<Integer, ParameterHolder> entry : parmMap.entrySet()) {
+            final Integer index = entry.getKey();
+            final ParameterHolder ph = entry.getValue();
+
+            try {
+                setParameter(stmt, ph.attributeName, index, ph.value, ph.jdbcType);
+            } catch (final NumberFormatException nfe) {
+                throw new SQLDataException("The value of the " + ph.attributeName + " is '" + ph.value + "', which cannot be converted into the necessary data jdbcType", nfe);
+            }
+        }
+        return base + paramCount;
+    }
+
+    /**
+     * Determines how to map the given value to the appropriate JDBC data type and sets the parameter on the
+     * provided PreparedStatement
+     *
+     * @param stmt           the PreparedStatement to set the parameter on
+     * @param attrName       the name of the attribute that the parameter is coming from - for logging purposes
+     * @param parameterIndex the index of the HiveQL parameter to set
+     * @param parameterValue the value of the HiveQL parameter to set
+     * @param jdbcType       the JDBC Type of the HiveQL parameter to set
+     * @throws SQLException if the PreparedStatement throws a SQLException when calling the appropriate setter
+     */
+    protected void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType) throws SQLException {
+        if (parameterValue == null) {
+            stmt.setNull(parameterIndex, jdbcType);
+        } else {
+            try {
+                switch (jdbcType) {
+                    case Types.BIT:
+                    case Types.BOOLEAN:
+                        stmt.setBoolean(parameterIndex, Boolean.parseBoolean(parameterValue));
+                        break;
+                    case Types.TINYINT:
+                        stmt.setByte(parameterIndex, Byte.parseByte(parameterValue));
+                        break;
+                    case Types.SMALLINT:
+                        stmt.setShort(parameterIndex, Short.parseShort(parameterValue));
+                        break;
+                    case Types.INTEGER:
+                        stmt.setInt(parameterIndex, Integer.parseInt(parameterValue));
+                        break;
+                    case Types.BIGINT:
+                        stmt.setLong(parameterIndex, Long.parseLong(parameterValue));
+                        break;
+                    case Types.REAL:
+                        stmt.setFloat(parameterIndex, Float.parseFloat(parameterValue));
+                        break;
+                    case Types.FLOAT:
+                    case Types.DOUBLE:
+                        stmt.setDouble(parameterIndex, Double.parseDouble(parameterValue));
+                        break;
+                    case Types.DECIMAL:
+                    case Types.NUMERIC:
+                        stmt.setBigDecimal(parameterIndex, new BigDecimal(parameterValue));
+                        break;
+                    case Types.DATE:
+                        stmt.setDate(parameterIndex, new Date(Long.parseLong(parameterValue)));
+                        break;
+                    case Types.TIME:
+                        stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue)));
+                        break;
+                    case Types.TIMESTAMP:
+                        stmt.setTimestamp(parameterIndex, new Timestamp(Long.parseLong(parameterValue)));
+                        break;
+                    case Types.CHAR:
+                    case Types.VARCHAR:
+                    case Types.LONGNVARCHAR:
+                    case Types.LONGVARCHAR:
+                        stmt.setString(parameterIndex, parameterValue);
+                        break;
+                    default:
+                        stmt.setObject(parameterIndex, parameterValue, jdbcType);
+                        break;
+                }
+            } catch (SQLException e) {
+                // Log which attribute/parameter had an error, then rethrow to be handled at the top level
+                getLogger().error("Error setting parameter {} to value from {} ({})", new Object[]{parameterIndex, attrName, parameterValue}, e);
+                throw e;
+            }
+        }
+    }
+
+    protected static class TableName {
+        private final String database;
+        private final String table;
+        private final boolean input;
+
+        TableName(String database, String table, boolean input) {
+            this.database = database;
+            this.table = table;
+            this.input = input;
+        }
+
+        public String getDatabase() {
+            return database;
+        }
+
+        public String getTable() {
+            return table;
+        }
+
+        public boolean isInput() {
+            return input;
+        }
+
+        @Override
+        public String toString() {
+            return database == null || database.isEmpty() ? table : database + '.' + table;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            TableName tableName = (TableName) o;
+
+            if (input != tableName.input) return false;
+            if (database != null ? !database.equals(tableName.database) : tableName.database != null) return false;
+            return table.equals(tableName.table);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = database != null ? database.hashCode() : 0;
+            result = 31 * result + table.hashCode();
+            result = 31 * result + (input ? 1 : 0);
+            return result;
+        }
+    }
+
+    protected Set<TableName> findTableNames(final String query) {
+        final ASTNode node;
+        try {
+            node = new ParseDriver().parse(normalize(query));
+        } catch (ParseException e) {
+            // If failed to parse the query, just log a message, but continue.
+            getLogger().debug("Failed to parse query: {} due to {}", new Object[]{query, e}, e);
+            return Collections.emptySet();
+        }
+        final HashSet<TableName> tableNames = new HashSet<>();
+        findTableNames(node, tableNames);
+        return tableNames;
+    }
+
+    /**
+     * Normalize query.
+     * Hive resolves prepared statement parameters before executing a query,
+     * see {@link org.apache.hive.jdbc.HivePreparedStatement#updateSql(String, HashMap)} for details.
+     * HiveParser does not expect '?' to be in a query string, and throws an Exception if there is one.
+     * In this normalize method, '?' is replaced with 'x' to avoid that.
+     */
+    private String normalize(String query) {
+        return query.replace('?', 'x');
+    }
+
+    private void findTableNames(final Object obj, final Set<TableName> tableNames) {
+        if (!(obj instanceof CommonTree)) {
+            return;
+        }
+        final CommonTree tree = (CommonTree) obj;
+        final int childCount = tree.getChildCount();
+        if ("TOK_TABNAME".equals(tree.getText())) {
+            final TableName tableName;
+            final boolean isInput = "TOK_TABREF".equals(tree.getParent().getText());
+            switch (childCount) {
+                case 1 :
+                    tableName = new TableName(null, tree.getChild(0).getText(), isInput);
+                    break;
+                case 2:
+                    tableName = new TableName(tree.getChild(0).getText(), tree.getChild(1).getText(), isInput);
+                    break;
+                default:
+                    throw new IllegalStateException("TOK_TABNAME does not have expected children, childCount=" + childCount);
+            }
+            // If parent is TOK_TABREF, then it is an input table.
+            tableNames.add(tableName);
+            return;
+        }
+        for (int i = 0; i < childCount; i++) {
+            findTableNames(tree.getChild(i), tableNames);
+        }
+    }
+
+    protected Map<String, String> toQueryTableAttributes(Set<TableName> tableNames) {
+        final Map<String, String> attributes = new HashMap<>();
+        for (TableName tableName : tableNames) {
+            final String attributeName = tableName.isInput() ? ATTR_INPUT_TABLES : ATTR_OUTPUT_TABLES;
+            if (attributes.containsKey(attributeName)) {
+                attributes.put(attributeName, attributes.get(attributeName) + "," + tableName);
+            } else {
+                attributes.put(attributeName, tableName.toString());
+            }
+        }
+        return attributes;
+    }
+}
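
As a hedged illustration of the hiveql.args.N.type / hiveql.args.N.value attribute convention consumed by setParameters() above, the attribute values below are invented for the example:

    // Illustrative only: attributes a FlowFile might carry for the parameterized statement
    // "INSERT INTO users VALUES (?, ?)".
    final Map<String, String> attributes = new HashMap<>();
    attributes.put("hiveql.args.1.type", String.valueOf(java.sql.Types.INTEGER));  // 4
    attributes.put("hiveql.args.1.value", "42");
    attributes.put("hiveql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));  // 12
    attributes.put("hiveql.args.2.value", "alice");

    // setParameters(1, stmt, 2, attributes) binds parameters 1 and 2 on the PreparedStatement
    // via setParameter(), and returns the next base index (3) so a multi-statement script can
    // continue numbering across statements.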

http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java
new file mode 100644
index 0000000..989d085
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dbcp.hive.Hive3DBCPService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.pattern.ErrorTypes;
+import org.apache.nifi.processor.util.pattern.ExceptionHandler;
+import org.apache.nifi.processor.util.pattern.ExceptionHandler.OnError;
+import org.apache.nifi.processor.util.pattern.PartialFunctions.FetchFlowFiles;
+import org.apache.nifi.processor.util.pattern.PartialFunctions.InitConnection;
+import org.apache.nifi.processor.util.pattern.Put;
+import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
+import org.apache.nifi.processor.util.pattern.RoutingResult;
+
+import java.nio.charset.Charset;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.SQLNonTransientException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+@SeeAlso(SelectHive3QL.class)
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"sql", "hive", "put", "database", "update", "insert"})
+@CapabilityDescription("Executes a HiveQL DDL/DML command (UPDATE, INSERT, e.g.). The content of an incoming FlowFile is expected to be the HiveQL command "
+        + "to execute. The HiveQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes "
+        + "with the naming convention hiveql.args.N.type and hiveql.args.N.value, where N is a positive integer. The hiveql.args.N.type is expected to be "
+        + "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "hiveql.args.N.type", description = "Incoming FlowFiles are expected to be parametrized HiveQL statements. The type of each Parameter is specified as an integer "
+                + "that represents the JDBC Type of the parameter."),
+        @ReadsAttribute(attribute = "hiveql.args.N.value", description = "Incoming FlowFiles are expected to be parametrized HiveQL statements. The value of the Parameters are specified as "
+                + "hiveql.args.1.value, hiveql.args.2.value, hiveql.args.3.value, and so on. The type of the hiveql.args.1.value Parameter is specified by the hiveql.args.1.type attribute.")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = "query.input.tables", description = "This attribute is written on the flow files routed to the 'success' relationships, "
+                + "and contains input table names (if any) in comma delimited 'databaseName.tableName' format."),
+        @WritesAttribute(attribute = "query.output.tables", description = "This attribute is written on the flow files routed to the 'success' relationships, "
+                + "and contains the target table names in 'databaseName.tableName' format.")
+})
+public class PutHive3QL extends AbstractHive3QLProcessor {
+
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("hive-batch-size")
+            .displayName("Batch Size")
+            .description("The preferred number of FlowFiles to put to the database in a single transaction")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("100")
+            .build();
+
+    public static final PropertyDescriptor STATEMENT_DELIMITER = new PropertyDescriptor.Builder()
+            .name("statement-delimiter")
+            .displayName("Statement Delimiter")
+            .description("Statement Delimiter used to separate SQL statements in a multiple statement script")
+            .required(true)
+            .defaultValue(";")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the database is successfully updated")
+            .build();
+    public static final Relationship REL_RETRY = new Relationship.Builder()
+            .name("retry")
+            .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, "
+                    + "such as an invalid query or an integrity constraint violation")
+            .build();
+
+
+    private final static List<PropertyDescriptor> propertyDescriptors;
+    private final static Set<Relationship> relationships;
+
+    /*
+     * Will ensure that the list of property descriptors is built only once.
+     * Will also create a Set of relationships
+     */
+    static {
+        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+        _propertyDescriptors.add(HIVE_DBCP_SERVICE);
+        _propertyDescriptors.add(BATCH_SIZE);
+        _propertyDescriptors.add(QUERY_TIMEOUT);
+        _propertyDescriptors.add(CHARSET);
+        _propertyDescriptors.add(STATEMENT_DELIMITER);
+        _propertyDescriptors.add(RollbackOnFailure.ROLLBACK_ON_FAILURE);
+        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
+
+        Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        _relationships.add(REL_FAILURE);
+        _relationships.add(REL_RETRY);
+        relationships = Collections.unmodifiableSet(_relationships);
+    }
+
+    private Put<FunctionContext, Connection> process;
+    private ExceptionHandler<FunctionContext> exceptionHandler;
+
+    @OnScheduled
+    public void constructProcess() {
+        exceptionHandler = new ExceptionHandler<>();
+        exceptionHandler.mapException(e -> {
+            if (e instanceof SQLNonTransientException) {
+                return ErrorTypes.InvalidInput;
+            } else if (e instanceof SQLException) {
+                return ErrorTypes.TemporalFailure;
+            } else {
+                return ErrorTypes.UnknownFailure;
+            }
+        });
+        exceptionHandler.adjustError(RollbackOnFailure.createAdjustError(getLogger()));
+
+        process = new Put<>();
+        process.setLogger(getLogger());
+        process.initConnection(initConnection);
+        process.fetchFlowFiles(fetchFlowFiles);
+        process.putFlowFile(putFlowFile);
+        process.adjustRoute(RollbackOnFailure.createAdjustRoute(REL_FAILURE, REL_RETRY));
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    private class FunctionContext extends RollbackOnFailure {
+        final Charset charset;
+        final String statementDelimiter;
+        final long startNanos = System.nanoTime();
+
+        String connectionUrl;
+
+
+        private FunctionContext(boolean rollbackOnFailure, Charset charset, String statementDelimiter) {
+            super(rollbackOnFailure, false);
+            this.charset = charset;
+            this.statementDelimiter = statementDelimiter;
+        }
+    }
+
+    private InitConnection<FunctionContext, Connection> initConnection = (context, session, fc, ff) -> {
+        final Hive3DBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(Hive3DBCPService.class);
+        final Connection connection = dbcpService.getConnection();
+        fc.connectionUrl = dbcpService.getConnectionURL();
+        return connection;
+    };
+
+    private FetchFlowFiles<FunctionContext> fetchFlowFiles = (context, session, functionContext, result) -> {
+        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+        return session.get(batchSize);
+    };
+
+    private Put.PutFlowFile<FunctionContext, Connection> putFlowFile = (context, session, fc, conn, flowFile, result) -> {
+        final String script = getHiveQL(session, flowFile, fc.charset);
+        String regex = "(?<!\\\\)" + Pattern.quote(fc.statementDelimiter);
+
+        String[] hiveQLs = script.split(regex);
+
+        final Set<TableName> tableNames = new HashSet<>();
+        exceptionHandler.execute(fc, flowFile, input -> {
+            int loc = 1;
+            for (String hiveQLStr: hiveQLs) {
+                getLogger().debug("HiveQL: {}", new Object[]{hiveQLStr});
+
+                final String hiveQL = hiveQLStr.trim();
+                if (!StringUtils.isEmpty(hiveQL)) {
+                    final PreparedStatement stmt = conn.prepareStatement(hiveQL);
+
+                    // Get ParameterMetadata
+                    // Hive JDBC Doesn't support this yet:
+                    // ParameterMetaData pmd = stmt.getParameterMetaData();
+                    // int paramCount = pmd.getParameterCount();
+                    int paramCount = StringUtils.countMatches(hiveQL, "?");
+
+                    if (paramCount > 0) {
+                        loc = setParameters(loc, stmt, paramCount, flowFile.getAttributes());
+                    }
+
+                    // Parse hiveQL and extract input/output tables
+                    try {
+                        tableNames.addAll(findTableNames(hiveQL));
+                    } catch (Exception e) {
+                        // If failed to parse the query, just log a warning message, but continue.
+                        getLogger().warn("Failed to parse hiveQL: {} due to {}", new Object[]{hiveQL, e}, e);
+                    }
+
+                    stmt.setQueryTimeout(context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(flowFile).asInteger());
+
+                    // Execute the statement
+                    stmt.execute();
+                    fc.proceed();
+                }
+            }
+
+            // Emit a Provenance SEND event
+            final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - fc.startNanos);
+
+            final FlowFile updatedFlowFile = session.putAllAttributes(flowFile, toQueryTableAttributes(tableNames));
+            session.getProvenanceReporter().send(updatedFlowFile, fc.connectionUrl, transmissionMillis, true);
+            result.routeTo(updatedFlowFile, REL_SUCCESS);
+
+        }, onFlowFileError(context, session, result));
+
+    };
+
+    private OnError<FunctionContext, FlowFile> onFlowFileError(final ProcessContext context, final ProcessSession session, final RoutingResult result) {
+        OnError<FunctionContext, FlowFile> onFlowFileError = ExceptionHandler.createOnError(context, session, result, REL_FAILURE, REL_RETRY);
+        onFlowFileError = onFlowFileError.andThen((c, i, r, e) -> {
+            switch (r.destination()) {
+                case Failure:
+                    getLogger().error("Failed to update Hive for {} due to {}; routing to failure", new Object[] {i, e}, e);
+                    break;
+                case Retry:
+                    getLogger().error("Failed to update Hive for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry",
+                            new Object[] {i, e}, e);
+                    break;
+            }
+        });
+        return RollbackOnFailure.createOnError(onFlowFileError);
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
+        final Boolean rollbackOnFailure = context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
+        final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
+        final String statementDelimiter = context.getProperty(STATEMENT_DELIMITER).getValue();
+        final FunctionContext functionContext = new FunctionContext(rollbackOnFailure, charset, statementDelimiter);
+        RollbackOnFailure.onTrigger(context, sessionFactory, functionContext, getLogger(), session -> process.onTrigger(context, session, functionContext));
+    }
+}
\ No newline at end of file
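
To illustrate the statement splitting used by putFlowFile above (a sketch assuming the default ';' delimiter): a delimiter preceded by a backslash is not treated as a statement boundary, because of the negative lookbehind in the regex.

    // Illustrative sketch of the delimiter handling in putFlowFile above.
    final String delimiter = ";";
    final String script = "CREATE TABLE t (msg STRING);INSERT INTO t VALUES ('a\\; b')";
    final String regex = "(?<!\\\\)" + Pattern.quote(delimiter);
    final String[] statements = script.split(regex);
    // statements[0] -> "CREATE TABLE t (msg STRING)"
    // statements[1] -> "INSERT INTO t VALUES ('a\; b')"
    // The backslash only guards the split; each statement is trimmed before being prepared and executed.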

http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
new file mode 100644
index 0000000..664915c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
@@ -0,0 +1,560 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.common.util.ShutdownHookManager;
+import org.apache.hive.streaming.ConnectionError;
+import org.apache.hive.streaming.HiveStreamingConnection;
+import org.apache.hive.streaming.InvalidTable;
+import org.apache.hive.streaming.SerializationError;
+import org.apache.hive.streaming.StreamingConnection;
+import org.apache.hive.streaming.StreamingException;
+import org.apache.hive.streaming.StreamingIOFailure;
+import org.apache.hive.streaming.TransactionError;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.pattern.DiscontinuedException;
+import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
+import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.hive.AuthenticationFailedException;
+import org.apache.nifi.util.hive.HiveConfigurator;
+import org.apache.nifi.util.hive.HiveOptions;
+import org.apache.hive.streaming.HiveRecordWriter;
+import org.apache.nifi.util.hive.HiveUtils;
+import org.apache.nifi.util.hive.ValidationResources;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES;
+
+@Tags({"hive", "streaming", "put", "database", "store"})
+@CapabilityDescription("This processor uses Hive Streaming to send flow file records to an Apache Hive 3.0+ table. "
+        + "The partition values are expected to be the 'last' fields of each record, so if the table is partitioned on column A for example, then the last field in "
+        + "each record should be field A.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "hivestreaming.record.count", description = "This attribute is written on the flow files routed to the 'success' "
+                + "and 'failure' relationships, and contains the number of records from the incoming flow file. All records in a flow file are committed as a single transaction."),
+        @WritesAttribute(attribute = "query.output.tables", description = "This attribute is written on the flow files routed to the 'success' "
+                + "and 'failure' relationships, and contains the target table name in 'databaseName.tableName' format.")
+})
+@RequiresInstanceClassLoading
+public class PutHive3Streaming extends AbstractProcessor {
+    // Attributes
+    public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
+
+    private static final String CLIENT_CACHE_DISABLED_PROPERTY = "hcatalog.hive.client.cache.disabled";
+
+    // Properties
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("The service for reading records from incoming flow files.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
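+    // A typical value has the form thrift://<metastore-host>:9083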
+    static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
+            .name("hive3-stream-metastore-uri")
+            .displayName("Hive Metastore URI")
+            .description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the "
+                    + "Hive metastore is 9043.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.URI_VALIDATOR)
+            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // must not start or end with '/'
+            .build();
+
+    static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
+            .name("hive3-config-resources")
+            .displayName("Hive Configuration Resources")
+            .description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
+                    + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
+                    + "with Kerberos e.g., the appropriate properties must be set in the configuration files. Also note that if Max Concurrent Tasks is set "
+                    + "to a number greater than one, the 'hcatalog.hive.client.cache.disabled' property will be forced to 'true' to avoid concurrency issues. "
+                    + "Please see the Hive documentation for more details.")
+            .required(false)
+            .addValidator(HiveUtils.createMultipleFilesExistValidator())
+            .build();
+
+    static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
+            .name("hive3-stream-database-name")
+            .displayName("Database Name")
+            .description("The name of the database in which to put the data.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("hive3-stream-table-name")
+            .displayName("Table Name")
+            .description("The name of the database table in which to put the data.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor PARTITION_VALUES = new PropertyDescriptor.Builder()
+            .name("hive3-stream-part-vals")
+            .displayName("Partition Values")
+            .description("Specifies a comma-separated list of the values for the partition columns of the target table. If the incoming records all have the same values "
+                    + "for the partition columns, those values can be entered here, resulting in a performance gain. If specified, this property will often contain "
+                    + "Expression Language, for example if PartitionRecord is upstream and two partitions 'name' and 'age' are used, then this property can be set to "
+                    + "${name},${age}.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor AUTOCREATE_PARTITIONS = new PropertyDescriptor.Builder()
+            .name("hive3-stream-autocreate-partition")
+            .displayName("Auto-Create Partitions")
+            .description("Flag indicating whether partitions should be automatically created")
+            .required(true)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    static final PropertyDescriptor CALL_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("hive3-stream-call-timeout")
+            .displayName("Call Timeout")
+            .description("The number of seconds allowed for a Hive Streaming operation to complete. A value of 0 indicates the processor should wait indefinitely on operations. "
+                    + "Note that although this property supports Expression Language, it will not be evaluated against incoming FlowFile attributes.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor DISABLE_STREAMING_OPTIMIZATIONS = new PropertyDescriptor.Builder()
+            .name("hive3-stream-disable-optimizations")
+            .displayName("Disable Streaming Optimizations")
+            .description("Whether to disable streaming optimizations. Disabling streaming optimizations will have significant impact to performance and memory consumption.")
+            .required(true)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+
+    static final PropertyDescriptor ROLLBACK_ON_FAILURE = RollbackOnFailure.createRollbackOnFailureProperty(
+            "NOTE: When an error occurred after a Hive streaming transaction which is derived from the same input FlowFile is already committed," +
+                    " (i.e. a FlowFile contains more records than 'Records per Transaction' and a failure occurred at the 2nd transaction or later)" +
+                    " then the succeeded records will be transferred to 'success' relationship while the original input FlowFile stays in incoming queue." +
+                    " Duplicated records can be created for the succeeded ones when the same FlowFile is processed again.");
+
+    static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
+            .name("kerberos-credentials-service")
+            .displayName("Kerberos Credentials Service")
+            .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
+            .identifiesControllerService(KerberosCredentialsService.class)
+            .required(false)
+            .build();
+
+    // Relationships
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile containing Avro records routed to this relationship after the record has been successfully transmitted to Hive.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile containing Avro records routed to this relationship if the record could not be transmitted to Hive.")
+            .build();
+
+    public static final Relationship REL_RETRY = new Relationship.Builder()
+            .name("retry")
+            .description("The incoming FlowFile is routed to this relationship if its records cannot be transmitted to Hive. Note that "
+                    + "some records may have been processed successfully, they will be routed (as Avro flow files) to the success relationship. "
+                    + "The combination of the retry, success, and failure relationships indicate how many records succeeded and/or failed. This "
+                    + "can be used to provide a retry capability since full rollback is not possible.")
+            .build();
+
+    private List<PropertyDescriptor> propertyDescriptors;
+    private Set<Relationship> relationships;
+
+    protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
+    protected volatile UserGroupInformation ugi;
+    protected volatile HiveConf hiveConfig;
+
+    protected volatile int callTimeout;
+    protected ExecutorService callTimeoutPool;
+    protected volatile boolean rollbackOnFailure;
+
+    // Holder of cached Configuration information so validation does not reload the same config over and over
+    private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
+
+    @Override
+    protected void init(ProcessorInitializationContext context) {
+        List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(RECORD_READER);
+        props.add(METASTORE_URI);
+        props.add(HIVE_CONFIGURATION_RESOURCES);
+        props.add(DB_NAME);
+        props.add(TABLE_NAME);
+        props.add(PARTITION_VALUES);
+        props.add(AUTOCREATE_PARTITIONS);
+        props.add(CALL_TIMEOUT);
+        props.add(DISABLE_STREAMING_OPTIMIZATIONS);
+        props.add(ROLLBACK_ON_FAILURE);
+        props.add(KERBEROS_CREDENTIALS_SERVICE);
+
+        propertyDescriptors = Collections.unmodifiableList(props);
+
+        Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        _relationships.add(REL_FAILURE);
+        _relationships.add(REL_RETRY);
+        relationships = Collections.unmodifiableSet(_relationships);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        boolean confFileProvided = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).isSet();
+
+        final List<ValidationResult> problems = new ArrayList<>();
+
+        final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+        final String resolvedPrincipal = credentialsService != null ? credentialsService.getPrincipal() : null;
+        final String resolvedKeytab = credentialsService != null ? credentialsService.getKeytab() : null;
+        if (confFileProvided) {
+            final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
+            problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger()));
+        }
+
+        return problems;
+    }
+
+    @OnScheduled
+    public void setup(final ProcessContext context) {
+        ComponentLog log = getLogger();
+        rollbackOnFailure = context.getProperty(ROLLBACK_ON_FAILURE).asBoolean();
+
+        final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
+        hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
+
+        // If more than one concurrent task, force 'hcatalog.hive.client.cache.disabled' to true
+        if (context.getMaxConcurrentTasks() > 1) {
+            hiveConfig.setBoolean(CLIENT_CACHE_DISABLED_PROPERTY, true);
+        }
+
+        // add any dynamic properties to the Hive configuration
+        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+            final PropertyDescriptor descriptor = entry.getKey();
+            if (descriptor.isDynamic()) {
+                hiveConfig.set(descriptor.getName(), entry.getValue());
+            }
+        }
+
+        hiveConfigurator.preload(hiveConfig);
+
+        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
+            final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+            final String resolvedPrincipal = credentialsService.getPrincipal();
+            final String resolvedKeytab = credentialsService.getKeytab();
+
+            log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
+            try {
+                ugi = hiveConfigurator.authenticate(hiveConfig, resolvedPrincipal, resolvedKeytab);
+            } catch (AuthenticationFailedException ae) {
+                throw new ProcessException("Kerberos authentication failed for Hive Streaming", ae);
+            }
+
+            log.info("Successfully logged in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
+        } else {
+            ugi = null;
+        }
+
+        callTimeout = context.getProperty(CALL_TIMEOUT).evaluateAttributeExpressions().asInteger() * 1000; // milliseconds
+        String timeoutName = "put-hive3-streaming-%d";
+        this.callTimeoutPool = Executors.newFixedThreadPool(1,
+                new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final String dbName = context.getProperty(DB_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+        final ComponentLog log = getLogger();
+        final String metastoreUri = context.getProperty(METASTORE_URI).evaluateAttributeExpressions(flowFile).getValue();
+
+        final String partitionValuesString = context.getProperty(PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue();
+        final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
+        final boolean disableStreamingOptimizations = context.getProperty(DISABLE_STREAMING_OPTIMIZATIONS).asBoolean();
+
+        HiveOptions o = new HiveOptions(metastoreUri, dbName, tableName)
+                .withHiveConf(hiveConfig)
+                .withAutoCreatePartitions(autoCreatePartitions)
+                .withCallTimeout(callTimeout)
+                .withStreamingOptimizations(!disableStreamingOptimizations);
+
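+        // If static Partition Values were supplied, split the comma-separated list into trimmed values for the Hive connection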
+        if (!StringUtils.isEmpty(partitionValuesString)) {
+            List<String> staticPartitionValues = Arrays.stream(partitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
+            o = o.withStaticPartitionValues(staticPartitionValues);
+        }
+
+        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
+            final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+            o = o.withKerberosPrincipal(credentialsService.getPrincipal()).withKerberosKeytab(credentialsService.getKeytab());
+        }
+
+        final HiveOptions options = o;
+
+        // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
+        ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
+        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+
+        StreamingConnection hiveStreamingConnection = null;
+
+        try (final InputStream rawIn = session.read(flowFile)) {
+            final RecordReader reader;
+
+            try (final BufferedInputStream in = new BufferedInputStream(rawIn)) {
+
+                // if we fail to create the RecordReader then we want to route to failure, so we need to
+                // handle this separately from the other IOExceptions which normally route to retry
+                try {
+                    reader = recordReaderFactory.createRecordReader(flowFile, in, getLogger());
+                } catch (Exception e) {
+                    throw new RecordReaderFactoryException("Unable to create RecordReader", e);
+                }
+
+                hiveStreamingConnection = makeStreamingConnection(options, reader);
+                // Add shutdown handler with higher priority than FileSystem shutdown hook so that streaming connection gets closed first before
+                // filesystem close (to avoid ClosedChannelException)
+                ShutdownHookManager.addShutdownHook(hiveStreamingConnection::close, FileSystem.SHUTDOWN_HOOK_PRIORITY + 1);
+
+                // Write records to Hive streaming, then commit and close
+                hiveStreamingConnection.beginTransaction();
+                hiveStreamingConnection.write(in);
+                hiveStreamingConnection.commitTransaction();
+                rawIn.close();
+
+                Map<String, String> updateAttributes = new HashMap<>();
+                updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()));
+                updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
+                flowFile = session.putAllAttributes(flowFile, updateAttributes);
+                session.getProvenanceReporter().send(flowFile, hiveStreamingConnection.getMetastoreUri());
+                session.transfer(flowFile, REL_SUCCESS);
+            } catch (TransactionError te) {
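+                // Transaction-level errors either roll back the session (if Rollback On Failure is set) or are
+                // rethrown as ShouldRetryException so the transaction is aborted and the FlowFile is routed to 'retry'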
+                if (rollbackOnFailure) {
+                    throw new ProcessException(te.getLocalizedMessage(), te);
+                } else {
+                    throw new ShouldRetryException(te.getLocalizedMessage(), te);
+                }
+            } catch (RecordReaderFactoryException rrfe) {
+                throw new ProcessException(rrfe);
+            }
+        } catch (InvalidTable | SerializationError | StreamingIOFailure | IOException e) {
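+            // Errors with the target table, record serialization, or the underlying I/O: if Rollback On Failure is set,
+            // abort the transaction and roll back the session; otherwise route the FlowFile (with record-count attributes) to 'failure'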
+            if (rollbackOnFailure) {
+                if (hiveStreamingConnection != null) {
+                    abortConnection(hiveStreamingConnection);
+                }
+                throw new ProcessException(e.getLocalizedMessage(), e);
+            } else {
+                Map<String, String> updateAttributes = new HashMap<>();
+                updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR,
+                        Long.toString(hiveStreamingConnection == null ? 0 : hiveStreamingConnection.getConnectionStats().getRecordsWritten()));
+                updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
+                flowFile = session.putAllAttributes(flowFile, updateAttributes);
+                session.transfer(flowFile, REL_FAILURE);
+            }
+        } catch (DiscontinuedException e) {
+            // The input FlowFile processing is discontinued. Keep it in the input queue.
+            getLogger().warn("Discontinued processing for {} due to {}", new Object[]{flowFile, e}, e);
+            session.transfer(flowFile, Relationship.SELF);
+        } catch (ConnectionError ce) {
+            // If we can't connect to the metastore, yield the processor
+            context.yield();
+            throw new ProcessException("A connection to metastore cannot be established", ce);
+        } catch (ShouldRetryException e) {
+            // This exception is already a result of adjusting an error, so simply transfer the FlowFile to retry. Still need to abort the txn
+            getLogger().error(e.getLocalizedMessage(), e);
+            if (hiveStreamingConnection != null) {
+                abortConnection(hiveStreamingConnection);
+            }
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_RETRY);
+        } catch (StreamingException se) {
+            // Handle all other exceptions. These are often record-based exceptions (since Hive will throw a subclass of the exception caught above)
+            Throwable cause = se.getCause();
+            if (cause == null) cause = se;
+            // This is a failure on the incoming data, rollback on failure if specified; otherwise route to failure after penalizing (and abort txn in any case)
+            if (rollbackOnFailure) {
+                if (hiveStreamingConnection != null) {
+                    abortConnection(hiveStreamingConnection);
+                }
+                throw new ProcessException(cause.getLocalizedMessage(), cause);
+            } else {
+                flowFile = session.penalize(flowFile);
+                Map<String, String> updateAttributes = new HashMap<>();
+                updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR,
+                        Long.toString(hiveStreamingConnection == null ? 0 : hiveStreamingConnection.getConnectionStats().getRecordsWritten()));
+                updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
+                flowFile = session.putAllAttributes(flowFile, updateAttributes);
+                session.transfer(flowFile, REL_FAILURE);
+            }
+
+        } catch (Throwable t) {
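+            // Catch-all for unexpected errors: abort any open transaction and rethrow as a ProcessException so the session is rolled back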
+            if (hiveStreamingConnection != null) {
+                abortConnection(hiveStreamingConnection);
+            }
+            throw (t instanceof ProcessException) ? (ProcessException) t : new ProcessException(t);
+        } finally {
+            closeConnection(hiveStreamingConnection);
+            // Restore original class loader, might not be necessary but is good practice since the processor task changed it
+            Thread.currentThread().setContextClassLoader(originalClassloader);
+        }
+    }
+
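+    /**
+     * Creates and opens a Hive Streaming connection for the given options, using a HiveRecordWriter backed by the
+     * supplied RecordReader to convert NiFi records into Hive Streaming writes.
+     */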
+    StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader) throws StreamingException {
+        return HiveStreamingConnection.newBuilder()
+                .withDatabase(options.getDatabaseName())
+                .withTable(options.getTableName())
+                .withStaticPartitionValues(options.getStaticPartitionValues())
+                .withHiveConf(options.getHiveConf())
+                .withRecordWriter(new HiveRecordWriter(reader, getLogger()))
+                .withAgentInfo("NiFi " + this.getClass().getSimpleName() + " [" + this.getIdentifier()
+                        + "] thread " + Thread.currentThread().getId() + "[" + Thread.currentThread().getName() + "]")
+                .connect();
+    }
+
+    @OnStopped
+    public void cleanup() {
+        validationResourceHolder.set(null); // trigger re-validation of resources
+
+        ComponentLog log = getLogger();
+
+        if (callTimeoutPool != null) {
+            callTimeoutPool.shutdown();
+            try {
+                while (!callTimeoutPool.isTerminated()) {
+                    callTimeoutPool.awaitTermination(callTimeout, TimeUnit.MILLISECONDS);
+                }
+            } catch (Throwable t) {
+                log.warn("shutdown interrupted on " + callTimeoutPool, t);
+            }
+            callTimeoutPool = null;
+        }
+
+        ugi = null;
+    }
+
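+    /**
+     * Abort any open transaction on the connection and then close it, logging (rather than propagating) any error.
+     */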
+    private void abortAndCloseConnection(StreamingConnection connection) {
+        try {
+            abortConnection(connection);
+            closeConnection(connection);
+        } catch (Exception ie) {
+            getLogger().warn("unable to close hive connections. ", ie);
+        }
+    }
+
+    /**
+     * Abort current Txn on the connection
+     */
+    private void abortConnection(StreamingConnection connection) {
+        if (connection != null) {
+            try {
+                connection.abortTransaction();
+            } catch (Exception e) {
+                getLogger().error("Failed to abort Hive Streaming transaction " + connection + " due to exception ", e);
+            }
+        }
+    }
+
+    /**
+     * Close the streaming connection
+     */
+    private void closeConnection(StreamingConnection connection) {
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (Exception e) {
+                getLogger().error("Failed to close Hive Streaming connection " + connection + " due to exception ", e);
+            }
+        }
+    }
+
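+    // Internal signal that the current FlowFile should be routed to 'retry' after the streaming transaction is aborted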
+    private static class ShouldRetryException extends RuntimeException {
+        private ShouldRetryException(String message, Throwable cause) {
+            super(message, cause);
+        }
+    }
+}
+