You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/06/13 18:33:25 UTC
[5/6] nifi git commit: NIFI-4963: Added Hive3 bundle - Incorporated
review comments - Added more defensive code for PutHive3Streaming error
handling
http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive3ConnectionPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive3ConnectionPool.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive3ConnectionPool.java
new file mode 100644
index 0000000..b0662b8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive3ConnectionPool.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.dbcp.hive;
+
+import java.io.File;
+
+import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.jdbc.HiveDriver;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.hive.AuthenticationFailedException;
+import org.apache.nifi.util.hive.HiveConfigurator;
+import org.apache.nifi.util.hive.HiveUtils;
+import org.apache.nifi.util.hive.ValidationResources;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+
+/**
+ * Implementation for Database Connection Pooling Service used for Apache Hive
+ * connections. Apache DBCP is used for connection pooling functionality.
+ */
+@RequiresInstanceClassLoading
+@Tags({"hive", "dbcp", "jdbc", "database", "connection", "pooling", "store"})
+@CapabilityDescription("Provides Database Connection Pooling Service for Apache Hive 3.x. Connections can be asked from pool and returned after usage.")
+public class Hive3ConnectionPool extends AbstractControllerService implements Hive3DBCPService {
+ private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";
+
+ static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder()
+ .name("hive-db-connect-url")
+ .displayName("Database Connection URL")
+ .description("A database connection URL used to connect to a database. May contain database system name, host, port, database name and some parameters."
+ + " The exact syntax of a database connection URL is specified by the Hive documentation. For example, the server principal is often included "
+ + "as a connection parameter when connecting to a secure Hive server.")
+ .defaultValue(null)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
+ .name("hive-config-resources")
+ .displayName("Hive Configuration Resources")
+ .description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
+ + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
+ + "with Kerberos e.g., the appropriate properties must be set in the configuration files. Please see the Hive documentation for more details.")
+ .required(false)
+ .addValidator(HiveUtils.createMultipleFilesExistValidator())
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder()
+ .name("hive-db-user")
+ .displayName("Database User")
+ .description("Database user name")
+ .defaultValue(null)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder()
+ .name("hive-db-password")
+ .displayName("Password")
+ .description("The password for the database user")
+ .defaultValue(null)
+ .required(false)
+ .sensitive(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ static final PropertyDescriptor MAX_WAIT_TIME = new PropertyDescriptor.Builder()
+ .name("hive-max-wait-time")
+ .displayName("Max Wait Time")
+ .description("The maximum amount of time that the pool will wait (when there are no available connections) "
+ + " for a connection to be returned before failing, or -1 to wait indefinitely. ")
+ .defaultValue("500 millis")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ static final PropertyDescriptor MAX_TOTAL_CONNECTIONS = new PropertyDescriptor.Builder()
+ .name("hive-max-total-connections")
+ .displayName("Max Total Connections")
+ .description("The maximum number of active connections that can be allocated from this pool at the same time, "
+ + "or negative for no limit.")
+ .defaultValue("8")
+ .required(true)
+ .addValidator(StandardValidators.INTEGER_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ static final PropertyDescriptor VALIDATION_QUERY = new PropertyDescriptor.Builder()
+ .name("Validation-query")
+ .displayName("Validation query")
+ .description("Validation query used to validate connections before returning them. "
+ + "When a borrowed connection is invalid, it gets dropped and a new valid connection will be returned. "
+ + "NOTE: Using validation may have a performance penalty.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ private static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
+ .name("kerberos-credentials-service")
+ .displayName("Kerberos Credentials Service")
+ .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
+ .identifiesControllerService(KerberosCredentialsService.class)
+ .required(false)
+ .build();
+
+
+ private List<PropertyDescriptor> properties;
+
+ private String connectionUrl = "unknown";
+
+ // Holder of cached Configuration information so validation does not reload the same config over and over
+ private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
+
+ private volatile BasicDataSource dataSource;
+
+ private volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
+ private volatile UserGroupInformation ugi;
+ private volatile File kerberosConfigFile = null;
+ private volatile KerberosProperties kerberosProperties;
+
+ @Override
+ protected void init(final ControllerServiceInitializationContext context) {
+ List<PropertyDescriptor> props = new ArrayList<>();
+ props.add(DATABASE_URL);
+ props.add(HIVE_CONFIGURATION_RESOURCES);
+ props.add(DB_USER);
+ props.add(DB_PASSWORD);
+ props.add(MAX_WAIT_TIME);
+ props.add(MAX_TOTAL_CONNECTIONS);
+ props.add(VALIDATION_QUERY);
+ props.add(KERBEROS_CREDENTIALS_SERVICE);
+
+ kerberosConfigFile = context.getKerberosConfigurationFile();
+ kerberosProperties = new KerberosProperties(kerberosConfigFile);
+ props.add(kerberosProperties.getKerberosPrincipal());
+ props.add(kerberosProperties.getKerberosKeytab());
+ properties = props;
+ }
+
+ /**
+ * Returns the property descriptor list that was assembled in {@code init}.
+ *
+ * @return the supported property descriptors
+ */
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+ boolean confFileProvided = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).isSet();
+
+ final List<ValidationResult> problems = new ArrayList<>();
+
+ if (confFileProvided) {
+ final String explicitPrincipal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
+ final String explicitKeytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
+ final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+ final String resolvedPrincipal;
+ final String resolvedKeytab;
+ if (credentialsService == null) {
+ resolvedPrincipal = explicitPrincipal;
+ resolvedKeytab = explicitKeytab;
+ } else {
+ resolvedPrincipal = credentialsService.getPrincipal();
+ resolvedKeytab = credentialsService.getKeytab();
+ }
+
+
+ final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
+ problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger()));
+
+ if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null)) {
+ problems.add(new ValidationResult.Builder()
+ .subject("Kerberos Credentials")
+ .valid(false)
+ .explanation("Cannot specify both a Kerberos Credentials Service and a principal/keytab")
+ .build());
+ }
+
+ final String allowExplicitKeytabVariable = System.getenv(ALLOW_EXPLICIT_KEYTAB);
+ if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && (explicitPrincipal != null || explicitKeytab != null)) {
+ problems.add(new ValidationResult.Builder()
+ .subject("Kerberos Credentials")
+ .valid(false)
+ .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring principal/keytab in processors. "
+ + "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.")
+ .build());
+ }
+ }
+
+ return problems;
+ }
+
+    /**
+     * Configures connection pool by creating an instance of the
+     * {@link BasicDataSource} based on configuration provided with
+     * {@link ConfigurationContext}.
+     * <p>
+     * This operation makes no guarantees that the actual connection could be
+     * made since the underlying system may still go off-line during normal
+     * operation of the connection pool.
+     * <p>
+     * If the Hive configuration has security enabled, the resolved principal and keytab are authenticated
+     * before the pool is created. A failed authentication aborts enabling with an
+     * {@link InitializationException} instead of leaving behind a pool that has no authenticated user.
+     * <p>
+     * As of Apache NiFi 1.5.0, due to changes made to
+     * {@link SecurityUtil#loginKerberos(Configuration, String, String)}, which is used by this class invoking
+     * {@link HiveConfigurator#authenticate(Configuration, String, String)}
+     * to authenticate a principal with Kerberos, Hive controller services no longer
+     * attempt relogins explicitly. For more information, please read the documentation for
+     * {@link SecurityUtil#loginKerberos(Configuration, String, String)}.
+     * <p>
+     * In previous versions of NiFi, a {@link org.apache.nifi.hadoop.KerberosTicketRenewer} was started by
+     * {@link HiveConfigurator#authenticate(Configuration, String, String, long)} when the Hive
+     * controller service was enabled. The use of a separate thread to explicitly relogin could cause race conditions
+     * with the implicit relogin attempts made by hadoop/Hive code on a thread that references the same
+     * {@link UserGroupInformation} instance. One of these threads could leave the
+     * {@link javax.security.auth.Subject} in {@link UserGroupInformation} to be cleared or in an unexpected state
+     * while the other thread is attempting to use the {@link javax.security.auth.Subject}, resulting in failed
+     * authentication attempts that would leave the Hive controller service in an unrecoverable state.
+     *
+     * @see SecurityUtil#loginKerberos(Configuration, String, String)
+     * @see HiveConfigurator#authenticate(Configuration, String, String)
+     * @see HiveConfigurator#authenticate(Configuration, String, String, long)
+     * @param context the configuration context
+     * @throws InitializationException if Kerberos authentication fails
+     */
+    @OnEnabled
+    public void onConfigured(final ConfigurationContext context) throws InitializationException {
+
+        final ComponentLog log = getLogger();
+
+        final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
+        final Configuration hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
+        final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue();
+
+        // add any dynamic properties to the Hive configuration
+        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+            final PropertyDescriptor descriptor = entry.getKey();
+            if (descriptor.isDynamic()) {
+                hiveConfig.set(descriptor.getName(), context.getProperty(descriptor).evaluateAttributeExpressions().getValue());
+            }
+        }
+
+        final String drv = HiveDriver.class.getName();
+        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
+            final String explicitPrincipal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
+            final String explicitKeytab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
+            final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+            // The credentials service, when configured, takes the place of the explicit principal/keytab properties.
+            final String resolvedPrincipal;
+            final String resolvedKeytab;
+            if (credentialsService == null) {
+                resolvedPrincipal = explicitPrincipal;
+                resolvedKeytab = explicitKeytab;
+            } else {
+                resolvedPrincipal = credentialsService.getPrincipal();
+                resolvedKeytab = credentialsService.getKeytab();
+            }
+
+            log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
+            try {
+                ugi = hiveConfigurator.authenticate(hiveConfig, resolvedPrincipal, resolvedKeytab);
+            } catch (AuthenticationFailedException ae) {
+                // Do not continue: without an authenticated user the pool could never obtain a
+                // connection to a secured Hive, and the success message below would be false.
+                log.error(ae.getMessage(), ae);
+                throw new InitializationException(ae);
+            }
+
+            log.info("Successfully logged in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
+        }
+
+        final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
+        final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
+        final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+        final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();
+
+        dataSource = new BasicDataSource();
+        dataSource.setDriverClassName(drv);
+
+        connectionUrl = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
+
+        dataSource.setMaxWait(maxWaitMillis);
+        dataSource.setMaxActive(maxTotal);
+
+        // Connections are only tested on borrow when a validation query has been supplied.
+        if (validationQuery != null && !validationQuery.isEmpty()) {
+            dataSource.setValidationQuery(validationQuery);
+            dataSource.setTestOnBorrow(true);
+        }
+
+        dataSource.setUrl(connectionUrl);
+        dataSource.setUsername(user);
+        dataSource.setPassword(passw);
+    }
+
+    /**
+     * Shutdown pool, close all open connections.
+     * <p>
+     * Does nothing when no pool was created, for example because enabling the service failed
+     * before the data source was instantiated.
+     *
+     * @throws ProcessException if closing the pool raises a {@link SQLException}
+     */
+    @OnDisabled
+    public void shutdown() {
+        // Read the volatile field once so the null check and the close act on the same instance.
+        final BasicDataSource pool = dataSource;
+        if (pool == null) {
+            return;
+        }
+
+        try {
+            pool.close();
+        } catch (final SQLException e) {
+            throw new ProcessException(e);
+        }
+    }
+
+    /**
+     * Obtains a connection from the pool. When a Kerberos user was authenticated while enabling the service,
+     * the connection is requested on behalf of that user; otherwise it is requested directly.
+     *
+     * @return a connection borrowed from the pool
+     * @throws ProcessException if the connection cannot be obtained or the calling thread is interrupted
+     */
+    @Override
+    public Connection getConnection() throws ProcessException {
+        try {
+            if (ugi != null) {
+                try {
+                    return ugi.doAs((PrivilegedExceptionAction<Connection>) () -> dataSource.getConnection());
+                } catch (UndeclaredThrowableException e) {
+                    // Surface a SQLException raised inside the privileged action as itself so it is handled below.
+                    Throwable cause = e.getCause();
+                    if (cause instanceof SQLException) {
+                        throw (SQLException) cause;
+                    } else {
+                        throw e;
+                    }
+                }
+            } else {
+                getLogger().info("Simple Authentication");
+                return dataSource.getConnection();
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt status so callers further up the stack can still observe the interruption.
+            Thread.currentThread().interrupt();
+            getLogger().error("Error getting Hive connection", e);
+            throw new ProcessException(e);
+        } catch (SQLException | IOException e) {
+            getLogger().error("Error getting Hive connection", e);
+            throw new ProcessException(e);
+        }
+    }
+
+ /**
+ * Describes this service by its class name and component identifier.
+ *
+ * @return a string of the form {@code Hive3ConnectionPool[id=<identifier>]}
+ */
+ @Override
+ public String toString() {
+ return "Hive3ConnectionPool[id=" + getIdentifier() + "]";
+ }
+
+ /**
+ * Returns the connection URL evaluated when the service was last enabled.
+ *
+ * @return the connection URL, or the initial value {@code "unknown"} if the service has not been enabled yet
+ */
+ @Override
+ public String getConnectionURL() {
+ return connectionUrl;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHive3QLProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHive3QLProcessor.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHive3QLProcessor.java
new file mode 100644
index 0000000..4fcce19
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHive3QLProcessor.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import org.antlr.runtime.tree.CommonTree;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.ParseDriver;
+import org.apache.hadoop.hive.ql.parse.ParseException;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dbcp.hive.Hive3DBCPService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import java.math.BigDecimal;
+import java.nio.charset.Charset;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.SQLDataException;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * An abstract base class for HiveQL processors to share common data, methods, etc.
+ */
+public abstract class AbstractHive3QLProcessor extends AbstractSessionFactoryProcessor {
+
+ protected static final Pattern HIVEQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("hiveql\\.args\\.(\\d+)\\.type");
+ protected static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
+ static String ATTR_INPUT_TABLES = "query.input.tables";
+ static String ATTR_OUTPUT_TABLES = "query.output.tables";
+
+
+ public static final PropertyDescriptor HIVE_DBCP_SERVICE = new PropertyDescriptor.Builder()
+ .name("hive3-dbcp-service")
+ .displayName("Hive Database Connection Pooling Service")
+ .description("The Hive Controller Service that is used to obtain connection(s) to the Hive database")
+ .required(true)
+ .identifiesControllerService(Hive3DBCPService.class)
+ .build();
+
+ public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+ .name("hive3-charset")
+ .displayName("Character Set")
+ .description("Specifies the character set of the record data.")
+ .required(true)
+ .defaultValue("UTF-8")
+ .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("hive3-query-timeout")
+ .displayName("Query timeout")
+ .description("Sets the number of seconds the driver will wait for a query to execute. "
+ + "A value of 0 means no timeout. NOTE: Non-zero values may not be supported by the driver.")
+ .defaultValue("0")
+ .required(true)
+ .addValidator(StandardValidators.INTEGER_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ /**
+ * Determines the HiveQL statement that should be executed for the given FlowFile
+ *
+ * @param session the session that can be used to access the given FlowFile
+ * @param flowFile the FlowFile whose HiveQL statement should be executed
+ * @param charset the character set used to decode the FlowFile content into a String
+ * @return the HiveQL that is associated with the given FlowFile
+ */
+ protected String getHiveQL(final ProcessSession session, final FlowFile flowFile, final Charset charset) {
+ // Read the HiveQL from the FlowFile's content (the FlowFile size is narrowed to int to size the buffer)
+ final byte[] buffer = new byte[(int) flowFile.getSize()];
+ session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer));
+
+ // Decode the bytes that were read into the HiveQL text using the supplied character set.
+ return new String(buffer, charset);
+ }
+
+ private class ParameterHolder {
+ String attributeName;
+ int jdbcType;
+ String value;
+ }
+
+    /**
+     * Sets all of the appropriate parameters on the given PreparedStatement, based on the given FlowFile attributes.
+     * <p>
+     * Only attributes named {@code hiveql.args.N.type} whose index N lies in {@code [base, base + paramCount)} are
+     * used. Attribute index N is bound to statement position {@code N - base + 1}, and the value is taken from the
+     * matching {@code hiveql.args.N.value} attribute.
+     *
+     * @param base the first attribute index that belongs to this statement
+     * @param stmt the statement to set the parameters on
+     * @param paramCount the number of parameters this statement takes
+     * @param attributes the attributes from which to derive parameter indices, values, and types
+     * @return {@code base + paramCount}, i.e. the base to use for the next statement
+     * @throws SQLException if the PreparedStatement throws a SQLException when the appropriate setter is called,
+     *         or (as {@link SQLDataException}) if a type or value attribute cannot be converted
+     */
+    protected int setParameters(int base, final PreparedStatement stmt, int paramCount, final Map<String, String> attributes) throws SQLException {
+
+        // A TreeMap keeps the parameters ordered by their position in the statement.
+        final Map<Integer, ParameterHolder> parmMap = new TreeMap<>();
+
+        for (final Map.Entry<String, String> entry : attributes.entrySet()) {
+            final String key = entry.getKey();
+            final Matcher matcher = HIVEQL_TYPE_ATTRIBUTE_PATTERN.matcher(key);
+            if (!matcher.matches()) {
+                continue;
+            }
+
+            final int parameterIndex;
+            try {
+                parameterIndex = Integer.parseInt(matcher.group(1));
+            } catch (final NumberFormatException nfe) {
+                // The index does not fit in an int, so it cannot lie within [base, base + paramCount).
+                continue;
+            }
+            if (parameterIndex < base || parameterIndex >= base + paramCount) {
+                continue;
+            }
+
+            final String typeValue = entry.getValue();
+            final boolean isNumeric = NUMBER_PATTERN.matcher(typeValue).matches();
+            if (!isNumeric) {
+                throw new SQLDataException("Value of the " + key + " attribute is '" + typeValue + "', which is not a valid JDBC numeral jdbcType");
+            }
+
+            final ParameterHolder ph = new ParameterHolder();
+            try {
+                ph.jdbcType = Integer.parseInt(typeValue);
+            } catch (final NumberFormatException nfe) {
+                // All digits but outside the int range: report it like any other invalid type code.
+                throw new SQLDataException("Value of the " + key + " attribute is '" + typeValue + "', which is not a valid JDBC numeral jdbcType", nfe);
+            }
+
+            final String valueAttrName = "hiveql.args." + parameterIndex + ".value";
+            ph.value = attributes.get(valueAttrName);
+            ph.attributeName = valueAttrName;
+
+            // JDBC parameter positions are 1-based and relative to this statement.
+            final int realIndexLoc = parameterIndex - base + 1;
+            parmMap.put(realIndexLoc, ph);
+        }
+
+        // Now that we've retrieved the correct number of parameters and they are sorted, let's set them.
+        for (final Map.Entry<Integer, ParameterHolder> entry : parmMap.entrySet()) {
+            final Integer index = entry.getKey();
+            final ParameterHolder ph = entry.getValue();
+
+            try {
+                setParameter(stmt, ph.attributeName, index, ph.value, ph.jdbcType);
+            } catch (final NumberFormatException nfe) {
+                throw new SQLDataException("The value of the " + ph.attributeName + " is '" + ph.value + "', which cannot be converted into the necessary data jdbcType", nfe);
+            }
+        }
+        return base + paramCount;
+    }
+
+ /**
+ * Determines how to map the given value to the appropriate JDBC data jdbcType and sets the parameter on the
+ * provided PreparedStatement
+ *
+ * @param stmt the PreparedStatement to set the parameter on
+ * @param attrName the name of the attribute that the parameter is coming from - for logging purposes
+ * @param parameterIndex the index of the HiveQL parameter to set
+ * @param parameterValue the value of the HiveQL parameter to set
+ * @param jdbcType the JDBC Type of the HiveQL parameter to set
+ * @throws SQLException if the PreparedStatement throws a SQLException when calling the appropriate setter
+ */
+ protected void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType) throws SQLException {
+ if (parameterValue == null) {
+ stmt.setNull(parameterIndex, jdbcType);
+ } else {
+ try {
+ switch (jdbcType) {
+ case Types.BIT:
+ case Types.BOOLEAN:
+ stmt.setBoolean(parameterIndex, Boolean.parseBoolean(parameterValue));
+ break;
+ case Types.TINYINT:
+ stmt.setByte(parameterIndex, Byte.parseByte(parameterValue));
+ break;
+ case Types.SMALLINT:
+ stmt.setShort(parameterIndex, Short.parseShort(parameterValue));
+ break;
+ case Types.INTEGER:
+ stmt.setInt(parameterIndex, Integer.parseInt(parameterValue));
+ break;
+ case Types.BIGINT:
+ stmt.setLong(parameterIndex, Long.parseLong(parameterValue));
+ break;
+ case Types.REAL:
+ stmt.setFloat(parameterIndex, Float.parseFloat(parameterValue));
+ break;
+ case Types.FLOAT:
+ case Types.DOUBLE:
+ stmt.setDouble(parameterIndex, Double.parseDouble(parameterValue));
+ break;
+ case Types.DECIMAL:
+ case Types.NUMERIC:
+ stmt.setBigDecimal(parameterIndex, new BigDecimal(parameterValue));
+ break;
+ case Types.DATE:
+ stmt.setDate(parameterIndex, new Date(Long.parseLong(parameterValue)));
+ break;
+ case Types.TIME:
+ stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue)));
+ break;
+ case Types.TIMESTAMP:
+ stmt.setTimestamp(parameterIndex, new Timestamp(Long.parseLong(parameterValue)));
+ break;
+ case Types.CHAR:
+ case Types.VARCHAR:
+ case Types.LONGNVARCHAR:
+ case Types.LONGVARCHAR:
+ stmt.setString(parameterIndex, parameterValue);
+ break;
+ default:
+ stmt.setObject(parameterIndex, parameterValue, jdbcType);
+ break;
+ }
+ } catch (SQLException e) {
+ // Log which attribute/parameter had an error, then rethrow to be handled at the top level
+ getLogger().error("Error setting parameter {} to value from {} ({})", new Object[]{parameterIndex, attrName, parameterValue}, e);
+ throw e;
+ }
+ }
+ }
+
+    /**
+     * Immutable value describing a table referenced by a query: an optional database name, the table name,
+     * and whether the table is read from (input) or not.
+     */
+    protected static class TableName {
+        private final String database;
+        private final String table;
+        private final boolean input;
+
+        TableName(String database, String table, boolean input) {
+            this.database = database;
+            this.table = table;
+            this.input = input;
+        }
+
+        public String getDatabase() {
+            return database;
+        }
+
+        public String getTable() {
+            return table;
+        }
+
+        public boolean isInput() {
+            return input;
+        }
+
+        /**
+         * @return the bare table name when no database is set, otherwise {@code database.table}
+         */
+        @Override
+        public String toString() {
+            if (database == null || database.isEmpty()) {
+                return table;
+            }
+            return database + '.' + table;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            final TableName other = (TableName) o;
+            if (input != other.input) {
+                return false;
+            }
+
+            // The database may be null on either side; the table name is compared only once the databases agree.
+            final boolean sameDatabase = database == null ? other.database == null : database.equals(other.database);
+            return sameDatabase && table.equals(other.table);
+        }
+
+        @Override
+        public int hashCode() {
+            int hash = database == null ? 0 : database.hashCode();
+            hash = 31 * hash + table.hashCode();
+            return 31 * hash + (input ? 1 : 0);
+        }
+    }
+
+    /**
+     * Parses the given query and collects the tables it references.
+     *
+     * @param query the HiveQL text to inspect
+     * @return the table names found in the parsed query, or an empty set if the query cannot be parsed
+     */
+    protected Set<TableName> findTableNames(final String query) {
+        final ASTNode root;
+        try {
+            final String parseable = normalize(query);
+            root = new ParseDriver().parse(parseable);
+        } catch (ParseException e) {
+            // A query that cannot be parsed is not fatal here: log it and report no tables.
+            getLogger().debug("Failed to parse query: {} due to {}", new Object[]{query, e}, e);
+            return Collections.emptySet();
+        }
+
+        final Set<TableName> found = new HashSet<>();
+        findTableNames(root, found);
+        return found;
+    }
+
+ /**
+ * Normalize query.
+ * Hive resolves prepared statement parameters before executing a query,
+ * see {@link org.apache.hive.jdbc.HivePreparedStatement#updateSql(String, HashMap)} for detail.
+ * HiveParser does not expect '?' to be in a query string, and throws an Exception if there is one.
+ * In this normalize method, every '?' character is replaced with 'x' to avoid that.
+ *
+ * @param query the query text, possibly containing '?' parameter placeholders
+ * @return the query text with every '?' character replaced by 'x'
+ */
+ private String normalize(String query) {
+ return query.replace('?', 'x');
+ }
+
+    /**
+     * Walks the parse tree depth-first and records every TOK_TABNAME node as a {@link TableName}.
+     * Nodes that are not {@link CommonTree} instances are ignored, and the children of a TOK_TABNAME
+     * node are not visited further.
+     *
+     * @param obj the tree node to inspect
+     * @param tableNames the set that receives the table names found
+     */
+    private void findTableNames(final Object obj, final Set<TableName> tableNames) {
+        if (!(obj instanceof CommonTree)) {
+            return;
+        }
+        final CommonTree tree = (CommonTree) obj;
+        final int childCount = tree.getChildCount();
+
+        if (!"TOK_TABNAME".equals(tree.getText())) {
+            for (int i = 0; i < childCount; i++) {
+                findTableNames(tree.getChild(i), tableNames);
+            }
+            return;
+        }
+
+        // If parent is TOK_TABREF, then it is an input table.
+        final boolean isInput = "TOK_TABREF".equals(tree.getParent().getText());
+
+        final TableName tableName;
+        if (childCount == 1) {
+            // Unqualified name: table only.
+            tableName = new TableName(null, tree.getChild(0).getText(), isInput);
+        } else if (childCount == 2) {
+            // Qualified name: database followed by table.
+            tableName = new TableName(tree.getChild(0).getText(), tree.getChild(1).getText(), isInput);
+        } else {
+            throw new IllegalStateException("TOK_TABNAME does not have expected children, childCount=" + childCount);
+        }
+        tableNames.add(tableName);
+    }
+
+ protected Map<String, String> toQueryTableAttributes(Set<TableName> tableNames) {
+ final Map<String, String> attributes = new HashMap<>();
+ for (TableName tableName : tableNames) {
+ final String attributeName = tableName.isInput() ? ATTR_INPUT_TABLES : ATTR_OUTPUT_TABLES;
+ if (attributes.containsKey(attributeName)) {
+ attributes.put(attributeName, attributes.get(attributeName) + "," + tableName);
+ } else {
+ attributes.put(attributeName, tableName.toString());
+ }
+ }
+ return attributes;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java
new file mode 100644
index 0000000..989d085
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dbcp.hive.Hive3DBCPService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.pattern.ErrorTypes;
+import org.apache.nifi.processor.util.pattern.ExceptionHandler;
+import org.apache.nifi.processor.util.pattern.ExceptionHandler.OnError;
+import org.apache.nifi.processor.util.pattern.PartialFunctions.FetchFlowFiles;
+import org.apache.nifi.processor.util.pattern.PartialFunctions.InitConnection;
+import org.apache.nifi.processor.util.pattern.Put;
+import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
+import org.apache.nifi.processor.util.pattern.RoutingResult;
+
+import java.nio.charset.Charset;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.SQLNonTransientException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+/**
+ * Processor that executes the HiveQL script carried in a FlowFile's content against a connection
+ * obtained from a {@link Hive3DBCPService}. The script is split on the configured statement delimiter,
+ * each non-empty statement is prepared, bound from FlowFile attributes and executed, and the FlowFile
+ * is then routed to success, failure or retry.
+ */
+@SeeAlso(SelectHive3QL.class)
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"sql", "hive", "put", "database", "update", "insert"})
+@CapabilityDescription("Executes a HiveQL DDL/DML command (UPDATE, INSERT, e.g.). The content of an incoming FlowFile is expected to be the HiveQL command "
+ + "to execute. The HiveQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes "
+ + "with the naming convention hiveql.args.N.type and hiveql.args.N.value, where N is a positive integer. The hiveql.args.N.type is expected to be "
+ + "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.")
+@ReadsAttributes({
+ @ReadsAttribute(attribute = "hiveql.args.N.type", description = "Incoming FlowFiles are expected to be parametrized HiveQL statements. The type of each Parameter is specified as an integer "
+ + "that represents the JDBC Type of the parameter."),
+ @ReadsAttribute(attribute = "hiveql.args.N.value", description = "Incoming FlowFiles are expected to be parametrized HiveQL statements. The value of the Parameters are specified as "
+ + "hiveql.args.1.value, hiveql.args.2.value, hiveql.args.3.value, and so on. The type of the hiveql.args.1.value Parameter is specified by the hiveql.args.1.type attribute.")
+})
+@WritesAttributes({
+ @WritesAttribute(attribute = "query.input.tables", description = "This attribute is written on the flow files routed to the 'success' relationships, "
+ + "and contains input table names (if any) in comma delimited 'databaseName.tableName' format."),
+ @WritesAttribute(attribute = "query.output.tables", description = "This attribute is written on the flow files routed to the 'success' relationships, "
+ + "and contains the target table names in 'databaseName.tableName' format.")
+})
+public class PutHive3QL extends AbstractHive3QLProcessor {
+
+ // Upper bound on the number of FlowFiles pulled from the session per trigger (see fetchFlowFiles)
+ public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("hive-batch-size")
+ .displayName("Batch Size")
+ .description("The preferred number of FlowFiles to put to the database in a single transaction")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("100")
+ .build();
+
+ // Literal text used to split a script into statements; a delimiter preceded by a backslash is not a split point
+ public static final PropertyDescriptor STATEMENT_DELIMITER = new PropertyDescriptor.Builder()
+ .name("statement-delimiter")
+ .displayName("Statement Delimiter")
+ .description("Statement Delimiter used to separate SQL statements in a multiple statement script")
+ .required(true)
+ .defaultValue(";")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("A FlowFile is routed to this relationship after the database is successfully updated")
+ .build();
+ public static final Relationship REL_RETRY = new Relationship.Builder()
+ .name("retry")
+ .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
+ .build();
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, "
+ + "such as an invalid query or an integrity constraint violation")
+ .build();
+
+
+ private final static List<PropertyDescriptor> propertyDescriptors;
+ private final static Set<Relationship> relationships;
+
+ /*
+ * Will ensure that the list of property descriptors is built only once.
+ * Will also create a Set of relationships
+ */
+ static {
+ List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+ _propertyDescriptors.add(HIVE_DBCP_SERVICE);
+ _propertyDescriptors.add(BATCH_SIZE);
+ _propertyDescriptors.add(QUERY_TIMEOUT);
+ _propertyDescriptors.add(CHARSET);
+ _propertyDescriptors.add(STATEMENT_DELIMITER);
+ _propertyDescriptors.add(RollbackOnFailure.ROLLBACK_ON_FAILURE);
+ propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
+
+ Set<Relationship> _relationships = new HashSet<>();
+ _relationships.add(REL_SUCCESS);
+ _relationships.add(REL_FAILURE);
+ _relationships.add(REL_RETRY);
+ relationships = Collections.unmodifiableSet(_relationships);
+ }
+
+ // Both are (re)built by constructProcess() each time the processor is scheduled
+ private Put<FunctionContext, Connection> process;
+ private ExceptionHandler<FunctionContext> exceptionHandler;
+
+ /**
+ * Builds the exception handler and the {@link Put} pipeline used by {@link #onTrigger}.
+ * Exceptions are classified as follows: {@link SQLNonTransientException} becomes invalid input,
+ * any other {@link SQLException} becomes a temporal failure, and everything else an unknown failure.
+ */
+ @OnScheduled
+ public void constructProcess() {
+ exceptionHandler = new ExceptionHandler<>();
+ exceptionHandler.mapException(e -> {
+ // SQLNonTransientException is checked first because it is itself a SQLException
+ if (e instanceof SQLNonTransientException) {
+ return ErrorTypes.InvalidInput;
+ } else if (e instanceof SQLException) {
+ return ErrorTypes.TemporalFailure;
+ } else {
+ return ErrorTypes.UnknownFailure;
+ }
+ });
+ exceptionHandler.adjustError(RollbackOnFailure.createAdjustError(getLogger()));
+
+ process = new Put<>();
+ process.setLogger(getLogger());
+ process.initConnection(initConnection);
+ process.fetchFlowFiles(fetchFlowFiles);
+ process.putFlowFile(putFlowFile);
+ process.adjustRoute(RollbackOnFailure.createAdjustRoute(REL_FAILURE, REL_RETRY));
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ /**
+ * Per-trigger state shared by the pipeline callbacks: the content charset, the statement delimiter,
+ * the time the context was created (used for the provenance transmission time) and the connection URL
+ * recorded when the connection is obtained.
+ */
+ private class FunctionContext extends RollbackOnFailure {
+ final Charset charset;
+ final String statementDelimiter;
+ final long startNanos = System.nanoTime();
+
+ // Assigned by initConnection once the DBCP service has handed out a connection
+ String connectionUrl;
+
+
+ private FunctionContext(boolean rollbackOnFailure, Charset charset, String statementDelimiter) {
+ // NOTE(review): the meaning of the second superclass argument is not visible here - confirm against RollbackOnFailure
+ super(rollbackOnFailure, false);
+ this.charset = charset;
+ this.statementDelimiter = statementDelimiter;
+ }
+ }
+
+ // Obtains a connection from the configured Hive3DBCPService and remembers its URL for provenance reporting
+ private InitConnection<FunctionContext, Connection> initConnection = (context, session, fc, ff) -> {
+ final Hive3DBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(Hive3DBCPService.class);
+ final Connection connection = dbcpService.getConnection();
+ fc.connectionUrl = dbcpService.getConnectionURL();
+ return connection;
+ };
+
+ // Pulls up to 'Batch Size' FlowFiles from the session
+ private FetchFlowFiles<FunctionContext> fetchFlowFiles = (context, session, functionContext, result) -> {
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+ return session.get(batchSize);
+ };
+
+ // Executes every statement of one FlowFile's script, then reports provenance and routes the FlowFile to success.
+ // Exceptions thrown while executing are handed to the error callback built by onFlowFileError().
+ private Put.PutFlowFile<FunctionContext, Connection> putFlowFile = (context, session, fc, conn, flowFile, result) -> {
+ final String script = getHiveQL(session, flowFile, fc.charset);
+ // Negative lookbehind: split on the literal delimiter unless it is preceded by a backslash
+ String regex = "(?<!\\\\)" + Pattern.quote(fc.statementDelimiter);
+
+ String[] hiveQLs = script.split(regex);
+
+ final Set<TableName> tableNames = new HashSet<>();
+ exceptionHandler.execute(fc, flowFile, input -> {
+ // Parameter position carried across statements; presumably setParameters returns the next unused index - verify
+ int loc = 1;
+ for (String hiveQLStr: hiveQLs) {
+ getLogger().debug("HiveQL: {}", new Object[]{hiveQLStr});
+
+ final String hiveQL = hiveQLStr.trim();
+ if (!StringUtils.isEmpty(hiveQL)) {
+ final PreparedStatement stmt = conn.prepareStatement(hiveQL);
+ try {
+ // Get ParameterMetadata
+ // Hive JDBC Doesn't support this yet:
+ // ParameterMetaData pmd = stmt.getParameterMetaData();
+ // int paramCount = pmd.getParameterCount();
+ // Counts every '?' in the statement text, including any that appear inside string literals
+ int paramCount = StringUtils.countMatches(hiveQL, "?");
+
+ if (paramCount > 0) {
+ loc = setParameters(loc, stmt, paramCount, flowFile.getAttributes());
+ }
+
+ // Parse hiveQL and extract input/output tables
+ try {
+ tableNames.addAll(findTableNames(hiveQL));
+ } catch (Exception e) {
+ // If failed to parse the query, just log a warning message, but continue.
+ getLogger().warn("Failed to parse hiveQL: {} due to {}", new Object[]{hiveQL, e}, e);
+ }
+
+ stmt.setQueryTimeout(context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(flowFile).asInteger());
+
+ // Execute the statement
+ stmt.execute();
+ fc.proceed();
+ } finally {
+ // Always release the statement, whether binding/execution succeeded or threw.
+ // A close failure is only logged so it cannot change the routing of a statement that already ran.
+ try {
+ stmt.close();
+ } catch (SQLException closeFailure) {
+ getLogger().warn("Failed to close HiveQL statement due to {}", new Object[]{closeFailure}, closeFailure);
+ }
+ }
+ }
+ }
+
+ // Emit a Provenance SEND event
+ final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - fc.startNanos);
+
+ final FlowFile updatedFlowFile = session.putAllAttributes(flowFile, toQueryTableAttributes(tableNames));
+ session.getProvenanceReporter().send(updatedFlowFile, fc.connectionUrl, transmissionMillis, true);
+ // NOTE(review): routes the pre-update FlowFile reference rather than updatedFlowFile - confirm this is intended
+ result.routeTo(flowFile, REL_SUCCESS);
+
+ }, onFlowFileError(context, session, result));
+
+ };
+
+ /**
+ * Creates the per-FlowFile error callback: the standard failure/retry routing, followed by an error log
+ * entry describing the chosen destination, wrapped by the rollback-on-failure handler.
+ *
+ * @param context the process context
+ * @param session the session owning the FlowFile
+ * @param result the routing result that collects FlowFile destinations
+ * @return the composed error callback
+ */
+ private OnError<FunctionContext, FlowFile> onFlowFileError(final ProcessContext context, final ProcessSession session, final RoutingResult result) {
+ OnError<FunctionContext, FlowFile> onFlowFileError = ExceptionHandler.createOnError(context, session, result, REL_FAILURE, REL_RETRY);
+ onFlowFileError = onFlowFileError.andThen((c, i, r, e) -> {
+ // Only Failure and Retry are logged; any other destination is left silent
+ switch (r.destination()) {
+ case Failure:
+ getLogger().error("Failed to update Hive for {} due to {}; routing to failure", new Object[] {i, e}, e);
+ break;
+ case Retry:
+ getLogger().error("Failed to update Hive for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry",
+ new Object[] {i, e}, e);
+ break;
+ }
+ });
+ return RollbackOnFailure.createOnError(onFlowFileError);
+ }
+
+ /**
+ * Reads the rollback, charset and delimiter settings, wraps them in a fresh {@code FunctionContext}
+ * and runs the {@link Put} pipeline through {@link RollbackOnFailure#onTrigger}.
+ *
+ * @param context the process context supplying property values
+ * @param sessionFactory factory for the session used by this trigger
+ * @throws ProcessException if processing fails
+ */
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
+ final Boolean rollbackOnFailure = context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
+ final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
+ final String statementDelimiter = context.getProperty(STATEMENT_DELIMITER).getValue();
+ final FunctionContext functionContext = new FunctionContext(rollbackOnFailure, charset, statementDelimiter);
+ RollbackOnFailure.onTrigger(context, sessionFactory, functionContext, getLogger(), session -> process.onTrigger(context, session, functionContext));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
new file mode 100644
index 0000000..664915c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
@@ -0,0 +1,560 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.common.util.ShutdownHookManager;
+import org.apache.hive.streaming.ConnectionError;
+import org.apache.hive.streaming.HiveStreamingConnection;
+import org.apache.hive.streaming.InvalidTable;
+import org.apache.hive.streaming.SerializationError;
+import org.apache.hive.streaming.StreamingConnection;
+import org.apache.hive.streaming.StreamingException;
+import org.apache.hive.streaming.StreamingIOFailure;
+import org.apache.hive.streaming.TransactionError;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.pattern.DiscontinuedException;
+import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
+import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.hive.AuthenticationFailedException;
+import org.apache.nifi.util.hive.HiveConfigurator;
+import org.apache.nifi.util.hive.HiveOptions;
+import org.apache.hive.streaming.HiveRecordWriter;
+import org.apache.nifi.util.hive.HiveUtils;
+import org.apache.nifi.util.hive.ValidationResources;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES;
+
+@Tags({"hive", "streaming", "put", "database", "store"})
+@CapabilityDescription("This processor uses Hive Streaming to send flow file records to an Apache Hive 3.0+ table. "
+ + "The partition values are expected to be the 'last' fields of each record, so if the table is partitioned on column A for example, then the last field in "
+ + "each record should be field A.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "hivestreaming.record.count", description = "This attribute is written on the flow files routed to the 'success' "
+ + "and 'failure' relationships, and contains the number of records from the incoming flow file. All records in a flow file are committed as a single transaction."),
+ @WritesAttribute(attribute = "query.output.tables", description = "This attribute is written on the flow files routed to the 'success' "
+ + "and 'failure' relationships, and contains the target table name in 'databaseName.tableName' format.")
+})
+@RequiresInstanceClassLoading
+public class PutHive3Streaming extends AbstractProcessor {
+ // Attributes
+ // Same attribute name that the class-level @WritesAttributes annotation declares for the record count
+ public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
+
+ // Hive configuration key that setup() forces to true when more than one concurrent task is configured
+ private static final String CLIENT_CACHE_DISABLED_PROPERTY = "hcatalog.hive.client.cache.disabled";
+
+ // Properties
+ // Required controller service (a RecordReaderFactory) used to read records from incoming FlowFiles
+ static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+ .name("record-reader")
+ .displayName("Record Reader")
+ .description("The service for reading records from incoming flow files.")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .build();
+
+ // Required URI of the Hive Metastore (not HiveServer2); evaluated per FlowFile via Expression Language.
+ // The description previously quoted 9043 as the default port; Hive's default metastore port is 9083.
+ static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
+ .name("hive3-stream-metastore-uri")
+ .displayName("Hive Metastore URI")
+ .description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the "
+ + "Hive metastore is 9083.")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.URI_VALIDATOR)
+ .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // value must neither start nor end with '/'; empty is accepted
+ .build();
+
+ // Optional file, or comma-separated list of files, holding the Hive configuration; each entry is checked for existence
+ static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
+ .name("hive3-config-resources")
+ .displayName("Hive Configuration Resources")
+ .description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
+ + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
+ + "with Kerberos e.g., the appropriate properties must be set in the configuration files. Also note that if Max Concurrent Tasks is set "
+ + "to a number greater than one, the 'hcatalog.hive.client.cache.disabled' property will be forced to 'true' to avoid concurrency issues. "
+ + "Please see the Hive documentation for more details.")
+ .required(false)
+ .addValidator(HiveUtils.createMultipleFilesExistValidator())
+ .build();
+
+ // Required target database name; evaluated per FlowFile via Expression Language
+ static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
+ .name("hive3-stream-database-name")
+ .displayName("Database Name")
+ .description("The name of the database in which to put the data.")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ // Required target table name; evaluated per FlowFile via Expression Language
+ static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+ .name("hive3-stream-table-name")
+ .displayName("Table Name")
+ .description("The name of the database table in which to put the data.")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ // Optional comma-separated static partition values; onTrigger splits on ',' and trims each value
+ static final PropertyDescriptor PARTITION_VALUES = new PropertyDescriptor.Builder()
+ .name("hive3-stream-part-vals")
+ .displayName("Partition Values")
+ .description("Specifies a comma-separated list of the values for the partition columns of the target table. If the incoming records all have the same values "
+ + "for the partition columns, those values can be entered here, resulting in a performance gain. If specified, this property will often contain "
+ + "Expression Language, for example if PartitionRecord is upstream and two partitions 'name' and 'age' are used, then this property can be set to "
+ + "${name},${age}.")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ // Boolean flag (default "true") passed to HiveOptions.withAutoCreatePartitions in onTrigger
+ static final PropertyDescriptor AUTOCREATE_PARTITIONS = new PropertyDescriptor.Builder()
+ .name("hive3-stream-autocreate-partition")
+ .displayName("Auto-Create Partitions")
+ .description("Flag indicating whether partitions should be automatically created")
+ .required(true)
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .build();
+
+ // Timeout in seconds (default "0"); setup() multiplies the value by 1000 to store milliseconds
+ static final PropertyDescriptor CALL_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("hive3-stream-call-timeout")
+ .displayName("Call Timeout")
+ .description("The number of seconds allowed for a Hive Streaming operation to complete. A value of 0 indicates the processor should wait indefinitely on operations. "
+ + "Note that although this property supports Expression Language, it will not be evaluated against incoming FlowFile attributes.")
+ .defaultValue("0")
+ .required(true)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ // Boolean flag (default "false"); onTrigger passes the negated value to HiveOptions.withStreamingOptimizations
+ static final PropertyDescriptor DISABLE_STREAMING_OPTIMIZATIONS = new PropertyDescriptor.Builder()
+ .name("hive3-stream-disable-optimizations")
+ .displayName("Disable Streaming Optimizations")
+ .description("Whether to disable streaming optimizations. Disabling streaming optimizations will have significant impact to performance and memory consumption.")
+ .required(true)
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .build();
+
+
+ // Rollback-on-failure property built by the shared RollbackOnFailure helper with processor-specific extra text.
+ // NOTE(review): the text refers to 'Records per Transaction', which is not among the properties registered in init() - confirm wording
+ static final PropertyDescriptor ROLLBACK_ON_FAILURE = RollbackOnFailure.createRollbackOnFailureProperty(
+ "NOTE: When an error occurred after a Hive streaming transaction which is derived from the same input FlowFile is already committed," +
+ " (i.e. a FlowFile contains more records than 'Records per Transaction' and a failure occurred at the 2nd transaction or later)" +
+ " then the succeeded records will be transferred to 'success' relationship while the original input FlowFile stays in incoming queue." +
+ " Duplicated records can be created for the succeeded ones when the same FlowFile is processed again.");
+
+ // Optional controller service supplying the Kerberos principal and keytab used when Hive security is enabled
+ static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
+ .name("kerberos-credentials-service")
+ .displayName("Kerberos Credentials Service")
+ .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
+ .identifiesControllerService(KerberosCredentialsService.class)
+ .required(false)
+ .build();
+
+ // Relationships
+ // NOTE(review): the descriptions below mention Avro records, while input is read through the configured
+ // RecordReaderFactory - confirm whether the wording should be format-neutral
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("A FlowFile containing Avro records routed to this relationship after the record has been successfully transmitted to Hive.")
+ .build();
+
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("A FlowFile containing Avro records routed to this relationship if the record could not be transmitted to Hive.")
+ .build();
+
+ public static final Relationship REL_RETRY = new Relationship.Builder()
+ .name("retry")
+ .description("The incoming FlowFile is routed to this relationship if its records cannot be transmitted to Hive. Note that "
+ + "some records may have been processed successfully, they will be routed (as Avro flow files) to the success relationship. "
+ + "The combination of the retry, success, and failure relationships indicate how many records succeeded and/or failed. This "
+ + "can be used to provide a retry capability since full rollback is not possible.")
+ .build();
+
+ // Both are populated once in init() and exposed as unmodifiable collections
+ private List<PropertyDescriptor> propertyDescriptors;
+ private Set<Relationship> relationships;
+
+ protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
+ // Set by setup(): the authenticated user when Hive security is enabled, otherwise null
+ protected volatile UserGroupInformation ugi;
+ // Hive configuration loaded in setup() from the configured resources plus any dynamic properties
+ protected volatile HiveConf hiveConfig;
+
+ // Call timeout in milliseconds (the property value in seconds multiplied by 1000 in setup())
+ protected volatile int callTimeout;
+ // Single-thread pool created in setup(); NOTE(review): not volatile, unlike the neighbouring fields - confirm this is intended
+ protected ExecutorService callTimeoutPool;
+ protected volatile boolean rollbackOnFailure;
+
+ // Holder of cached Configuration information so validation does not reload the same config over and over
+ private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
+
+ /**
+ * Registers the supported property descriptors and relationships of this processor.
+ * Both collections are stored as unmodifiable views.
+ *
+ * @param context the initialization context (not used here)
+ */
+ @Override
+ protected void init(ProcessorInitializationContext context) {
+ // Order here is the order in which the properties are listed
+ propertyDescriptors = Collections.unmodifiableList(Arrays.asList(
+ RECORD_READER,
+ METASTORE_URI,
+ HIVE_CONFIGURATION_RESOURCES,
+ DB_NAME,
+ TABLE_NAME,
+ PARTITION_VALUES,
+ AUTOCREATE_PARTITIONS,
+ CALL_TIMEOUT,
+ DISABLE_STREAMING_OPTIMIZATIONS,
+ ROLLBACK_ON_FAILURE,
+ KERBEROS_CREDENTIALS_SERVICE));
+
+ relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_RETRY)));
+ }
+
+ /**
+ * @return the unmodifiable list of property descriptors built in init()
+ */
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+
+ /**
+ * @return the unmodifiable set of relationships (success, failure, retry) built in init()
+ */
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ /**
+ * Validates the Hive configuration resources, when they are set, together with the Kerberos principal
+ * and keytab taken from the optional Kerberos Credentials Service.
+ *
+ * @param validationContext the context supplying the property values to validate
+ * @return the problems reported by the Hive configurator; empty when no configuration resources are set
+ */
+ @Override
+ protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+ final boolean hasConfigResources = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).isSet();
+
+ // The credentials service is optional, so principal and keytab stay null when it is absent
+ final KerberosCredentialsService kerberosService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+ String principal = null;
+ String keytab = null;
+ if (kerberosService != null) {
+ principal = kerberosService.getPrincipal();
+ keytab = kerberosService.getKeytab();
+ }
+
+ final List<ValidationResult> results = new ArrayList<>();
+ if (!hasConfigResources) {
+ return results;
+ }
+
+ final String resources = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
+ results.addAll(hiveConfigurator.validate(resources, principal, keytab, validationResourceHolder, getLogger()));
+ return results;
+ }
+
+ /**
+ * Prepares the processor for execution: loads the Hive configuration, applies dynamic properties to it,
+ * authenticates with Kerberos when the configuration enables security, and creates the call-timeout
+ * executor.
+ *
+ * @param context the process context supplying property values
+ * @throws ProcessException if security is enabled but no Kerberos Credentials Service is configured,
+ * or if Kerberos authentication fails
+ */
+ @OnScheduled
+ public void setup(final ProcessContext context) {
+ ComponentLog log = getLogger();
+ rollbackOnFailure = context.getProperty(ROLLBACK_ON_FAILURE).asBoolean();
+
+ final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
+ hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
+
+ // If more than one concurrent task, force 'hcatalog.hive.client.cache.disabled' to true
+ if (context.getMaxConcurrentTasks() > 1) {
+ hiveConfig.setBoolean(CLIENT_CACHE_DISABLED_PROPERTY, true);
+ }
+
+ // add any dynamic properties to the Hive configuration
+ for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+ final PropertyDescriptor descriptor = entry.getKey();
+ if (descriptor.isDynamic()) {
+ hiveConfig.set(descriptor.getName(), entry.getValue());
+ }
+ }
+
+ hiveConfigurator.preload(hiveConfig);
+
+ if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
+ final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+ // The credentials service property is optional, so fail with a clear message instead of a NullPointerException
+ if (credentialsService == null) {
+ throw new ProcessException("Hive security is enabled but no Kerberos Credentials Service is configured");
+ }
+
+ final String resolvedPrincipal = credentialsService.getPrincipal();
+ final String resolvedKeytab = credentialsService.getKeytab();
+
+ log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
+ try {
+ ugi = hiveConfigurator.authenticate(hiveConfig, resolvedPrincipal, resolvedKeytab);
+ } catch (AuthenticationFailedException ae) {
+ throw new ProcessException("Kerberos authentication failed for Hive Streaming", ae);
+ }
+
+ log.info("Successfully logged in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
+ } else {
+ ugi = null;
+ }
+
+ callTimeout = context.getProperty(CALL_TIMEOUT).evaluateAttributeExpressions().asInteger() * 1000; // milliseconds
+ String timeoutName = "put-hive3-streaming-%d";
+ // Single worker thread, named after the pattern above
+ this.callTimeoutPool = Executors.newFixedThreadPool(1,
+ new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
+ }
+
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+ final String dbName = context.getProperty(DB_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+ final ComponentLog log = getLogger();
+ final String metastoreUri = context.getProperty(METASTORE_URI).evaluateAttributeExpressions(flowFile).getValue();
+
+ final String partitionValuesString = context.getProperty(PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue();
+ final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
+ final boolean disableStreamingOptimizations = context.getProperty(DISABLE_STREAMING_OPTIMIZATIONS).asBoolean();
+
+ HiveOptions o = new HiveOptions(metastoreUri, dbName, tableName)
+ .withHiveConf(hiveConfig)
+ .withAutoCreatePartitions(autoCreatePartitions)
+ .withCallTimeout(callTimeout)
+ .withStreamingOptimizations(!disableStreamingOptimizations);
+
+ if (!StringUtils.isEmpty(partitionValuesString)) {
+ List<String> staticPartitionValues = Arrays.stream(partitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
+ o = o.withStaticPartitionValues(staticPartitionValues);
+ }
+
+ if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
+ final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+ o = o.withKerberosPrincipal(credentialsService.getPrincipal()).withKerberosKeytab(credentialsService.getKeytab());
+ }
+
+ final HiveOptions options = o;
+
+ // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
+ ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+
+ StreamingConnection hiveStreamingConnection = null;
+
+ try (final InputStream rawIn = session.read(flowFile)) {
+ final RecordReader reader;
+
+ try (final BufferedInputStream in = new BufferedInputStream(rawIn)) {
+
+ // if we fail to create the RecordReader then we want to route to failure, so we need to
+ // handle this separately from the other IOExceptions which normally route to retry
+ try {
+ reader = recordReaderFactory.createRecordReader(flowFile, in, getLogger());
+ } catch (Exception e) {
+ throw new RecordReaderFactoryException("Unable to create RecordReader", e);
+ }
+
+ hiveStreamingConnection = makeStreamingConnection(options, reader);
+ // Add shutdown handler with higher priority than FileSystem shutdown hook so that streaming connection gets closed first before
+ // filesystem close (to avoid ClosedChannelException)
+ ShutdownHookManager.addShutdownHook(hiveStreamingConnection::close, FileSystem.SHUTDOWN_HOOK_PRIORITY + 1);
+
+ // Write records to Hive streaming, then commit and close
+ hiveStreamingConnection.beginTransaction();
+ hiveStreamingConnection.write(in);
+ hiveStreamingConnection.commitTransaction();
+ rawIn.close();
+
+ Map<String, String> updateAttributes = new HashMap<>();
+ updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()));
+ updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
+ flowFile = session.putAllAttributes(flowFile, updateAttributes);
+ session.getProvenanceReporter().send(flowFile, hiveStreamingConnection.getMetastoreUri());
+ session.transfer(flowFile, REL_SUCCESS);
+ } catch (TransactionError te) {
+ if (rollbackOnFailure) {
+ throw new ProcessException(te.getLocalizedMessage(), te);
+ } else {
+ throw new ShouldRetryException(te.getLocalizedMessage(), te);
+ }
+ } catch (RecordReaderFactoryException rrfe) {
+ throw new ProcessException(rrfe);
+ }
+ } catch (InvalidTable | SerializationError | StreamingIOFailure | IOException e) {
+ if (rollbackOnFailure) {
+ if (hiveStreamingConnection != null) {
+ abortConnection(hiveStreamingConnection);
+ }
+ throw new ProcessException(e.getLocalizedMessage(), e);
+ } else {
+ Map<String, String> updateAttributes = new HashMap<>();
+ updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()));
+ updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
+ flowFile = session.putAllAttributes(flowFile, updateAttributes);
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ } catch (DiscontinuedException e) {
+ // The input FlowFile processing is discontinued. Keep it in the input queue.
+ getLogger().warn("Discontinued processing for {} due to {}", new Object[]{flowFile, e}, e);
+ session.transfer(flowFile, Relationship.SELF);
+ } catch (ConnectionError ce) {
+ // If we can't connect to the metastore, yield the processor
+ context.yield();
+ throw new ProcessException("A connection to metastore cannot be established", ce);
+ } catch (ShouldRetryException e) {
+ // This exception is already a result of adjusting an error, so simply transfer the FlowFile to retry. Still need to abort the txn
+ getLogger().error(e.getLocalizedMessage(), e);
+ if (hiveStreamingConnection != null) {
+ abortConnection(hiveStreamingConnection);
+ }
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_RETRY);
+ } catch (StreamingException se) {
+ // Handle all other exceptions. These are often record-based exceptions (since Hive will throw a subclass of the exception caught above)
+ Throwable cause = se.getCause();
+ if (cause == null) cause = se;
+ // This is a failure on the incoming data, rollback on failure if specified; otherwise route to failure after penalizing (and abort txn in any case)
+ if (rollbackOnFailure) {
+ if (hiveStreamingConnection != null) {
+ abortConnection(hiveStreamingConnection);
+ }
+ throw new ProcessException(cause.getLocalizedMessage(), cause);
+ } else {
+ flowFile = session.penalize(flowFile);
+ Map<String, String> updateAttributes = new HashMap<>();
+ updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()));
+ updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
+ flowFile = session.putAllAttributes(flowFile, updateAttributes);
+ session.transfer(flowFile, REL_FAILURE);
+ }
+
+ } catch (Throwable t) {
+ if (hiveStreamingConnection != null) {
+ abortConnection(hiveStreamingConnection);
+ }
+ throw (t instanceof ProcessException) ? (ProcessException) t : new ProcessException(t);
+ } finally {
+ closeConnection(hiveStreamingConnection);
+ // Restore original class loader, might not be necessary but is good practice since the processor task changed it
+ Thread.currentThread().setContextClassLoader(originalClassloader);
+ }
+ }
+
+ StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader) throws StreamingException {
+ return HiveStreamingConnection.newBuilder()
+ .withDatabase(options.getDatabaseName())
+ .withTable(options.getTableName())
+ .withStaticPartitionValues(options.getStaticPartitionValues())
+ .withHiveConf(options.getHiveConf())
+ .withRecordWriter(new HiveRecordWriter(reader, getLogger()))
+ .withAgentInfo("NiFi " + this.getClass().getSimpleName() + " [" + this.getIdentifier()
+ + "] thread " + Thread.currentThread().getId() + "[" + Thread.currentThread().getName() + "]")
+ .connect();
+ }
+
+ /**
+  * Releases the processor's resources when it is stopped: clears the cached validation resources, shuts down the
+  * call-timeout thread pool (waiting for it to terminate) and drops the cached user group information.
+  * <p>
+  * If the waiting thread is interrupted, the wait is abandoned, the interruption is logged and the thread's
+  * interrupt status is restored so that the caller can observe it.
+  */
+ @OnStopped
+ public void cleanup() {
+     validationResourceHolder.set(null); // trigger re-validation of resources
+
+     ComponentLog log = getLogger();
+
+     if (callTimeoutPool != null) {
+         callTimeoutPool.shutdown();
+         try {
+             while (!callTimeoutPool.isTerminated()) {
+                 callTimeoutPool.awaitTermination(callTimeout, TimeUnit.MILLISECONDS);
+             }
+         } catch (InterruptedException ie) {
+             // Catching InterruptedException clears the interrupt flag; set it again instead of swallowing it
+             Thread.currentThread().interrupt();
+             log.warn("shutdown interrupted on " + callTimeoutPool, ie);
+         } catch (Throwable t) {
+             // Any other failure while waiting is deliberately non-fatal: stopping the processor must complete
+             log.warn("shutdown interrupted on " + callTimeoutPool, t);
+         }
+         callTimeoutPool = null;
+     }
+
+     ugi = null;
+ }
+
+ /**
+  * Aborts the current transaction on the given connection and then closes the connection.
+  * Any exception raised along the way is logged as a warning instead of being propagated.
+  *
+  * @param connection the streaming connection to abort and close
+  */
+ private void abortAndCloseConnection(StreamingConnection connection) {
+     try {
+         // Abort first so the open transaction is abandoned before the connection goes away
+         abortConnection(connection);
+         closeConnection(connection);
+     } catch (Exception failure) {
+         getLogger().warn("unable to close hive connections. ", failure);
+     }
+ }
+
+ /**
+  * Aborts the transaction currently open on the connection.
+  * A null connection is ignored, and a failure to abort is logged as an error rather than thrown.
+  *
+  * @param connection the streaming connection whose transaction should be aborted; may be null
+  */
+ private void abortConnection(StreamingConnection connection) {
+     if (connection == null) {
+         return;
+     }
+
+     try {
+         connection.abortTransaction();
+     } catch (Exception abortFailure) {
+         getLogger().error("Failed to abort Hive Streaming transaction " + connection + " due to exception ", abortFailure);
+     }
+ }
+
+ /**
+  * Closes the streaming connection.
+  * A null connection is ignored, and a failure to close is logged as an error rather than thrown.
+  *
+  * @param connection the streaming connection to close; may be null
+  */
+ private void closeConnection(StreamingConnection connection) {
+     if (connection == null) {
+         return;
+     }
+
+     try {
+         connection.close();
+     } catch (Exception closeFailure) {
+         getLogger().error("Failed to close Hive Streaming connection " + connection + " due to exception ", closeFailure);
+     }
+ }
+
+ /**
+  * Internal marker exception: signals that the FlowFile being processed should be routed to the retry
+  * relationship. It wraps the error that triggered the retry decision.
+  */
+ private static class ShouldRetryException extends RuntimeException {
+     /**
+      * @param detail    the message describing why a retry is needed
+      * @param rootCause the underlying error
+      */
+     private ShouldRetryException(String detail, Throwable rootCause) {
+         super(detail, rootCause);
+     }
+ }
+}
+