Posted to issues@nifi.apache.org by mattyb149 <gi...@git.apache.org> on 2016/07/27 03:29:35 UTC

[GitHub] nifi pull request #727: NIFI-1663: Add ConvertAvroToORC processor

GitHub user mattyb149 opened a pull request:

    https://github.com/apache/nifi/pull/727

    NIFI-1663: Add ConvertAvroToORC processor

    This PR is based on #706, which removed the ConvertAvroToORC processor that used Hive 2.x and Apache ORC 1.x. This PR replaces that processor with one that uses Hive 1.2.1 (which still bundles the ORC code from before it was split out into its own Apache project).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mattyb149/nifi old_orc

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/727.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #727
    
----
commit fa04188470e96d979819b7bf71723cad25c25105
Author: Matt Burgess <ma...@apache.org>
Date:   2016-07-21T15:59:41Z

    NIFI-1868: Add PutHiveStreaming processor

commit 108fb52113e57c424516fbfdcfc6b15edc34f12b
Author: Matt Burgess <ma...@apache.org>
Date:   2016-07-22T15:14:16Z

    NIFI-1868: Downgrade to Hive 1.2.1 and remove ConvertAvroToORC

commit 91de9edcbc85f90f104c24f1f18caf4f4f0283d9
Author: Matt Burgess <ma...@apache.org>
Date:   2016-07-27T03:25:11Z

    NIFI-1663: Add ConvertAvroToORC processor

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #727: NIFI-1663: Add ConvertAvroToORC processor

Posted by mattyb149 <gi...@git.apache.org>.
GitHub user mattyb149 reopened a pull request:

    https://github.com/apache/nifi/pull/727

    NIFI-1663: Add ConvertAvroToORC processor

    This PR is based on #706, which removed the ConvertAvroToORC processor that used Hive 2.x and Apache ORC 1.x. This PR replaces that processor with one that uses Hive 1.2.1 (which still bundles the ORC code from before it was split out into its own Apache project).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mattyb149/nifi old_orc

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/727.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #727
    
----
commit c550fdb1f3a6a140647eb85de273e6d20fc9bb4b
Author: Matt Burgess <ma...@apache.org>
Date:   2016-07-27T03:25:11Z

    NIFI-1663: Add ConvertAvroToORC processor

commit db7118780fe5cd16cfe919bfce171a453089ee35
Author: Matt Burgess <ma...@apache.org>
Date:   2016-08-04T13:59:21Z

    NIFI-1663: Updated NifiOrcUtils with review comments

commit 242e4dcc367c9f1b9f072dda965058c1646f25bf
Author: Matt Burgess <ma...@apache.org>
Date:   2016-08-05T21:13:00Z

    NIFI-1663: Added support to ConvertAvroToORC for nested records, added unit tests

----



[GitHub] nifi pull request #727: NIFI-1663: Add ConvertAvroToORC processor

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/727#discussion_r73331884
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java ---
    @@ -0,0 +1,466 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hadoop.hive.ql.io.orc;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericData;
    +import org.apache.avro.util.Utf8;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    +import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
    +import org.apache.hadoop.io.BooleanWritable;
    +import org.apache.hadoop.io.BytesWritable;
    +import org.apache.hadoop.io.DoubleWritable;
    +import org.apache.hadoop.io.FloatWritable;
    +import org.apache.hadoop.io.IntWritable;
    +import org.apache.hadoop.io.LongWritable;
    +import org.apache.hadoop.io.MapWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.io.Writable;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_PADDING;
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_SIZE;
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE;
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT;
    +
    +/**
    + * Utility methods for ORC support (e.g. conversion from Avro, conversion to Hive types).
    + */
    +public class NiFiOrcUtils {
    +
    +    public static Object convertToORCObject(TypeInfo typeInfo, Object o) {
    +        if (o != null) {
    +            if (typeInfo instanceof UnionTypeInfo) {
    +                OrcUnion union = new OrcUnion();
    +                // Need to find which of the union types correspond to the primitive object
    +                TypeInfo objectTypeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(
    +                        ObjectInspectorFactory.getReflectionObjectInspector(o.getClass(), ObjectInspectorFactory.ObjectInspectorOptions.JAVA));
    +                List<TypeInfo> unionTypeInfos = ((UnionTypeInfo) typeInfo).getAllUnionObjectTypeInfos();
    +
    +                int index = 0;
    +                while (index < unionTypeInfos.size() && !unionTypeInfos.get(index).equals(objectTypeInfo)) {
    +                    index++;
    +                }
    +                if (index < unionTypeInfos.size()) {
    +                    union.set((byte) index, convertToORCObject(objectTypeInfo, o));
    +                } else {
    +                    throw new IllegalArgumentException("Object Type for class " + o.getClass().getName() + " not in Union declaration");
    +                }
    +                return union;
    +            }
    +            if (o instanceof Integer) {
    +                return new IntWritable((int) o);
    +            }
    +            if (o instanceof Boolean) {
    +                return new BooleanWritable((boolean) o);
    +            }
    +            if (o instanceof Long) {
    +                return new LongWritable((long) o);
    +            }
    +            if (o instanceof Float) {
    +                return new FloatWritable((float) o);
    +            }
    +            if (o instanceof Double) {
    +                return new DoubleWritable((double) o);
    +            }
    +            if (o instanceof String || o instanceof Utf8 || o instanceof GenericData.EnumSymbol) {
    +                return new Text(o.toString());
    +            }
    +            if (o instanceof ByteBuffer) {
    +                return new BytesWritable(((ByteBuffer) o).array());
    +            }
    +            if (o instanceof int[]) {
    +                int[] intArray = (int[]) o;
    +                return Arrays.stream(intArray)
    +                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("int"), element))
    +                        .collect(Collectors.toList());
    +            }
    +            if (o instanceof long[]) {
    +                long[] longArray = (long[]) o;
    +                return Arrays.stream(longArray)
    +                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("bigint"), element))
    +                        .collect(Collectors.toList());
    +            }
    +            if (o instanceof float[]) {
    +                float[] floatArray = (float[]) o;
    +                return IntStream.range(0, floatArray.length)
    +                        .mapToDouble(i -> floatArray[i])
    +                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("float"), (float) element))
    +                        .collect(Collectors.toList());
    +            }
    +            if (o instanceof double[]) {
    +                double[] doubleArray = (double[]) o;
    +                return Arrays.stream(doubleArray)
    +                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("double"), element))
    +                        .collect(Collectors.toList());
    +            }
    +            if (o instanceof boolean[]) {
    +                boolean[] booleanArray = (boolean[]) o;
    +                return IntStream.range(0, booleanArray.length)
    +                        .map(i -> booleanArray[i] ? 1 : 0)
    +                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("boolean"), element == 1))
    +                        .collect(Collectors.toList());
    +            }
    +            if (o instanceof GenericData.Array) {
    +                GenericData.Array array = ((GenericData.Array) o);
    +                // The type information in this case is interpreted as a List
    +                TypeInfo listTypeInfo = ((ListTypeInfo) typeInfo).getListElementTypeInfo();
    +                return array.stream().map((element) -> convertToORCObject(listTypeInfo, element)).collect(Collectors.toList());
    +            }
    +            if (o instanceof List) {
    +                return o;
    +            }
    +            if (o instanceof Map) {
    +                MapWritable mapWritable = new MapWritable();
    +                TypeInfo keyInfo = ((MapTypeInfo) typeInfo).getMapKeyTypeInfo();
    +                TypeInfo valueInfo = ((MapTypeInfo) typeInfo).getMapValueTypeInfo();
    +                // Unions are not allowed as key/value types, so if we convert the key and value objects,
    +                // they should return Writable objects
    +                ((Map) o).forEach((key, value) -> {
    +                    Object keyObject = convertToORCObject(keyInfo, key);
    +                    Object valueObject = convertToORCObject(valueInfo, value);
    +                    if (keyObject == null
    +                            || !(keyObject instanceof Writable)
    +                            || !(valueObject instanceof Writable)
    +                            ) {
    +                        throw new IllegalArgumentException("Maps may only contain Writable types, and the key cannot be null");
    +                    }
    +                    mapWritable.put((Writable) keyObject, (Writable) valueObject);
    +                });
    +                return mapWritable;
    +            }
    +
    +        }
    +        return null;
    --- End diff --
    
    I'll take a look; it probably should be an IllegalArgumentException or something.
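
    A minimal sketch of that change, assuming convertToORCObject keeps its current signature (the exception message is only illustrative):

        // Hypothetical fallthrough for convertToORCObject: fail fast instead of silently
        // returning null when the incoming object matches none of the handled types.
        if (o != null) {
            throw new IllegalArgumentException("Error converting object of type "
                    + o.getClass().getName() + " to ORC type " + typeInfo.getTypeName());
        }
        return null; // a null input still maps to a null ORC value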



[GitHub] nifi issue #727: NIFI-1663: Add ConvertAvroToORC processor

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on the issue:

    https://github.com/apache/nifi/pull/727
  
    Closing the PR and cancelling the current patch, as there are issues with complex types; will reopen once fixed



[GitHub] nifi pull request #727: NIFI-1663: Add ConvertAvroToORC processor

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/727#discussion_r73255072
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java ---
    @@ -0,0 +1,657 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hive;
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder;
    +import org.apache.avro.Schema;
    +import org.apache.avro.file.DataFileStream;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hive.hcatalog.streaming.ConnectionError;
    +import org.apache.hive.hcatalog.streaming.HiveEndPoint;
    +import org.apache.hive.hcatalog.streaming.SerializationError;
    +import org.apache.hive.hcatalog.streaming.StreamingException;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.hadoop.KerberosProperties;
    +import org.apache.nifi.hadoop.SecurityUtil;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.util.NiFiProperties;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.nifi.util.hive.AuthenticationFailedException;
    +import org.apache.nifi.util.hive.HiveConfigurator;
    +import org.apache.nifi.util.hive.HiveOptions;
    +import org.apache.nifi.util.hive.HiveUtils;
    +import org.apache.nifi.util.hive.HiveWriter;
    +import org.json.JSONException;
    +import org.json.JSONObject;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.regex.Pattern;
    +
    +/**
    + * This processor utilizes the Hive Streaming capability to insert data from the flow into a Hive database table.
    + */
    +@Tags({"hive", "streaming", "put", "database", "store"})
    +@CapabilityDescription("This processor uses Hive Streaming to send flow file data to an Apache Hive table. The incoming flow file is expected to be in "
    +        + "Avro format and the table must exist in Hive. Please see the Hive documentation for requirements on the Hive table (format, partitions, etc.). "
    +        + "The partition values are extracted from the Avro record based on the names of the partition columns as specified in the processor. ")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "hivestreaming.record.count", description = "The number of records from this flow file written using Hive Streaming.")
    +})
    +public class PutHiveStreaming extends AbstractProcessor {
    +
    +    // Attributes
    +    public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
    +
    +    // Validators
    +    private static final Validator GREATER_THAN_ONE_VALIDATOR = (subject, value, context) -> {
    +        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
    +            return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
    +        }
    +
    +        String reason = null;
    +        try {
    +            final int intVal = Integer.parseInt(value);
    +
    +            if (intVal < 2) {
    +                reason = "value is less than 2";
    +            }
    +        } catch (final NumberFormatException e) {
    +            reason = "value is not a valid integer";
    +        }
    +
    +        return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
    +    };
    +
    +    // Properties
    +    public static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
    +            .name("hive-stream-metastore-uri")
    +            .displayName("Hive Metastore URI")
    +            .description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the "
    +                    + "Hive metastore is 9083.")
    +            .required(true)
    +            .addValidator(StandardValidators.URI_VALIDATOR)
    +            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
    +            .build();
    +
    +    public static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
    +            .name("hive-config-resources")
    +            .displayName("Hive Configuration Resources")
    +            .description("A file or comma-separated list of files containing the Hive configuration (e.g. hive-site.xml). Without this, Hadoop "
    +                    + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
    +                    + "with Kerberos, for example, the appropriate properties must be set in the configuration files. Please see the Hive documentation for more details.")
    +            .required(false)
    +            .addValidator(HiveUtils.createMultipleFilesExistValidator())
    +            .build();
    +
    +    public static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
    +            .name("hive-stream-database-name")
    +            .displayName("Database Name")
    +            .description("The name of the database in which to put the data.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .name("hive-stream-table-name")
    +            .displayName("Table Name")
    +            .description("The name of the database table in which to put the data.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PARTITION_COLUMNS = new PropertyDescriptor.Builder()
    +            .name("hive-stream-partition-cols")
    +            .displayName("Partition Columns")
    +            .description("A comma-delimited list of column names on which the table has been partitioned. The order of values in this list must "
    +                    + "correspond exactly to the order of partition columns specified during the table creation.")
    +            .required(false)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("[^,]+(,[^,]+)*"))) // comma-separated list with non-empty entries
    +            .build();
    +
    +    public static final PropertyDescriptor AUTOCREATE_PARTITIONS = new PropertyDescriptor.Builder()
    +            .name("hive-stream-autocreate-partition")
    +            .displayName("Auto-Create Partitions")
    +            .description("Flag indicating whether partitions should be automatically created")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_OPEN_CONNECTIONS = new PropertyDescriptor.Builder()
    +            .name("hive-stream-max-open-connections")
    +            .displayName("Max Open Connections")
    +            .description("The maximum number of open connections that can be allocated from this pool at the same time, "
    +                    + "or negative for no limit.")
    +            .defaultValue("8")
    +            .required(true)
    +            .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +            .sensitive(false)
    +            .build();
    +
    +    public static final PropertyDescriptor HEARTBEAT_INTERVAL = new PropertyDescriptor.Builder()
    +            .name("hive-stream-heartbeat-interval")
    +            .displayName("Heartbeat Interval")
    +            .description("Indicates that a heartbeat should be sent when the specified number of seconds has elapsed. "
    +                    + "A value of 0 indicates that no heartbeat should be sent.")
    +            .defaultValue("60")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .sensitive(false)
    +            .build();
    +
    +    public static final PropertyDescriptor TXNS_PER_BATCH = new PropertyDescriptor.Builder()
    +            .name("hive-stream-transactions-per-batch")
    +            .displayName("Transactions per Batch")
    +            .description("A hint to Hive Streaming indicating how many transactions the processor task will need. This value must be greater than 1.")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(GREATER_THAN_ONE_VALIDATOR)
    +            .defaultValue("100")
    +            .build();
    +
    +    // Relationships
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("A FlowFile is routed to this relationship after the database is successfully updated")
    +            .build();
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    --- End diff --
    
    Hmm, I wonder if this is really needed especially when REL_FAILURE could be looped back to the processor. Am I missing something?
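
    For context, the split that a separate retry relationship enables is roughly the following. This is only a sketch: writeRecordsToHive is a hypothetical helper, and the mapping of exception types to relationships is illustrative rather than this PR's actual code.

        try {
            writeRecordsToHive(flowFile);                        // hypothetical helper
            session.transfer(flowFile, REL_SUCCESS);
        } catch (SerializationError se) {
            // the records themselves are bad; retrying the same FlowFile will not help
            session.transfer(flowFile, REL_FAILURE);
        } catch (StreamingException e) {
            // connection/transaction trouble is usually transient, so penalize and retry
            session.transfer(session.penalize(flowFile), REL_RETRY);
        }

    Looping failure back into the processor works too; the separate relationship just lets a flow treat "will never succeed" and "might succeed later" differently.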



[GitHub] nifi pull request #727: NIFI-1663: Add ConvertAvroToORC processor

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/727#discussion_r73254533
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java ---
    @@ -160,22 +162,9 @@
     
             if (confFileProvided) {
                 final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
    -            ValidationResources resources = validationResourceHolder.get();
    -
    -            // if no resources in the holder, or if the holder has different resources loaded,
    -            // then load the Configuration and set the new resources in the holder
    -            if (resources == null || !configFiles.equals(resources.getConfigResources())) {
    -                getLogger().debug("Reloading validation resources");
    -                resources = new ValidationResources(configFiles, HiveJdbcCommon.getConfigurationFromFiles(configFiles));
    -                validationResourceHolder.set(resources);
    -            }
    -
    -            final Configuration hiveConfig = resources.getConfiguration();
                 final String principal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
    -            final String keytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
    -
    -            problems.addAll(KerberosProperties.validatePrincipalAndKeytab(
    -                    this.getClass().getSimpleName(), hiveConfig, principal, keytab, getLogger()));
    +            final String keyTab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
    --- End diff --
    
    Can this be null?
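
    For reference, getValue() can return null when an optional property is unset; a minimal sketch of one defensive shape (validateKeytab is a hypothetical helper, not part of this PR):

        final String keyTab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
        if (keyTab != null && !keyTab.trim().isEmpty()) {
            // only run keytab-specific checks when a keytab was actually supplied
            problems.addAll(validateKeytab(keyTab));
        }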



[GitHub] nifi pull request #727: NIFI-1663: Add ConvertAvroToORC processor

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/727#discussion_r73331820
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java ---
    @@ -0,0 +1,466 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hadoop.hive.ql.io.orc;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericData;
    +import org.apache.avro.util.Utf8;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    +import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
    +import org.apache.hadoop.io.BooleanWritable;
    +import org.apache.hadoop.io.BytesWritable;
    +import org.apache.hadoop.io.DoubleWritable;
    +import org.apache.hadoop.io.FloatWritable;
    +import org.apache.hadoop.io.IntWritable;
    +import org.apache.hadoop.io.LongWritable;
    +import org.apache.hadoop.io.MapWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.io.Writable;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_PADDING;
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_SIZE;
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE;
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT;
    +
    +/**
    + * Utility methods for ORC support (e.g. conversion from Avro, conversion to Hive types).
    + */
    +public class NiFiOrcUtils {
    --- End diff --
    
    No, I will reduce the visibility. For util classes I usually start them as public, but since it's unlikely this NAR will be the parent/dependency of another, public is not needed



[GitHub] nifi pull request #727: NIFI-1663: Add ConvertAvroToORC processor

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/727#discussion_r73255530
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java ---
    @@ -0,0 +1,657 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hive;
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder;
    +import org.apache.avro.Schema;
    +import org.apache.avro.file.DataFileStream;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hive.hcatalog.streaming.ConnectionError;
    +import org.apache.hive.hcatalog.streaming.HiveEndPoint;
    +import org.apache.hive.hcatalog.streaming.SerializationError;
    +import org.apache.hive.hcatalog.streaming.StreamingException;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.hadoop.KerberosProperties;
    +import org.apache.nifi.hadoop.SecurityUtil;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.util.NiFiProperties;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.nifi.util.hive.AuthenticationFailedException;
    +import org.apache.nifi.util.hive.HiveConfigurator;
    +import org.apache.nifi.util.hive.HiveOptions;
    +import org.apache.nifi.util.hive.HiveUtils;
    +import org.apache.nifi.util.hive.HiveWriter;
    +import org.json.JSONException;
    +import org.json.JSONObject;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.regex.Pattern;
    +
    +/**
    + * This processor utilizes the Hive Streaming capability to insert data from the flow into a Hive database table.
    + */
    +@Tags({"hive", "streaming", "put", "database", "store"})
    +@CapabilityDescription("This processor uses Hive Streaming to send flow file data to an Apache Hive table. The incoming flow file is expected to be in "
    +        + "Avro format and the table must exist in Hive. Please see the Hive documentation for requirements on the Hive table (format, partitions, etc.). "
    +        + "The partition values are extracted from the Avro record based on the names of the partition columns as specified in the processor. ")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "hivestreaming.record.count", description = "The number of records from this flow file written using Hive Streaming.")
    +})
    +public class PutHiveStreaming extends AbstractProcessor {
    +
    +    // Attributes
    +    public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
    +
    +    // Validators
    +    private static final Validator GREATER_THAN_ONE_VALIDATOR = (subject, value, context) -> {
    +        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
    +            return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
    +        }
    +
    +        String reason = null;
    +        try {
    +            final int intVal = Integer.parseInt(value);
    +
    +            if (intVal < 2) {
    +                reason = "value is less than 2";
    +            }
    +        } catch (final NumberFormatException e) {
    +            reason = "value is not a valid integer";
    +        }
    +
    +        return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
    +    };
    +
    +    // Properties
    +    public static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
    +            .name("hive-stream-metastore-uri")
    +            .displayName("Hive Metastore URI")
    +            .description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the "
    +                    + "Hive metastore is 9083.")
    +            .required(true)
    +            .addValidator(StandardValidators.URI_VALIDATOR)
    +            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
    +            .build();
    +
    +    public static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
    +            .name("hive-config-resources")
    +            .displayName("Hive Configuration Resources")
    +            .description("A file or comma-separated list of files containing the Hive configuration (e.g. hive-site.xml). Without this, Hadoop "
    +                    + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
    +                    + "with Kerberos, for example, the appropriate properties must be set in the configuration files. Please see the Hive documentation for more details.")
    +            .required(false)
    +            .addValidator(HiveUtils.createMultipleFilesExistValidator())
    +            .build();
    +
    +    public static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
    +            .name("hive-stream-database-name")
    +            .displayName("Database Name")
    +            .description("The name of the database in which to put the data.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .name("hive-stream-table-name")
    +            .displayName("Table Name")
    +            .description("The name of the database table in which to put the data.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PARTITION_COLUMNS = new PropertyDescriptor.Builder()
    +            .name("hive-stream-partition-cols")
    +            .displayName("Partition Columns")
    +            .description("A comma-delimited list of column names on which the table has been partitioned. The order of values in this list must "
    +                    + "correspond exactly to the order of partition columns specified during the table creation.")
    +            .required(false)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("[^,]+(,[^,]+)*"))) // comma-separated list with non-empty entries
    +            .build();
    +
    +    public static final PropertyDescriptor AUTOCREATE_PARTITIONS = new PropertyDescriptor.Builder()
    +            .name("hive-stream-autocreate-partition")
    +            .displayName("Auto-Create Partitions")
    +            .description("Flag indicating whether partitions should be automatically created")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_OPEN_CONNECTIONS = new PropertyDescriptor.Builder()
    +            .name("hive-stream-max-open-connections")
    +            .displayName("Max Open Connections")
    +            .description("The maximum number of open connections that can be allocated from this pool at the same time, "
    +                    + "or negative for no limit.")
    +            .defaultValue("8")
    +            .required(true)
    +            .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +            .sensitive(false)
    +            .build();
    +
    +    public static final PropertyDescriptor HEARTBEAT_INTERVAL = new PropertyDescriptor.Builder()
    +            .name("hive-stream-heartbeat-interval")
    +            .displayName("Heartbeat Interval")
    +            .description("Indicates that a heartbeat should be sent when the specified number of seconds has elapsed. "
    +                    + "A value of 0 indicates that no heartbeat should be sent.")
    +            .defaultValue("60")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .sensitive(false)
    +            .build();
    +
    +    public static final PropertyDescriptor TXNS_PER_BATCH = new PropertyDescriptor.Builder()
    +            .name("hive-stream-transactions-per-batch")
    +            .displayName("Transactions per Batch")
    +            .description("A hint to Hive Streaming indicating how many transactions the processor task will need. This value must be greater than 1.")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(GREATER_THAN_ONE_VALIDATOR)
    +            .defaultValue("100")
    +            .build();
    +
    +    // Relationships
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("A FlowFile is routed to this relationship after the database is successfully updated")
    +            .build();
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
    +            .build();
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail.")
    +            .build();
    +
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +    private final static Set<Relationship> relationships;
    +
    +    private static final long TICKET_RENEWAL_PERIOD = 60000;
    +
    +    protected KerberosProperties kerberosProperties;
    +
    +    protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
    +    protected volatile UserGroupInformation ugi;
    +
    +    protected final AtomicBoolean isInitialized = new AtomicBoolean(false);
    +
    +    protected HiveOptions options;
    +    protected ExecutorService callTimeoutPool;
    +    protected transient Timer heartBeatTimer;
    +    protected AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
    +    protected Map<HiveEndPoint, HiveWriter> allWriters;
    +
    +
    +    /*
    +     * Will ensure that the list of property descriptors is built only once.
    +     * Will also create a Set of relationships.
    +     */
    +    static {
    +        propertyDescriptors = new ArrayList<>();
    +        propertyDescriptors.add(METASTORE_URI);
    +        propertyDescriptors.add(HIVE_CONFIGURATION_RESOURCES);
    +        propertyDescriptors.add(DB_NAME);
    +        propertyDescriptors.add(TABLE_NAME);
    +        propertyDescriptors.add(PARTITION_COLUMNS);
    +        propertyDescriptors.add(AUTOCREATE_PARTITIONS);
    +        propertyDescriptors.add(MAX_OPEN_CONNECTIONS);
    +        propertyDescriptors.add(HEARTBEAT_INTERVAL);
    +        propertyDescriptors.add(TXNS_PER_BATCH);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        _relationships.add(REL_RETRY);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    protected void init(ProcessorInitializationContext context) {
    +        kerberosProperties = getKerberosProperties();
    +        propertyDescriptors.add(kerberosProperties.getKerberosPrincipal());
    +        propertyDescriptors.add(kerberosProperties.getKerberosKeytab());
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +
    +    @OnScheduled
    +    public void setup(final ProcessContext context) {
    +        ComponentLog log = getLogger();
    +
    +        final String metastoreUri = context.getProperty(METASTORE_URI).getValue();
    +        final String dbName = context.getProperty(DB_NAME).getValue();
    +        final String tableName = context.getProperty(TABLE_NAME).getValue();
    +        final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
    +        final Integer maxConnections = context.getProperty(MAX_OPEN_CONNECTIONS).asInteger();
    +        final Integer heartbeatInterval = context.getProperty(HEARTBEAT_INTERVAL).asInteger();
    +        final Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).asInteger();
    +        final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
    +        final Configuration hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
    +
    +        // add any dynamic properties to the Hive configuration
    +        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
    +            final PropertyDescriptor descriptor = entry.getKey();
    +            if (descriptor.isDynamic()) {
    +                hiveConfig.set(descriptor.getName(), entry.getValue());
    +            }
    +        }
    +
    +        options = new HiveOptions(metastoreUri, dbName, tableName)
    +                .withTxnsPerBatch(txnsPerBatch)
    +                .withAutoCreatePartitions(autoCreatePartitions)
    +                .withMaxOpenConnections(maxConnections)
    +                .withHeartBeatInterval(heartbeatInterval);
    +
    +        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
    +            final String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
    +            final String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
    +
    +            log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{principal, keyTab});
    +            try {
    +                ugi = hiveConfigurator.authenticate(hiveConfig, principal, keyTab, TICKET_RENEWAL_PERIOD, log);
    +            } catch (AuthenticationFailedException ae) {
    +                throw new ProcessException("Kerberos authentication failed for Hive Streaming", ae);
    +            }
    +            log.info("Successfully logged in as principal {} with keytab {}", new Object[]{principal, keyTab});
    +            options = options.withKerberosPrincipal(principal).withKerberosKeytab(keyTab);
    +        }
    +
    +        allWriters = new ConcurrentHashMap<>();
    +        String timeoutName = "put-hive-streaming-%d";
    +        this.callTimeoutPool = Executors.newFixedThreadPool(1,
    +                new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
    +
    +        sendHeartBeat.set(true);
    +        heartBeatTimer = new Timer();
    +        setupHeartBeatTimer();
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final ComponentLog log = getLogger();
    +        try {
    +            final List<String> partitionColumnList;
    +            String partitionColumns = context.getProperty(PARTITION_COLUMNS).getValue();
    +            if (StringUtils.isEmpty(partitionColumns)) {
    +                partitionColumnList = Collections.emptyList();
    +            } else {
    +                String[] partitionCols = partitionColumns.split(",");
    +                partitionColumnList = new ArrayList<>(partitionCols.length);
    +                for (String col : partitionCols) {
    +                    partitionColumnList.add(col.trim());
    +                }
    +            }
    +
    +            // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
    +            ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
    +            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
    +
    +            int recordCount = 0;
    +            final List<HiveStreamingRecord> records = new LinkedList<>();
    +
    +            session.read(flowFile, in -> {
    +
    +                try (final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
    +
    +                    GenericRecord currRecord;
    +                    while (reader.hasNext()) {
    +                        currRecord = reader.next();
    +                        List<String> partitionValues = new ArrayList<>();
    +
    +                        for (String partition : partitionColumnList) {
    +                            Object partitionValue = currRecord.get(partition);
    +                            if (partitionValue == null) {
    +                                throw new IOException("Partition column '" + partition + "' not found in Avro record");
    +                            }
    +                            partitionValues.add(partitionValue.toString());
    +                        }
    +
    +                        List<Schema.Field> fields = currRecord.getSchema().getFields();
    +                        if (fields != null) {
    +                            JSONObject obj = new JSONObject();
    +                            for (Schema.Field field : fields) {
    +                                String fieldName = field.name();
    +                                // Skip fields that are partition columns, we extracted those values above to create an EndPoint
    +                                if (!partitionColumnList.contains(fieldName)) {
    +                                    Object value = currRecord.get(fieldName);
    +                                    try {
    +                                        obj.put(fieldName, value);
    +                                    } catch (JSONException je) {
    +                                        throw new IOException(je);
    +                                    }
    +                                }
    +                            }
    +                            records.add(new HiveStreamingRecord(partitionValues, obj));
    +                        }
    +                    }
    +                }
    +            });
    +
    +            // Write all records to Hive Streaming
    +            for (HiveStreamingRecord record : records) {
    +                HiveEndPoint endPoint = makeHiveEndPoint(record.getPartitionValues(), options);
    +                HiveWriter writer = getOrCreateWriter(endPoint);
    +                writer.write(record.getRecord().toString().getBytes(StandardCharsets.UTF_8));
    +                recordCount++;
    +            }
    +
    +            flowFile = session.putAttribute(flowFile, HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(recordCount));
    +            flushAllWriters(true);
    +
    +            session.getProvenanceReporter().send(flowFile, options.getMetaStoreURI());
    +            session.transfer(flowFile, REL_SUCCESS);
    +
    +            // Restore original class loader, might not be necessary but is good practice since the processor task changed it
    +            Thread.currentThread().setContextClassLoader(originalClassloader);
    --- End diff --
    
    So it won't be reset if an exception is thrown? Should it be part of a finally block?
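
    A small sketch of the try/finally shape being suggested here (illustrative only, reusing the variables from the diff above):

        // Swap the context class loader for the Hive Metastore calls, and make sure the
        // original is restored even when onTrigger bails out with an exception.
        final ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
        try {
            // ... read the Avro records and write them via Hive Streaming ...
        } finally {
            Thread.currentThread().setContextClassLoader(originalClassloader);
        }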



[GitHub] nifi pull request #727: NIFI-1663: Add ConvertAvroToORC processor

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 closed the pull request at:

    https://github.com/apache/nifi/pull/727


---

[GitHub] nifi pull request #727: NIFI-1663: Add ConvertAvroToORC processor

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/727#discussion_r73254901
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java ---
    @@ -0,0 +1,657 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hive;
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder;
    +import org.apache.avro.Schema;
    +import org.apache.avro.file.DataFileStream;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hive.hcatalog.streaming.ConnectionError;
    +import org.apache.hive.hcatalog.streaming.HiveEndPoint;
    +import org.apache.hive.hcatalog.streaming.SerializationError;
    +import org.apache.hive.hcatalog.streaming.StreamingException;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.hadoop.KerberosProperties;
    +import org.apache.nifi.hadoop.SecurityUtil;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.util.NiFiProperties;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.nifi.util.hive.AuthenticationFailedException;
    +import org.apache.nifi.util.hive.HiveConfigurator;
    +import org.apache.nifi.util.hive.HiveOptions;
    +import org.apache.nifi.util.hive.HiveUtils;
    +import org.apache.nifi.util.hive.HiveWriter;
    +import org.json.JSONException;
    +import org.json.JSONObject;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.regex.Pattern;
    +
    +/**
    + * This processor utilizes the Hive Streaming capability to insert data from the flow into a Hive database table.
    + */
    +@Tags({"hive", "streaming", "put", "database", "store"})
    +@CapabilityDescription("This processor uses Hive Streaming to send flow file data to an Apache Hive table. The incoming flow file is expected to be in "
    +        + "Avro format and the table must exist in Hive. Please see the Hive documentation for requirements on the Hive table (format, partitions, etc.). "
    +        + "The partition values are extracted from the Avro record based on the names of the partition columns as specified in the processor. ")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "hivestreaming.record.count", description = "The number of records from this flow file written using Hive Streaming.")
    +})
    +public class PutHiveStreaming extends AbstractProcessor {
    +
    +    // Attributes
    +    public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
    +
    +    // Validators
    +    private static final Validator GREATER_THAN_ONE_VALIDATOR = (subject, value, context) -> {
    +        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
    +            return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
    +        }
    +
    +        String reason = null;
    +        try {
    +            final int intVal = Integer.parseInt(value);
    +
    +            if (intVal < 2) {
    +                reason = "value is less than 2";
    +            }
    +        } catch (final NumberFormatException e) {
    +            reason = "value is not a valid integer";
    +        }
    +
    +        return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
    +    };
    +
    +    // Properties
    +    public static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
    +            .name("hive-stream-metastore-uri")
    +            .displayName("Hive Metastore URI")
    +            .description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the "
    +                    + "Hive metastore is 9083.")
    +            .required(true)
    +            .addValidator(StandardValidators.URI_VALIDATOR)
    +            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
    +            .build();
    +
    +    public static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
    +            .name("hive-config-resources")
    +            .displayName("Hive Configuration Resources")
    +            .description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
    +                    + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
    +                    + "with Kerberos e.g., the appropriate properties must be set in the configuration files. Please see the Hive documentation for more details.")
    +            .required(false)
    +            .addValidator(HiveUtils.createMultipleFilesExistValidator())
    +            .build();
    +
    +    public static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
    +            .name("hive-stream-database-name")
    +            .displayName("Database Name")
    +            .description("The name of the database in which to put the data.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .name("hive-stream-table-name")
    +            .displayName("Table Name")
    +            .description("The name of the database table in which to put the data.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PARTITION_COLUMNS = new PropertyDescriptor.Builder()
    +            .name("hive-stream-partition-cols")
    +            .displayName("Partition Columns")
    +            .description("A comma-delimited list of column names on which the table has been partitioned. The order of values in this list must "
    +                    + "correspond exactly to the order of partition columns specified during the table creation.")
    +            .required(false)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("[^,]+(,[^,]+)*"))) // comma-separated list with non-empty entries
    +            .build();
    +
    +    public static final PropertyDescriptor AUTOCREATE_PARTITIONS = new PropertyDescriptor.Builder()
    +            .name("hive-stream-autocreate-partition")
    +            .displayName("Auto-Create Partitions")
    +            .description("Flag indicating whether partitions should be automatically created")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_OPEN_CONNECTIONS = new PropertyDescriptor.Builder()
    +            .name("hive-stream-max-open-connections")
    +            .displayName("Max Open Connections")
    +            .description("The maximum number of open connections that can be allocated from this pool at the same time, "
    +                    + "or negative for no limit.")
    +            .defaultValue("8")
    +            .required(true)
    +            .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +            .sensitive(false)
    --- End diff --
    
    You can remove that; sensitive defaults to false.


---

[GitHub] nifi pull request #727: NIFI-1663: Add ConvertAvroToORC processor

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/727#discussion_r73253954
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java ---
    @@ -0,0 +1,466 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hadoop.hive.ql.io.orc;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericData;
    +import org.apache.avro.util.Utf8;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    +import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
    +import org.apache.hadoop.io.BooleanWritable;
    +import org.apache.hadoop.io.BytesWritable;
    +import org.apache.hadoop.io.DoubleWritable;
    +import org.apache.hadoop.io.FloatWritable;
    +import org.apache.hadoop.io.IntWritable;
    +import org.apache.hadoop.io.LongWritable;
    +import org.apache.hadoop.io.MapWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.io.Writable;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_PADDING;
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_SIZE;
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE;
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT;
    +
    +/**
    + * Utility methods for ORC support (conversion from Avro, conversion to Hive types, e.g.
    + */
    +public class NiFiOrcUtils {
    +
    +    public static Object convertToORCObject(TypeInfo typeInfo, Object o) {
    +        if (o != null) {
    +            if (typeInfo instanceof UnionTypeInfo) {
    +                OrcUnion union = new OrcUnion();
    +                // Need to find which of the union types correspond to the primitive object
    +                TypeInfo objectTypeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(
    +                        ObjectInspectorFactory.getReflectionObjectInspector(o.getClass(), ObjectInspectorFactory.ObjectInspectorOptions.JAVA));
    +                List<TypeInfo> unionTypeInfos = ((UnionTypeInfo) typeInfo).getAllUnionObjectTypeInfos();
    +
    +                int index = 0;
    +                while (index < unionTypeInfos.size() && !unionTypeInfos.get(index).equals(objectTypeInfo)) {
    +                    index++;
    +                }
    +                if (index < unionTypeInfos.size()) {
    +                    union.set((byte) index, convertToORCObject(objectTypeInfo, o));
    +                } else {
    +                    throw new IllegalArgumentException("Object Type for class " + o.getClass().getName() + " not in Union declaration");
    +                }
    +                return union;
    +            }
    +            if (o instanceof Integer) {
    +                return new IntWritable((int) o);
    +            }
    +            if (o instanceof Boolean) {
    +                return new BooleanWritable((boolean) o);
    +            }
    +            if (o instanceof Long) {
    +                return new LongWritable((long) o);
    +            }
    +            if (o instanceof Float) {
    +                return new FloatWritable((float) o);
    +            }
    +            if (o instanceof Double) {
    +                return new DoubleWritable((double) o);
    +            }
    +            if (o instanceof String || o instanceof Utf8 || o instanceof GenericData.EnumSymbol) {
    +                return new Text(o.toString());
    +            }
    +            if (o instanceof ByteBuffer) {
    +                return new BytesWritable(((ByteBuffer) o).array());
    +            }
    +            if (o instanceof int[]) {
    +                int[] intArray = (int[]) o;
    +                return Arrays.stream(intArray)
    +                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("int"), element))
    +                        .collect(Collectors.toList());
    +            }
    +            if (o instanceof long[]) {
    +                long[] longArray = (long[]) o;
    +                return Arrays.stream(longArray)
    +                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("bigint"), element))
    +                        .collect(Collectors.toList());
    +            }
    +            if (o instanceof float[]) {
    +                float[] floatArray = (float[]) o;
    +                return IntStream.range(0, floatArray.length)
    +                        .mapToDouble(i -> floatArray[i])
    +                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("float"), (float) element))
    +                        .collect(Collectors.toList());
    +            }
    +            if (o instanceof double[]) {
    +                double[] doubleArray = (double[]) o;
    +                return Arrays.stream(doubleArray)
    +                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("double"), element))
    +                        .collect(Collectors.toList());
    +            }
    +            if (o instanceof boolean[]) {
    +                boolean[] booleanArray = (boolean[]) o;
    +                return IntStream.range(0, booleanArray.length)
    +                        .map(i -> booleanArray[i] ? 1 : 0)
    +                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("boolean"), element == 1))
    +                        .collect(Collectors.toList());
    +            }
    +            if (o instanceof GenericData.Array) {
    +                GenericData.Array array = ((GenericData.Array) o);
    +                // The type information in this case is interpreted as a List
    +                TypeInfo listTypeInfo = ((ListTypeInfo) typeInfo).getListElementTypeInfo();
    +                return array.stream().map((element) -> convertToORCObject(listTypeInfo, element)).collect(Collectors.toList());
    +            }
    +            if (o instanceof List) {
    +                return o;
    +            }
    +            if (o instanceof Map) {
    +                MapWritable mapWritable = new MapWritable();
    +                TypeInfo keyInfo = ((MapTypeInfo) typeInfo).getMapKeyTypeInfo();
    +                TypeInfo valueInfo = ((MapTypeInfo) typeInfo).getMapValueTypeInfo();
    +                // Unions are not allowed as key/value types, so if we convert the key and value objects,
    +                // they should return Writable objects
    +                ((Map) o).forEach((key, value) -> {
    +                    Object keyObject = convertToORCObject(keyInfo, key);
    +                    Object valueObject = convertToORCObject(valueInfo, value);
    +                    if (keyObject == null
    +                            || !(keyObject instanceof Writable)
    +                            || !(valueObject instanceof Writable)
    +                            ) {
    +                        throw new IllegalArgumentException("Maps may only contain Writable types, and the key cannot be null");
    +                    }
    +                    mapWritable.put((Writable) keyObject, (Writable) valueObject);
    +                });
    +                return mapWritable;
    +            }
    +
    +        }
    +        return null;
    --- End diff --
    
    Is this intentional, or should this really result in an exception or at least a warning message? Basically I am looking at it as something that could not be converted.
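    As a point of comparison, here is a minimal fail-fast sketch (hypothetical class and method names, real conversions elided) that raises an exception for a value it cannot convert instead of silently returning null:

        // Sketch only: the fall-through of a conversion method for an unsupported value.
        public final class OrcConversionSketch {

            public static Object convertOrFail(final Object o) {
                if (o == null) {
                    return null; // a genuinely null field value is still fine
                }
                if (o instanceof Integer || o instanceof Long || o instanceof String) {
                    return o; // ...real Writable conversions elided...
                }
                // Fail fast so the caller knows the value was not converted
                throw new IllegalArgumentException(
                        "Cannot convert value of type " + o.getClass().getName() + " to an ORC object");
            }
        }

    Logging a warning and skipping the field would be the softer alternative to throwing.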


---

[GitHub] nifi pull request #727: NIFI-1663: Add ConvertAvroToORC processor

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/727#discussion_r73254202
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java ---
    @@ -0,0 +1,466 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hadoop.hive.ql.io.orc;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericData;
    +import org.apache.avro.util.Utf8;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    +import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
    +import org.apache.hadoop.io.BooleanWritable;
    +import org.apache.hadoop.io.BytesWritable;
    +import org.apache.hadoop.io.DoubleWritable;
    +import org.apache.hadoop.io.FloatWritable;
    +import org.apache.hadoop.io.IntWritable;
    +import org.apache.hadoop.io.LongWritable;
    +import org.apache.hadoop.io.MapWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.io.Writable;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_PADDING;
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_SIZE;
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE;
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT;
    +
    +/**
    + * Utility methods for ORC support (conversion from Avro, conversion to Hive types, e.g.
    + */
    +public class NiFiOrcUtils {
    +
    +    public static Object convertToORCObject(TypeInfo typeInfo, Object o) {
    +        if (o != null) {
    +            if (typeInfo instanceof UnionTypeInfo) {
    +                OrcUnion union = new OrcUnion();
    +                // Need to find which of the union types correspond to the primitive object
    +                TypeInfo objectTypeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(
    +                        ObjectInspectorFactory.getReflectionObjectInspector(o.getClass(), ObjectInspectorFactory.ObjectInspectorOptions.JAVA));
    +                List<TypeInfo> unionTypeInfos = ((UnionTypeInfo) typeInfo).getAllUnionObjectTypeInfos();
    +
    +                int index = 0;
    +                while (index < unionTypeInfos.size() && !unionTypeInfos.get(index).equals(objectTypeInfo)) {
    +                    index++;
    +                }
    +                if (index < unionTypeInfos.size()) {
    +                    union.set((byte) index, convertToORCObject(objectTypeInfo, o));
    +                } else {
    +                    throw new IllegalArgumentException("Object Type for class " + o.getClass().getName() + " not in Union declaration");
    +                }
    +                return union;
    +            }
    +            if (o instanceof Integer) {
    +                return new IntWritable((int) o);
    +            }
    +            if (o instanceof Boolean) {
    +                return new BooleanWritable((boolean) o);
    +            }
    +            if (o instanceof Long) {
    +                return new LongWritable((long) o);
    +            }
    +            if (o instanceof Float) {
    +                return new FloatWritable((float) o);
    +            }
    +            if (o instanceof Double) {
    +                return new DoubleWritable((double) o);
    +            }
    +            if (o instanceof String || o instanceof Utf8 || o instanceof GenericData.EnumSymbol) {
    +                return new Text(o.toString());
    +            }
    +            if (o instanceof ByteBuffer) {
    +                return new BytesWritable(((ByteBuffer) o).array());
    +            }
    +            if (o instanceof int[]) {
    +                int[] intArray = (int[]) o;
    +                return Arrays.stream(intArray)
    +                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("int"), element))
    +                        .collect(Collectors.toList());
    +            }
    +            if (o instanceof long[]) {
    +                long[] longArray = (long[]) o;
    +                return Arrays.stream(longArray)
    +                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("bigint"), element))
    +                        .collect(Collectors.toList());
    +            }
    +            if (o instanceof float[]) {
    +                float[] floatArray = (float[]) o;
    +                return IntStream.range(0, floatArray.length)
    +                        .mapToDouble(i -> floatArray[i])
    +                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("float"), (float) element))
    +                        .collect(Collectors.toList());
    +            }
    +            if (o instanceof double[]) {
    +                double[] doubleArray = (double[]) o;
    +                return Arrays.stream(doubleArray)
    +                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("double"), element))
    +                        .collect(Collectors.toList());
    +            }
    +            if (o instanceof boolean[]) {
    +                boolean[] booleanArray = (boolean[]) o;
    +                return IntStream.range(0, booleanArray.length)
    +                        .map(i -> booleanArray[i] ? 1 : 0)
    +                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("boolean"), element == 1))
    +                        .collect(Collectors.toList());
    +            }
    +            if (o instanceof GenericData.Array) {
    +                GenericData.Array array = ((GenericData.Array) o);
    +                // The type information in this case is interpreted as a List
    +                TypeInfo listTypeInfo = ((ListTypeInfo) typeInfo).getListElementTypeInfo();
    +                return array.stream().map((element) -> convertToORCObject(listTypeInfo, element)).collect(Collectors.toList());
    +            }
    +            if (o instanceof List) {
    +                return o;
    +            }
    +            if (o instanceof Map) {
    +                MapWritable mapWritable = new MapWritable();
    +                TypeInfo keyInfo = ((MapTypeInfo) typeInfo).getMapKeyTypeInfo();
    +                TypeInfo valueInfo = ((MapTypeInfo) typeInfo).getMapValueTypeInfo();
    +                // Unions are not allowed as key/value types, so if we convert the key and value objects,
    +                // they should return Writable objects
    +                ((Map) o).forEach((key, value) -> {
    +                    Object keyObject = convertToORCObject(keyInfo, key);
    +                    Object valueObject = convertToORCObject(valueInfo, value);
    +                    if (keyObject == null
    +                            || !(keyObject instanceof Writable)
    +                            || !(valueObject instanceof Writable)
    +                            ) {
    +                        throw new IllegalArgumentException("Maps may only contain Writable types, and the key cannot be null");
    +                    }
    +                    mapWritable.put((Writable) keyObject, (Writable) valueObject);
    +                });
    +                return mapWritable;
    +            }
    +
    +        }
    +        return null;
    +    }
    +
    +
    +    /**
    +     * Create an object of OrcStruct given a TypeInfo and a list of objects
    +     *
    +     * @param typeInfo The TypeInfo object representing the ORC record schema
    +     * @param objs     ORC objects/Writables
    +     * @return an OrcStruct containing the specified objects for the specified schema
    +     */
    +    public static OrcStruct createOrcStruct(TypeInfo typeInfo, Object... objs) {
    +        SettableStructObjectInspector oi = (SettableStructObjectInspector) OrcStruct
    +                .createObjectInspector(typeInfo);
    +        List<StructField> fields = (List<StructField>) oi.getAllStructFieldRefs();
    +        OrcStruct result = (OrcStruct) oi.create();
    +        result.setNumFields(fields.size());
    +        for (int i = 0; i < fields.size(); i++) {
    +            oi.setStructFieldData(result, fields.get(i), objs[i]);
    --- End diff --
    
    Hmm, this is asking for an IndexOutOfBoundsException. What if 'fields' is not the same length as 'objs'?


---

[GitHub] nifi issue #727: NIFI-1663: Add ConvertAvroToORC processor

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the issue:

    https://github.com/apache/nifi/pull/727
  
    Reviewing



---

[GitHub] nifi pull request #727: NIFI-1663: Add ConvertAvroToORC processor

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/727#discussion_r73332095
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java ---
    @@ -160,22 +162,9 @@
     
             if (confFileProvided) {
                 final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
    -            ValidationResources resources = validationResourceHolder.get();
    -
    -            // if no resources in the holder, or if the holder has different resources loaded,
    -            // then load the Configuration and set the new resources in the holder
    -            if (resources == null || !configFiles.equals(resources.getConfigResources())) {
    -                getLogger().debug("Reloading validation resources");
    -                resources = new ValidationResources(configFiles, HiveJdbcCommon.getConfigurationFromFiles(configFiles));
    -                validationResourceHolder.set(resources);
    -            }
    -
    -            final Configuration hiveConfig = resources.getConfiguration();
                 final String principal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
    -            final String keytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
    -
    -            problems.addAll(KerberosProperties.validatePrincipalAndKeytab(
    -                    this.getClass().getSimpleName(), hiveConfig, principal, keytab, getLogger()));
    +            final String keyTab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
    --- End diff --
    
    I think the defaults include an empty set of properties, so it shouldn't be null.


---

[GitHub] nifi pull request #727: NIFI-1663: Add ConvertAvroToORC processor

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/727#discussion_r73520631
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java ---
    @@ -0,0 +1,466 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hadoop.hive.ql.io.orc;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericData;
    +import org.apache.avro.util.Utf8;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    +import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
    +import org.apache.hadoop.io.BooleanWritable;
    +import org.apache.hadoop.io.BytesWritable;
    +import org.apache.hadoop.io.DoubleWritable;
    +import org.apache.hadoop.io.FloatWritable;
    +import org.apache.hadoop.io.IntWritable;
    +import org.apache.hadoop.io.LongWritable;
    +import org.apache.hadoop.io.MapWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.io.Writable;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_PADDING;
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_SIZE;
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE;
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT;
    +
    +/**
    + * Utility methods for ORC support (conversion from Avro, conversion to Hive types, e.g.
    + */
    +public class NiFiOrcUtils {
    --- End diff --
    
    Actually it does have to be public; it is referenced from multiple packages.


---

[GitHub] nifi issue #727: NIFI-1663: Add ConvertAvroToORC processor

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the issue:

    https://github.com/apache/nifi/pull/727
  
    Aside from minor stylistic things (which I didn't comment on), I am +1. Verified the new tests; all pass. Test coverage of the overall module is high.
    Merging


---

[GitHub] nifi pull request #727: NIFI-1663: Add ConvertAvroToORC processor

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/727#discussion_r73252749
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java ---
    @@ -0,0 +1,466 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hadoop.hive.ql.io.orc;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericData;
    +import org.apache.avro.util.Utf8;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    +import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
    +import org.apache.hadoop.io.BooleanWritable;
    +import org.apache.hadoop.io.BytesWritable;
    +import org.apache.hadoop.io.DoubleWritable;
    +import org.apache.hadoop.io.FloatWritable;
    +import org.apache.hadoop.io.IntWritable;
    +import org.apache.hadoop.io.LongWritable;
    +import org.apache.hadoop.io.MapWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.io.Writable;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_PADDING;
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_SIZE;
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE;
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT;
    +
    +/**
    + * Utility methods for ORC support (conversion from Avro, conversion to Hive types, e.g.
    + */
    +public class NiFiOrcUtils {
    --- End diff --
    
    Does it have to be public?


---

[GitHub] nifi pull request #727: NIFI-1663: Add ConvertAvroToORC processor

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/727#discussion_r73331976
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java ---
    @@ -0,0 +1,466 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hadoop.hive.ql.io.orc;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericData;
    +import org.apache.avro.util.Utf8;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    +import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
    +import org.apache.hadoop.io.BooleanWritable;
    +import org.apache.hadoop.io.BytesWritable;
    +import org.apache.hadoop.io.DoubleWritable;
    +import org.apache.hadoop.io.FloatWritable;
    +import org.apache.hadoop.io.IntWritable;
    +import org.apache.hadoop.io.LongWritable;
    +import org.apache.hadoop.io.MapWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.io.Writable;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_PADDING;
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_SIZE;
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE;
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT;
    +
    +/**
    + * Utility methods for ORC support (conversion from Avro, conversion to Hive types, e.g.
    + */
    +public class NiFiOrcUtils {
    +
    +    public static Object convertToORCObject(TypeInfo typeInfo, Object o) {
    +        if (o != null) {
    +            if (typeInfo instanceof UnionTypeInfo) {
    +                OrcUnion union = new OrcUnion();
    +                // Need to find which of the union types correspond to the primitive object
    +                TypeInfo objectTypeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(
    +                        ObjectInspectorFactory.getReflectionObjectInspector(o.getClass(), ObjectInspectorFactory.ObjectInspectorOptions.JAVA));
    +                List<TypeInfo> unionTypeInfos = ((UnionTypeInfo) typeInfo).getAllUnionObjectTypeInfos();
    +
    +                int index = 0;
    +                while (index < unionTypeInfos.size() && !unionTypeInfos.get(index).equals(objectTypeInfo)) {
    +                    index++;
    +                }
    +                if (index < unionTypeInfos.size()) {
    +                    union.set((byte) index, convertToORCObject(objectTypeInfo, o));
    +                } else {
    +                    throw new IllegalArgumentException("Object Type for class " + o.getClass().getName() + " not in Union declaration");
    +                }
    +                return union;
    +            }
    +            if (o instanceof Integer) {
    +                return new IntWritable((int) o);
    +            }
    +            if (o instanceof Boolean) {
    +                return new BooleanWritable((boolean) o);
    +            }
    +            if (o instanceof Long) {
    +                return new LongWritable((long) o);
    +            }
    +            if (o instanceof Float) {
    +                return new FloatWritable((float) o);
    +            }
    +            if (o instanceof Double) {
    +                return new DoubleWritable((double) o);
    +            }
    +            if (o instanceof String || o instanceof Utf8 || o instanceof GenericData.EnumSymbol) {
    +                return new Text(o.toString());
    +            }
    +            if (o instanceof ByteBuffer) {
    +                return new BytesWritable(((ByteBuffer) o).array());
    +            }
    +            if (o instanceof int[]) {
    +                int[] intArray = (int[]) o;
    +                return Arrays.stream(intArray)
    +                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("int"), element))
    +                        .collect(Collectors.toList());
    +            }
    +            if (o instanceof long[]) {
    +                long[] longArray = (long[]) o;
    +                return Arrays.stream(longArray)
    +                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("bigint"), element))
    +                        .collect(Collectors.toList());
    +            }
    +            if (o instanceof float[]) {
    +                float[] floatArray = (float[]) o;
    +                return IntStream.range(0, floatArray.length)
    +                        .mapToDouble(i -> floatArray[i])
    +                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("float"), (float) element))
    +                        .collect(Collectors.toList());
    +            }
    +            if (o instanceof double[]) {
    +                double[] doubleArray = (double[]) o;
    +                return Arrays.stream(doubleArray)
    +                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("double"), element))
    +                        .collect(Collectors.toList());
    +            }
    +            if (o instanceof boolean[]) {
    +                boolean[] booleanArray = (boolean[]) o;
    +                return IntStream.range(0, booleanArray.length)
    +                        .map(i -> booleanArray[i] ? 1 : 0)
    +                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("boolean"), element == 1))
    +                        .collect(Collectors.toList());
    +            }
    +            if (o instanceof GenericData.Array) {
    +                GenericData.Array array = ((GenericData.Array) o);
    +                // The type information in this case is interpreted as a List
    +                TypeInfo listTypeInfo = ((ListTypeInfo) typeInfo).getListElementTypeInfo();
    +                return array.stream().map((element) -> convertToORCObject(listTypeInfo, element)).collect(Collectors.toList());
    +            }
    +            if (o instanceof List) {
    +                return o;
    +            }
    +            if (o instanceof Map) {
    +                MapWritable mapWritable = new MapWritable();
    +                TypeInfo keyInfo = ((MapTypeInfo) typeInfo).getMapKeyTypeInfo();
    +                TypeInfo valueInfo = ((MapTypeInfo) typeInfo).getMapValueTypeInfo();
    +                // Unions are not allowed as key/value types, so if we convert the key and value objects,
    +                // they should return Writable objects
    +                ((Map) o).forEach((key, value) -> {
    +                    Object keyObject = convertToORCObject(keyInfo, key);
    +                    Object valueObject = convertToORCObject(valueInfo, value);
    +                    if (keyObject == null
    +                            || !(keyObject instanceof Writable)
    +                            || !(valueObject instanceof Writable)
    +                            ) {
    +                        throw new IllegalArgumentException("Maps may only contain Writable types, and the key cannot be null");
    +                    }
    +                    mapWritable.put((Writable) keyObject, (Writable) valueObject);
    +                });
    +                return mapWritable;
    +            }
    +
    +        }
    +        return null;
    +    }
    +
    +
    +    /**
    +     * Create an object of OrcStruct given a TypeInfo and a list of objects
    +     *
    +     * @param typeInfo The TypeInfo object representing the ORC record schema
    +     * @param objs     ORC objects/Writables
    +     * @return an OrcStruct containing the specified objects for the specified schema
    +     */
    +    public static OrcStruct createOrcStruct(TypeInfo typeInfo, Object... objs) {
    +        SettableStructObjectInspector oi = (SettableStructObjectInspector) OrcStruct
    +                .createObjectInspector(typeInfo);
    +        List<StructField> fields = (List<StructField>) oi.getAllStructFieldRefs();
    +        OrcStruct result = (OrcStruct) oi.create();
    +        result.setNumFields(fields.size());
    +        for (int i = 0; i < fields.size(); i++) {
    +            oi.setStructFieldData(result, fields.get(i), objs[i]);
    --- End diff --
    
    Will add some defensive code here
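    For example, the defensive check could look something like this minimal sketch; the class and method names are hypothetical and the wording of the eventual check may differ:

        import java.util.List;

        // Hypothetical arity check: make sure exactly one value was supplied for each
        // struct field before indexing into the varargs array.
        public final class StructArgsCheck {

            public static void requireMatchingArity(final List<?> fields, final Object... objs) {
                final int supplied = (objs == null) ? 0 : objs.length;
                if (fields.size() != supplied) {
                    throw new IllegalArgumentException("Expected " + fields.size()
                            + " values for the ORC struct but got " + supplied);
                }
            }
        }

    Called at the top of createOrcStruct, a check like this would turn a latent IndexOutOfBoundsException into an explicit error message.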


---

[GitHub] nifi pull request #727: NIFI-1663: Add ConvertAvroToORC processor

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/727


---

[GitHub] nifi pull request #727: NIFI-1663: Add ConvertAvroToORC processor

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/727#discussion_r73255367
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java ---
    @@ -0,0 +1,657 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hive;
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder;
    +import org.apache.avro.Schema;
    +import org.apache.avro.file.DataFileStream;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hive.hcatalog.streaming.ConnectionError;
    +import org.apache.hive.hcatalog.streaming.HiveEndPoint;
    +import org.apache.hive.hcatalog.streaming.SerializationError;
    +import org.apache.hive.hcatalog.streaming.StreamingException;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.hadoop.KerberosProperties;
    +import org.apache.nifi.hadoop.SecurityUtil;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.util.NiFiProperties;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.nifi.util.hive.AuthenticationFailedException;
    +import org.apache.nifi.util.hive.HiveConfigurator;
    +import org.apache.nifi.util.hive.HiveOptions;
    +import org.apache.nifi.util.hive.HiveUtils;
    +import org.apache.nifi.util.hive.HiveWriter;
    +import org.json.JSONException;
    +import org.json.JSONObject;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.regex.Pattern;
    +
    +/**
    + * This processor utilizes the Hive Streaming capability to insert data from the flow into a Hive database table.
    + */
    +@Tags({"hive", "streaming", "put", "database", "store"})
    +@CapabilityDescription("This processor uses Hive Streaming to send flow file data to an Apache Hive table. The incoming flow file is expected to be in "
    +        + "Avro format and the table must exist in Hive. Please see the Hive documentation for requirements on the Hive table (format, partitions, etc.). "
    +        + "The partition values are extracted from the Avro record based on the names of the partition columns as specified in the processor. ")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "hivestreaming.record.count", description = "The number of records from this flow file written using Hive Streaming.")
    +})
    +public class PutHiveStreaming extends AbstractProcessor {
    +
    +    // Attributes
    +    public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
    +
    +    // Validators
    +    private static final Validator GREATER_THAN_ONE_VALIDATOR = (subject, value, context) -> {
    +        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
    +            return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
    +        }
    +
    +        String reason = null;
    +        try {
    +            final int intVal = Integer.parseInt(value);
    +
    +            if (intVal < 2) {
    +                reason = "value is less than 2";
    +            }
    +        } catch (final NumberFormatException e) {
    +            reason = "value is not a valid integer";
    +        }
    +
    +        return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
    +    };
    +
    +    // Properties
    +    public static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
    +            .name("hive-stream-metastore-uri")
    +            .displayName("Hive Metastore URI")
    +            .description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the "
    +                    + "Hive metastore is 9083.")
    +            .required(true)
    +            .addValidator(StandardValidators.URI_VALIDATOR)
    +            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
    +            .build();
    +
    +    public static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
    +            .name("hive-config-resources")
    +            .displayName("Hive Configuration Resources")
    +            .description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
    +                    + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
    +                    + "with Kerberos e.g., the appropriate properties must be set in the configuration files. Please see the Hive documentation for more details.")
    +            .required(false)
    +            .addValidator(HiveUtils.createMultipleFilesExistValidator())
    +            .build();
    +
    +    public static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
    +            .name("hive-stream-database-name")
    +            .displayName("Database Name")
    +            .description("The name of the database in which to put the data.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .name("hive-stream-table-name")
    +            .displayName("Table Name")
    +            .description("The name of the database table in which to put the data.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PARTITION_COLUMNS = new PropertyDescriptor.Builder()
    +            .name("hive-stream-partition-cols")
    +            .displayName("Partition Columns")
    +            .description("A comma-delimited list of column names on which the table has been partitioned. The order of values in this list must "
    +                    + "correspond exactly to the order of partition columns specified during the table creation.")
    +            .required(false)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("[^,]+(,[^,]+)*"))) // comma-separated list with non-empty entries
    +            .build();
    +
    +    public static final PropertyDescriptor AUTOCREATE_PARTITIONS = new PropertyDescriptor.Builder()
    +            .name("hive-stream-autocreate-partition")
    +            .displayName("Auto-Create Partitions")
    +            .description("Flag indicating whether partitions should be automatically created")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_OPEN_CONNECTIONS = new PropertyDescriptor.Builder()
    +            .name("hive-stream-max-open-connections")
    +            .displayName("Max Open Connections")
    +            .description("The maximum number of open connections that can be allocated from this pool at the same time, "
    +                    + "or negative for no limit.")
    +            .defaultValue("8")
    +            .required(true)
    +            .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +            .sensitive(false)
    +            .build();
    +
    +    public static final PropertyDescriptor HEARTBEAT_INTERVAL = new PropertyDescriptor.Builder()
    +            .name("hive-stream-heartbeat-interval")
    +            .displayName("Heartbeat Interval")
    +            .description("Indicates that a heartbeat should be sent when the specified number of seconds has elapsed. "
    +                    + "A value of 0 indicates that no heartbeat should be sent.")
    +            .defaultValue("60")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .sensitive(false)
    +            .build();
    +
    +    public static final PropertyDescriptor TXNS_PER_BATCH = new PropertyDescriptor.Builder()
    +            .name("hive-stream-transactions-per-batch")
    +            .displayName("Transactions per Batch")
    +            .description("A hint to Hive Streaming indicating how many transactions the processor task will need. This value must be greater than 1.")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(GREATER_THAN_ONE_VALIDATOR)
    +            .defaultValue("100")
    +            .build();
    +
    +    // Relationships
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("A FlowFile is routed to this relationship after the database is successfully updated")
    +            .build();
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
    +            .build();
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail.")
    +            .build();
    +
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +    private final static Set<Relationship> relationships;
    +
    +    private static final long TICKET_RENEWAL_PERIOD = 60000;
    +
    +    protected KerberosProperties kerberosProperties;
    +
    +    protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
    +    protected volatile UserGroupInformation ugi;
    +
    +    protected final AtomicBoolean isInitialized = new AtomicBoolean(false);
    +
    +    protected HiveOptions options;
    +    protected ExecutorService callTimeoutPool;
    +    protected transient Timer heartBeatTimer;
    +    protected AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
    +    protected Map<HiveEndPoint, HiveWriter> allWriters;
    +
    +
    +    /*
    +     * Will ensure that the list of property descriptors is built only once.
    +     * Will also create a Set of relationships.
    +     */
    +    static {
    +        propertyDescriptors = new ArrayList<>();
    +        propertyDescriptors.add(METASTORE_URI);
    +        propertyDescriptors.add(HIVE_CONFIGURATION_RESOURCES);
    +        propertyDescriptors.add(DB_NAME);
    +        propertyDescriptors.add(TABLE_NAME);
    +        propertyDescriptors.add(PARTITION_COLUMNS);
    +        propertyDescriptors.add(AUTOCREATE_PARTITIONS);
    +        propertyDescriptors.add(MAX_OPEN_CONNECTIONS);
    +        propertyDescriptors.add(HEARTBEAT_INTERVAL);
    +        propertyDescriptors.add(TXNS_PER_BATCH);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        _relationships.add(REL_RETRY);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    protected void init(ProcessorInitializationContext context) {
    +        kerberosProperties = getKerberosProperties();
    +        propertyDescriptors.add(kerberosProperties.getKerberosPrincipal());
    +        propertyDescriptors.add(kerberosProperties.getKerberosKeytab());
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +
    +    @OnScheduled
    +    public void setup(final ProcessContext context) {
    +        ComponentLog log = getLogger();
    +
    +        final String metastoreUri = context.getProperty(METASTORE_URI).getValue();
    +        final String dbName = context.getProperty(DB_NAME).getValue();
    +        final String tableName = context.getProperty(TABLE_NAME).getValue();
    +        final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
    +        final Integer maxConnections = context.getProperty(MAX_OPEN_CONNECTIONS).asInteger();
    +        final Integer heartbeatInterval = context.getProperty(HEARTBEAT_INTERVAL).asInteger();
    +        final Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).asInteger();
    +        final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
    +        final Configuration hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
    +
    +        // add any dynamic properties to the Hive configuration
    +        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
    +            final PropertyDescriptor descriptor = entry.getKey();
    +            if (descriptor.isDynamic()) {
    +                hiveConfig.set(descriptor.getName(), entry.getValue());
    +            }
    +        }
    +
    +        options = new HiveOptions(metastoreUri, dbName, tableName)
    +                .withTxnsPerBatch(txnsPerBatch)
    +                .withAutoCreatePartitions(autoCreatePartitions)
    +                .withMaxOpenConnections(maxConnections)
    +                .withHeartBeatInterval(heartbeatInterval);
    +
    +        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
    +            final String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
    +            final String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
    +
    +            log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{principal, keyTab});
    +            try {
    +                ugi = hiveConfigurator.authenticate(hiveConfig, principal, keyTab, TICKET_RENEWAL_PERIOD, log);
    +            } catch (AuthenticationFailedException ae) {
    +                throw new ProcessException("Kerberos authentication failed for Hive Streaming", ae);
    +            }
    +            log.info("Successfully logged in as principal {} with keytab {}", new Object[]{principal, keyTab});
    +            options = options.withKerberosPrincipal(principal).withKerberosKeytab(keyTab);
    +        }
    +
    +        allWriters = new ConcurrentHashMap<>();
    +        String timeoutName = "put-hive-streaming-%d";
    +        this.callTimeoutPool = Executors.newFixedThreadPool(1,
    +                new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
    +
    +        sendHeartBeat.set(true);
    +        heartBeatTimer = new Timer();
    +        setupHeartBeatTimer();
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final ComponentLog log = getLogger();
    +        try {
    +            final List<String> partitionColumnList;
    +            String partitionColumns = context.getProperty(PARTITION_COLUMNS).getValue();
    +            if (StringUtils.isEmpty(partitionColumns)) {
    +                partitionColumnList = Collections.emptyList();
    +            } else {
    +                String[] partitionCols = partitionColumns.split(",");
    +                partitionColumnList = new ArrayList<>(partitionCols.length);
    +                for (String col : partitionCols) {
    +                    partitionColumnList.add(col.trim());
    +                }
    +            }
    +
    +            // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
    +            ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
    +            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
    +
    +            int recordCount = 0;
    +            final List<HiveStreamingRecord> records = new LinkedList<>();
    +
    +            session.read(flowFile, in -> {
    +
    +                try (final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
    --- End diff --
    
    I know using Java 8 lambdas can be addictive . . . and therefore dangerous, as they can make code less readable, and IMHO this is the case here. I would personally put the code in the try/catch into a separate method and then call:
    ```
     session.read(flowFile, in -> fooMethod(in))
    ```
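    
    Roughly, the extracted method could look like the following (just a sketch; `fooMethod` is a placeholder name and the body is the existing try-with-resources logic moved over verbatim):
    ```
    // assumes a java.io.InputStream import alongside the existing Avro imports
    private void fooMethod(final InputStream in) throws IOException {
        try (final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
            // ... existing record handling and error mapping ...
        }
    }
    ```
    The call site then stays a one-liner, which keeps onTrigger easy to scan.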


---

[GitHub] nifi pull request #727: NIFI-1663: Add ConvertAvroToORC processor

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/727#discussion_r73254929
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java ---
    @@ -0,0 +1,657 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hive;
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder;
    +import org.apache.avro.Schema;
    +import org.apache.avro.file.DataFileStream;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hive.hcatalog.streaming.ConnectionError;
    +import org.apache.hive.hcatalog.streaming.HiveEndPoint;
    +import org.apache.hive.hcatalog.streaming.SerializationError;
    +import org.apache.hive.hcatalog.streaming.StreamingException;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.hadoop.KerberosProperties;
    +import org.apache.nifi.hadoop.SecurityUtil;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.util.NiFiProperties;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.nifi.util.hive.AuthenticationFailedException;
    +import org.apache.nifi.util.hive.HiveConfigurator;
    +import org.apache.nifi.util.hive.HiveOptions;
    +import org.apache.nifi.util.hive.HiveUtils;
    +import org.apache.nifi.util.hive.HiveWriter;
    +import org.json.JSONException;
    +import org.json.JSONObject;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.regex.Pattern;
    +
    +/**
    + * This processor utilizes the Hive Streaming capability to insert data from the flow into a Hive database table.
    + */
    +@Tags({"hive", "streaming", "put", "database", "store"})
    +@CapabilityDescription("This processor uses Hive Streaming to send flow file data to an Apache Hive table. The incoming flow file is expected to be in "
    +        + "Avro format and the table must exist in Hive. Please see the Hive documentation for requirements on the Hive table (format, partitions, etc.). "
    +        + "The partition values are extracted from the Avro record based on the names of the partition columns as specified in the processor. ")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "hivestreaming.record.count", description = "The number of records from this flow file written using Hive Streaming.")
    +})
    +public class PutHiveStreaming extends AbstractProcessor {
    +
    +    // Attributes
    +    public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
    +
    +    // Validators
    +    private static final Validator GREATER_THAN_ONE_VALIDATOR = (subject, value, context) -> {
    +        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
    +            return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
    +        }
    +
    +        String reason = null;
    +        try {
    +            final int intVal = Integer.parseInt(value);
    +
    +            if (intVal < 2) {
    +                reason = "value is less than 2";
    +            }
    +        } catch (final NumberFormatException e) {
    +            reason = "value is not a valid integer";
    +        }
    +
    +        return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
    +    };
    +
    +    // Properties
    +    public static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
    +            .name("hive-stream-metastore-uri")
    +            .displayName("Hive Metastore URI")
    +            .description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the "
    +                    + "Hive metastore is 9083.")
    +            .required(true)
    +            .addValidator(StandardValidators.URI_VALIDATOR)
    +            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
    +            .build();
    +
    +    public static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
    +            .name("hive-config-resources")
    +            .displayName("Hive Configuration Resources")
    +            .description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
    +                    + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
    +                    + "with Kerberos e.g., the appropriate properties must be set in the configuration files. Please see the Hive documentation for more details.")
    +            .required(false)
    +            .addValidator(HiveUtils.createMultipleFilesExistValidator())
    +            .build();
    +
    +    public static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
    +            .name("hive-stream-database-name")
    +            .displayName("Database Name")
    +            .description("The name of the database in which to put the data.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
    +            .name("hive-stream-table-name")
    +            .displayName("Table Name")
    +            .description("The name of the database table in which to put the data.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PARTITION_COLUMNS = new PropertyDescriptor.Builder()
    +            .name("hive-stream-partition-cols")
    +            .displayName("Partition Columns")
    +            .description("A comma-delimited list of column names on which the table has been partitioned. The order of values in this list must "
    +                    + "correspond exactly to the order of partition columns specified during the table creation.")
    +            .required(false)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("[^,]+(,[^,]+)*"))) // comma-separated list with non-empty entries
    +            .build();
    +
    +    public static final PropertyDescriptor AUTOCREATE_PARTITIONS = new PropertyDescriptor.Builder()
    +            .name("hive-stream-autocreate-partition")
    +            .displayName("Auto-Create Partitions")
    +            .description("Flag indicating whether partitions should be automatically created")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_OPEN_CONNECTIONS = new PropertyDescriptor.Builder()
    +            .name("hive-stream-max-open-connections")
    +            .displayName("Max Open Connections")
    +            .description("The maximum number of open connections that can be allocated from this pool at the same time, "
    +                    + "or negative for no limit.")
    +            .defaultValue("8")
    +            .required(true)
    +            .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +            .sensitive(false)
    +            .build();
    +
    +    public static final PropertyDescriptor HEARTBEAT_INTERVAL = new PropertyDescriptor.Builder()
    +            .name("hive-stream-heartbeat-interval")
    +            .displayName("Heartbeat Interval")
    +            .description("Indicates that a heartbeat should be sent when the specified number of seconds has elapsed. "
    +                    + "A value of 0 indicates that no heartbeat should be sent.")
    +            .defaultValue("60")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .sensitive(false)
    --- End diff --
    
    same here


---

[GitHub] nifi issue #727: NIFI-1663: Add ConvertAvroToORC processor

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on the issue:

    https://github.com/apache/nifi/pull/727
  
    I see a few comments about PutHiveStreaming; that processor is part of a previous commit being handled under #706 and is only included here to inherit the Hive downgrade. Only the last commit needs to be reviewed; I will address the other comments in the other PR.


---