Posted to issues@nifi.apache.org by bbende <gi...@git.apache.org> on 2017/04/27 21:33:51 UTC

[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

GitHub user bbende opened a pull request:

    https://github.com/apache/nifi/pull/1712

    NIFI-3724 - Add Put/Fetch Parquet Processors

    This PR adds a new nifi-parquet-bundle with PutParquet and FetchParquet processors. These work similarly to PutHDFS and FetchHDFS, but read and write Records instead.
    
    While working on this I needed to reuse portions of the record reader/writer code, so I refactored some of the project structure, which caused many files to move around.
    
    Summary of changes:
    - Created nifi-parquet-bundle
    - Created nifi-commons/nifi-record to hold domain/API related to records
    - Created nifi-nar-bundles/nifi-extension-utils as a place for utility code specific to extensions
    - Moved nifi-commons/nifi-processor-utils under nifi-extension-utils
    - Moved nifi-commons/nifi-hadoop-utils under nifi-extension-utils
    - Created nifi-extension-utils/nifi-record-utils for utility code related to records
    
    To test the Parquet processors, you can create a core-site.xml that uses the local file system and read/write Parquet files in local directories:
    
    ```
    <configuration>
        <property>
            <name>fs.defaultFS</name>
            <value>file:///</value>
        </property>
    </configuration>
    ```
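
For the local-file-system test setup described above, the configuration file could be generated with a few lines of Java. This is only a sketch: the class name `LocalCoreSite`, the helper `writeCoreSite`, and the temporary directory prefix are hypothetical names chosen for illustration and are not part of the PR.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class LocalCoreSite {

    // Same content as the core-site.xml in the PR description:
    // the local file system is used as the default file system.
    public static final String CORE_SITE_XML = String.join("\n",
            "<configuration>",
            "    <property>",
            "        <name>fs.defaultFS</name>",
            "        <value>file:///</value>",
            "    </property>",
            "</configuration>",
            "");

    // Writes core-site.xml into the given directory and returns the path of the new file.
    public static Path writeCoreSite(final Path dir) throws IOException {
        final Path file = dir.resolve("core-site.xml");
        Files.write(file, CORE_SITE_XML.getBytes(StandardCharsets.UTF_8));
        return file;
    }

    public static void main(final String[] args) throws IOException {
        // Hypothetical location: a fresh temporary directory.
        final Path dir = Files.createTempDirectory("parquet-test-conf");
        System.out.println(writeCoreSite(dir).toAbsolutePath());
    }
}
```

The printed path would then be supplied to the processor wherever it expects its Hadoop configuration files.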


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bbende/nifi parquet-bundle

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/1712.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1712
    
----
commit a35e5957f5ff8c47df5352b7b1a5ef494fed8633
Author: Bryan Bende <bb...@apache.org>
Date:   2017-04-12T22:25:31Z

    NIFI-3724 - Initial commit of Parquet bundle with PutParquet and FetchParquet
    - Creating nifi-records-utils to share utility code from record services
    - Refactoring Parquet tests to use MockRecorderParser and MockRecordWriter
    - Refactoring AbstractPutHDFSRecord to use schema access strategy
    - Adding custom validate to AbstractPutHDFSRecord and adding handling of UNION types when writing Records as Avro
    - Refactoring project structure to get CS API references out of nifi-commons, introducing nifi-extension-utils under nifi-nar-bundles

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

Posted by alopresto <gi...@git.apache.org>.
Github user alopresto commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1712#discussion_r114157766
  
    --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java ---
    @@ -0,0 +1,505 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hadoop;
    +
    +import org.apache.commons.io.IOUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.permission.FsPermission;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.configuration.DefaultSettings;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.FlowFileAccessException;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.hadoop.exception.FailureException;
    +import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
    +import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
    +import org.apache.nifi.schema.access.SchemaAccessStrategy;
    +import org.apache.nifi.schema.access.SchemaAccessUtils;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.schemaregistry.services.SchemaRegistry;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.WriteResult;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.RecordSet;
    +import org.apache.nifi.util.StopWatch;
    +
    +import java.io.BufferedInputStream;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.security.PrivilegedAction;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_CONTENT_ENCODED_SCHEMA;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_SCHEMA_REF_ATTRIBUTES;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
    +
    +/**
    + * Base class for processors that write Records to HDFS.
    + */
    +@TriggerWhenEmpty // trigger when empty so we have a chance to perform a Kerberos re-login
    +@DefaultSettings(yieldDuration = "100 ms") // decrease the default yield since we are triggering when empty
    +public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
    +
    +
    +    public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
    +            .name("compression-type")
    +            .displayName("Compression Type")
    +            .description("The type of compression for the file being written.")
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor OVERWRITE = new PropertyDescriptor.Builder()
    +            .name("overwrite")
    +            .displayName("Overwrite Files")
    +            .description("Whether or not to overwrite existing files in the same directory with the same name. When set to false, " +
    +                    "flow files will be routed to failure when a file exists in the same directory with the same name.")
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor UMASK = new PropertyDescriptor.Builder()
    +            .name("permissions-umask")
    +            .displayName("Permissions umask")
    +            .description("A umask represented as an octal number which determines the permissions of files written to HDFS. " +
    +                    "This overrides the Hadoop Configuration dfs.umaskmode")
    +            .addValidator(HadoopValidators.UMASK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder()
    +            .name("remote-owner")
    +            .displayName("Remote Owner")
    +            .description("Changes the owner of the HDFS file to this value after it is written. " +
    +                    "This only works if NiFi is running as a user that has HDFS super user privilege to change owner")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder()
    +            .name("remote-group")
    +            .displayName("Remote Group")
    +            .description("Changes the group of the HDFS file to this value after it is written. " +
    +                    "This only works if NiFi is running as a user that has HDFS super user privilege to change group")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +            .name("record-reader")
    +            .displayName("Record Reader")
    +            .description("The service for reading records from incoming flow files.")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("Flow Files that have been successfully processed are transferred to this relationship")
    +            .build();
    +
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description("Flow Files that could not be processed due to issues that can be retried are transferred to this relationship")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("Flow Files that could not be processed due to issues that cannot be retried are transferred to this relationship")
    +            .build();
    +
    +    public static final String RECORD_COUNT_ATTR = "record.count";
    +
    +    private volatile String remoteOwner;
    +    private volatile String remoteGroup;
    +    private volatile SchemaAccessStrategy schemaAccessStrategy;
    +
    +    private volatile Set<Relationship> putHdfsRecordRelationships;
    +    private volatile List<PropertyDescriptor> putHdfsRecordProperties;
    +
    +    private final List<AllowableValue> strategyList = Collections.unmodifiableList(Arrays.asList(
    +            SCHEMA_NAME_PROPERTY,
    +            SCHEMA_TEXT_PROPERTY,
    +            HWX_SCHEMA_REF_ATTRIBUTES,
    +            HWX_CONTENT_ENCODED_SCHEMA
    +    ));
    +
    +
    +    @Override
    +    protected final void init(final ProcessorInitializationContext context) {
    +        super.init(context);
    +
    +        final Set<Relationship> rels = new HashSet<>();
    +        rels.add(REL_SUCCESS);
    +        rels.add(REL_RETRY);
    +        rels.add(REL_FAILURE);
    +        this.putHdfsRecordRelationships = Collections.unmodifiableSet(rels);
    +
    +        final List<PropertyDescriptor> props = new ArrayList<>(properties);
    +        props.add(RECORD_READER);
    +
    +        props.add(new PropertyDescriptor.Builder()
    +                .fromPropertyDescriptor(DIRECTORY)
    +                .description("The parent directory to which files should be written. Will be created if it doesn't exist.")
    +                .build());
    +
    +        final AllowableValue[] strategies = getSchemaAccessStrategyValues().toArray(new AllowableValue[0]);
    +
    +        props.add(new PropertyDescriptor.Builder()
    +                .fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY)
    +                .description("Specifies how to obtain the schema that is to be used for writing the data.")
    +                .allowableValues(strategies)
    +                .defaultValue(getDefaultSchemaAccessStrategy().getValue())
    +                .build());
    +
    +        props.add(SCHEMA_REGISTRY);
    +        props.add(SCHEMA_NAME);
    +        props.add(SCHEMA_TEXT);
    +
    +        final AllowableValue[] compressionTypes = getCompressionTypes(context).toArray(new AllowableValue[0]);
    +
    +        props.add(new PropertyDescriptor.Builder()
    +                .fromPropertyDescriptor(COMPRESSION_TYPE)
    +                .allowableValues(compressionTypes)
    +                .defaultValue(getDefaultCompressionType(context))
    +                .build());
    +
    +        props.add(OVERWRITE);
    +        props.add(UMASK);
    +        props.add(REMOTE_GROUP);
    +        props.add(REMOTE_OWNER);
    +        props.addAll(getAdditionalProperties());
    +        this.putHdfsRecordProperties = Collections.unmodifiableList(props);
    +    }
    +
    +    protected List<AllowableValue> getSchemaAccessStrategyValues() {
    +        return strategyList;
    +    }
    +
    +    protected AllowableValue getDefaultSchemaAccessStrategy() {
    +        return SCHEMA_NAME_PROPERTY;
    +    }
    +
    +    private PropertyDescriptor getSchemaAcessStrategyDescriptor() {
    +        return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
    +    }
    +
    +    /**
    +     * @param context the initialization context
    +     * @return the possible compression types
    +     */
    +    public abstract List<AllowableValue> getCompressionTypes(final ProcessorInitializationContext context);
    +
    +    /**
    +     * @param context the initialization context
    +     * @return the default compression type
    +     */
    +    public abstract String getDefaultCompressionType(final ProcessorInitializationContext context);
    +
    +    /**
    +     * Allows sub-classes to add additional properties, called from initialize.
    +     *
    +     * @return additional properties to add to the overall list
    +     */
    +    public List<PropertyDescriptor> getAdditionalProperties() {
    +        return Collections.emptyList();
    +    }
    +
    +    @Override
    +    public final Set<Relationship> getRelationships() {
    +        return putHdfsRecordRelationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +       return putHdfsRecordProperties;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    +        final String schemaAccessStrategy = validationContext.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
    +        return SchemaAccessUtils.validateSchemaAccessStrategy(validationContext, schemaAccessStrategy, getSchemaAccessStrategyValues());
    +    }
    +
    +    @OnScheduled
    +    public final void onScheduled(final ProcessContext context) throws IOException {
    +        super.abstractOnScheduled(context);
    +
    +        final SchemaRegistry schemaRegistry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
    +
    +        final PropertyDescriptor descriptor = getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
    +        final String schemaAccess = context.getProperty(descriptor).getValue();
    +        this.schemaAccessStrategy = SchemaAccessUtils.getSchemaAccessStrategy(schemaAccess, schemaRegistry, context);
    +
    +        this.remoteOwner = context.getProperty(REMOTE_OWNER).getValue();
    +        this.remoteGroup = context.getProperty(REMOTE_GROUP).getValue();
    +
    +        // Set umask once, to avoid thread safety issues doing it in onTrigger
    +        final PropertyValue umaskProp = context.getProperty(UMASK);
    +        final short dfsUmask;
    +        if (umaskProp.isSet()) {
    +            dfsUmask = Short.parseShort(umaskProp.getValue(), 8);
    +        } else {
    +            dfsUmask = FsPermission.DEFAULT_UMASK;
    +        }
    +        final Configuration conf = getConfiguration();
    +        FsPermission.setUMask(conf, new FsPermission(dfsUmask));
    +    }
    +
    +    /**
    +     * Sub-classes provide the appropriate HDFSRecordWriter.
    +     *
    +     * @param context the process context to obtain additional configuration
    +     * @param flowFile the flow file being written
    +     * @param conf the Configuration instance
    +     * @param path the path to write to
    +     * @param schema the schema for writing
    +     * @return the HDFSRecordWriter
    +     * @throws IOException if an error occurs creating the writer or processing the schema
    +     */
    +    public abstract HDFSRecordWriter createHDFSRecordWriter(
    +            final ProcessContext context,
    +            final FlowFile flowFile,
    +            final Configuration conf,
    +            final Path path,
    +            final RecordSchema schema) throws IOException, SchemaNotFoundException;
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        // do this before getting a flow file so that we always get a chance to attempt Kerberos relogin
    +        final FileSystem fileSystem = getFileSystem();
    +        final Configuration configuration = getConfiguration();
    +        final UserGroupInformation ugi = getUserGroupInformation();
    +
    +        if (configuration == null || fileSystem == null || ugi == null) {
    +            getLogger().error("Processor not configured properly because Configuration, FileSystem, or UserGroupInformation was null");
    +            context.yield();
    +            return;
    +        }
    +
    +        final FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            context.yield();
    +            return;
    +        }
    +
    +        ugi.doAs((PrivilegedAction<Object>)() -> {
    +            Path tempDotCopyFile = null;
    +            FlowFile putFlowFile = flowFile;
    +            try {
    +                final String filenameValue = putFlowFile.getAttribute(CoreAttributes.FILENAME.key()); // TODO codec extension
    +                final String directoryValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(putFlowFile).getValue();
    +
    +                // create the directory if it doesn't exist
    +                final Path directoryPath = new Path(directoryValue);
    +                createDirectory(fileSystem, directoryPath, remoteOwner, remoteGroup);
    +
    +                // write to tempFile first and on success rename to destFile
    +                final Path tempFile = new Path(directoryPath, "." + filenameValue);
    +                final Path destFile = new Path(directoryPath, filenameValue);
    +
    +                final boolean destinationExists = fileSystem.exists(destFile) || fileSystem.exists(tempFile);
    +                final boolean shouldOverwrite = context.getProperty(OVERWRITE).asBoolean();
    +
    +                // if the tempFile or destFile already exist, and overwrite is set to false, then transfer to failure
    +                if (destinationExists && !shouldOverwrite) {
    +                    session.transfer(session.penalize(putFlowFile), REL_FAILURE);
    +                    getLogger().warn("penalizing {} and routing to failure because file with same name already exists", new Object[]{putFlowFile});
    +                    return null;
    +                }
    +
    +                final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>(null);
    +                final AtomicReference<WriteResult> writeResult = new AtomicReference<>();
    +                final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +
    +                final FlowFile flowFileIn = putFlowFile;
    +                final StopWatch stopWatch = new StopWatch(true);
    +
    +                // Read records from the incoming FlowFile and write them to the tempFile
    +                session.read(putFlowFile, (final InputStream rawIn) -> {
    +                    RecordReader recordReader = null;
    +                    HDFSRecordWriter recordWriter = null;
    +
    +                    try (final BufferedInputStream in = new BufferedInputStream(rawIn)) {
    +                        final RecordSchema destRecordSchema = schemaAccessStrategy.getSchema(flowFile, in);
    +                        recordWriter = createHDFSRecordWriter(context, flowFile, configuration, tempFile, destRecordSchema);
    +
    +                        // if we fail to create the RecordReader then we want to route to failure, so we need to
    +                        // handle this separately from the other IOExceptions which normally route to retry
    +                        try {
    +                            recordReader = recordReaderFactory.createRecordReader(flowFileIn, in, getLogger());
    +                        } catch (Exception e) {
    +                            final RecordReaderFactoryException rrfe = new RecordReaderFactoryException("Unable to create RecordReader", e);
    +                            exceptionHolder.set(rrfe);
    +                            return;
    +                        }
    +
    +                        final RecordSet recordSet = recordReader.createRecordSet();
    +                        writeResult.set(recordWriter.write(recordSet));
    +
    +                    } catch (Exception e) {
    +                        exceptionHolder.set(e);
    +                    } finally {
    +                        IOUtils.closeQuietly(recordReader);
    +                        IOUtils.closeQuietly(recordWriter);
    +                    }
    +                });
    +                stopWatch.stop();
    +
    +                final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize());
    +                final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
    +                tempDotCopyFile = tempFile;
    +
    +                // if any errors happened within the session.read then throw the exception so we jump
    +                // into one of the appropriate catch blocks below
    +                if (exceptionHolder.get() != null) {
    +                    throw exceptionHolder.get();
    +                }
    +
    +                // Attempt to rename from the tempFile to destFile, and change owner if successfully renamed
    +                rename(fileSystem, tempFile, destFile);
    +                changeOwner(fileSystem, destFile, remoteOwner, remoteGroup);
    +
    +                getLogger().info("Wrote {} to {} in {} milliseconds at a rate of {}", new Object[]{putFlowFile, destFile, millis, dataRate});
    +
    +                putFlowFile = postProcess(context, session, putFlowFile, destFile);
    +
    +                final String outputPath = destFile.toString();
    +                final String newFilename = destFile.getName();
    +                final String hdfsPath = destFile.getParent().toString();
    +
    +                // Update the filename and absolute path attributes
    +                final Map<String,String> attributes = new HashMap<>(writeResult.get().getAttributes());
    +                attributes.put(CoreAttributes.FILENAME.key(), newFilename);
    +                attributes.put(ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
    +                attributes.put(RECORD_COUNT_ATTR, String.valueOf(writeResult.get().getRecordCount()));
    +                putFlowFile = session.putAllAttributes(putFlowFile, attributes);
    +
    +                // Send a provenance event and transfer to success
    +                final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath : "hdfs://" + outputPath;
    +                session.getProvenanceReporter().send(putFlowFile, transitUri);
    +                session.transfer(putFlowFile, REL_SUCCESS);
    +
    +            } catch (IOException | FlowFileAccessException e) {
    +                deleteQuietly(fileSystem, tempDotCopyFile);
    +                getLogger().error("Failed to write due to {}", new Object[]{e});
    +                session.transfer(session.penalize(putFlowFile), REL_RETRY);
    +                context.yield();
    +            } catch (Throwable t) {
    +                deleteQuietly(fileSystem, tempDotCopyFile);
    +                getLogger().error("Failed to write due to {}", new Object[]{t});
    +                session.transfer(putFlowFile, REL_FAILURE);
    +            }
    +
    +            return null;
    +        });
    +    }
    +
    +    /**
    +     * This method will be called after successfully writing to the destination file and renaming the file to its final name
    +     * in order to give sub-classes a chance to take action before transferring to success.
    +     *
    +     * @param context the context
    +     * @param session the session
    +     * @param flowFile the flow file being processed
    +     * @param destFile the destination file written to
    +     * @return an updated FlowFile reference
    +     */
    +    protected FlowFile postProcess(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path destFile) {
    +        return flowFile;
    +    }
    +
    +    protected void rename(final FileSystem fileSystem, final Path srcFile, final Path destFile) throws IOException, InterruptedException, FailureException {
    +        boolean renamed = false;
    +        for (int i = 0; i < 10; i++) { // try to rename multiple times.
    +            if (fileSystem.rename(srcFile, destFile)) {
    +                renamed = true;
    +                break;// rename was successful
    +            }
    +            Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
    +        }
    +        if (!renamed) {
    --- End diff --
    
    My read of this is that if the rename operation fails 10x, the source file is deleted. Is that captured anywhere in the docs/Javadocs, etc.? Would be a little confusing for a user unless the only context for this method is renaming the temporary file to the persistent one. 
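
To make the behavior in question concrete, here is a minimal sketch of "retry the rename, and on persistent failure delete the source and report an error". It is based only on the reviewer's reading above, since the diff is cut off at the `if (!renamed)` check. It uses `java.nio.file` on the local file system as a stand-in for the Hadoop `FileSystem`, and the names `RenameRetrySketch`, `retry`, `tryMove`, and `renameOrCleanUp` are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.BooleanSupplier;

public class RenameRetrySketch {

    // Runs the attempt up to maxAttempts times, sleeping after each failed attempt,
    // and reports whether any attempt succeeded.
    public static boolean retry(final BooleanSupplier attempt, final int maxAttempts, final long sleepMillis)
            throws InterruptedException {
        for (int i = 0; i < maxAttempts; i++) {
            if (attempt.getAsBoolean()) {
                return true;
            }
            Thread.sleep(sleepMillis); // give a transient problem time to clear
        }
        return false;
    }

    // Stand-in for a rename call that returns false on failure instead of throwing.
    public static boolean tryMove(final Path src, final Path dest) {
        try {
            Files.move(src, dest); // fails if dest already exists
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // Assumed contract: after maxAttempts failed renames, the source (temporary) file
    // is deleted and the caller is told about the failure through an exception.
    public static void renameOrCleanUp(final Path src, final Path dest, final int maxAttempts, final long sleepMillis)
            throws IOException, InterruptedException {
        final boolean renamed = retry(() -> tryMove(src, dest), maxAttempts, sleepMillis);
        if (!renamed) {
            Files.deleteIfExists(src);
            throw new IOException("Could not rename " + src + " to " + dest + " after " + maxAttempts + " attempts");
        }
    }
}
```

Spelled out this way, the deletion is only safe when the source is a temporary file owned by the caller, which is the kind of constraint a Javadoc note on the method could state.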



[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

Posted by alopresto <gi...@git.apache.org>.
Github user alopresto commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1712#discussion_r114154289
  
    --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java ---
    @@ -0,0 +1,279 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hadoop;
    +
    +import org.apache.commons.io.input.NullInputStream;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.security.AccessControlException;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.configuration.DefaultSettings;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.FlowFileAccessException;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.hadoop.record.HDFSRecordReader;
    +import org.apache.nifi.serialization.RecordSetWriter;
    +import org.apache.nifi.serialization.RecordSetWriterFactory;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.WriteResult;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.RecordSet;
    +import org.apache.nifi.util.StopWatch;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.net.URI;
    +import java.security.PrivilegedAction;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Base processor for reading data from HDFS that can be fetched into records.
    + */
    +@TriggerWhenEmpty // trigger when empty so we have a chance to perform a Kerberos re-login
    +@DefaultSettings(yieldDuration = "100 ms") // decrease the default yield since we are triggering when empty
    +public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
    +
    +    public static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
    +            .name("filename")
    +            .displayName("Filename")
    +            .description("The name of the file to retrieve")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("${path}/${filename}")
    +            .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
    +            .name("record-writer")
    +            .displayName("Record Writer")
    +            .description("The service for writing records to the FlowFile content")
    +            .identifiesControllerService(RecordSetWriterFactory.class)
    +            .required(true)
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("FlowFiles will be routed to this relationship once they have been updated with the content of the file")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("FlowFiles will be routed to this relationship if the content of the file cannot be retrieved and trying again will likely not be helpful. "
    +                    + "This would occur, for instance, if the file is not found or if there is a permissions issue")
    +            .build();
    +
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description("FlowFiles will be routed to this relationship if the content of the file cannot be retrieved, but might be able to be in the future if tried again. "
    +                    + "This generally indicates that the Fetch should be tried again.")
    +            .build();
    +
    +    public static final String FETCH_FAILURE_REASON_ATTR = "fetch.failure.reason";
    +    public static final String RECORD_COUNT_ATTR = "record.count";
    +
    +    private volatile Set<Relationship> fetchHdfsRecordRelationships;
    +    private volatile List<PropertyDescriptor> fetchHdfsRecordProperties;
    +
    +    @Override
    +    protected final void init(final ProcessorInitializationContext context) {
    +        super.init(context);
    +
    +        final Set<Relationship> rels = new HashSet<>();
    +        rels.add(REL_SUCCESS);
    +        rels.add(REL_RETRY);
    +        rels.add(REL_FAILURE);
    +        this.fetchHdfsRecordRelationships = Collections.unmodifiableSet(rels);
    +
    +        final List<PropertyDescriptor> props = new ArrayList<>(properties);
    +        props.add(FILENAME);
    +        props.add(RECORD_WRITER);
    +        props.addAll(getAdditionalProperties());
    +        this.fetchHdfsRecordProperties = Collections.unmodifiableList(props);
    +    }
    +
    +    /**
    +     * Allows sub-classes to add additional properties, called from init.
    +     *
    +     * @return additional properties to add to the overall list
    +     */
    +    public List<PropertyDescriptor> getAdditionalProperties() {
    +        return Collections.emptyList();
    +    }
    +
    +    @Override
    +    public final Set<Relationship> getRelationships() {
    +        return fetchHdfsRecordRelationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return fetchHdfsRecordProperties;
    +    }
    +
    +    /**
    +     * Sub-classes provide the appropriate HDFSRecordReader.
    +     *
    +     * @param context the process context to obtain additional configuration
    +     * @param flowFile the flow file being processed
    +     * @param conf the Configuration instance
    +     * @param path the path to read from
    +     * @return the HDFSRecordReader
    +     * @throws IOException if an error occurs creating the reader
    +     */
    +    public abstract HDFSRecordReader createHDFSRecordReader(final ProcessContext context, final FlowFile flowFile, final Configuration conf, final Path path)
    +            throws IOException;
    +
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        // do this before getting a flow file so that we always get a chance to attempt Kerberos relogin
    +        final FileSystem fileSystem = getFileSystem();
    +        final Configuration configuration = getConfiguration();
    +        final UserGroupInformation ugi = getUserGroupInformation();
    +
    +        if (configuration == null || fileSystem == null || ugi == null) {
    +            getLogger().error("Processor not configured properly because Configuration, FileSystem, or UserGroupInformation was null");
    +            context.yield();
    +            return;
    +        }
    +
    +        final FlowFile originalFlowFile = session.get();
    +        if (originalFlowFile == null) {
    +            context.yield();
    +            return;
    +        }
    +
    +
    +        ugi.doAs((PrivilegedAction<Object>)() -> {
    +            FlowFile child = null;
    +            final String filenameValue = context.getProperty(FILENAME).evaluateAttributeExpressions(originalFlowFile).getValue();
    +            try {
    +                final Path path = new Path(filenameValue);
    +                final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>(null);
    +                final AtomicReference<WriteResult> writeResult = new AtomicReference<>();
    +
    +                final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
    +                final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(getLogger(), originalFlowFile, new NullInputStream(0));
    +
    +                final StopWatch stopWatch = new StopWatch(true);
    +
    +                // use a child FlowFile so that if any error occurs we can route the original untouched FlowFile to retry/failure
    +                child = session.create(originalFlowFile);
    +                child = session.write(child, (final OutputStream rawOut) -> {
    +                    try (final BufferedOutputStream out = new BufferedOutputStream(rawOut);
    +                         final HDFSRecordReader recordReader = createHDFSRecordReader(context, originalFlowFile, configuration, path)) {
    +
    +                        final RecordSchema emptySchema = new SimpleRecordSchema(Collections.emptyList());
    +
    +                        final RecordSet recordSet = new RecordSet() {
    +                            @Override
    +                            public RecordSchema getSchema() throws IOException {
    +                                return emptySchema;
    +                            }
    +
    +                            @Override
    +                            public Record next() throws IOException {
    +                                return recordReader.nextRecord();
    +                            }
    +                        };
    +
    +                        writeResult.set(recordSetWriter.write(recordSet, out));
    +                    } catch (Exception e) {
    +                        exceptionHolder.set(e);
    +                    }
    +                });
    +
    +                stopWatch.stop();
    +
    +                // if any errors happened within the session.write then throw the exception so we jump
    +                // into one of the appropriate catch blocks below
    +                if (exceptionHolder.get() != null) {
    +                    throw exceptionHolder.get();
    +                }
    +
    +                FlowFile successFlowFile = postProcess(context, session, child, path);
    +
    +                final Map<String,String> attributes = new HashMap<>(writeResult.get().getAttributes());
    +                attributes.put(RECORD_COUNT_ATTR, String.valueOf(writeResult.get().getRecordCount()));
    +                attributes.put(CoreAttributes.MIME_TYPE.key(), recordSetWriter.getMimeType());
    +                successFlowFile = session.putAllAttributes(successFlowFile, attributes);
    +
    +                final URI uri = path.toUri();
    +                getLogger().info("Successfully received content from {} for {} in {}", new Object[] {uri, successFlowFile, stopWatch.getDuration()});
    --- End diff --
    
    Add the unit (`milliseconds`) to the duration in the log output.
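
A minimal sketch of the suggestion, written against the plain JDK rather than NiFi's `StopWatch` so that it runs on its own. The class `DurationLogExample` and the method `formatMillis` are hypothetical names used only for illustration; the point is that the logged value carries its unit explicitly.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of NiFi: renders an elapsed time with an explicit unit.
class DurationLogExample {

    // Converts a nanosecond interval to whole milliseconds (truncating) and appends the unit.
    static String formatMillis(final long elapsedNanos) {
        final long millis = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
        return millis + " milliseconds";
    }

    public static void main(final String[] args) throws InterruptedException {
        final long start = System.nanoTime();
        Thread.sleep(5); // stand-in for the actual fetch
        final long elapsed = System.nanoTime() - start;
        System.out.println("Successfully received content in " + formatMillis(elapsed));
    }
}
```

In the processor itself the same effect could probably be had by putting the unit into the log message template, which keeps the change to a single line.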


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

Posted by alopresto <gi...@git.apache.org>.
Github user alopresto commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1712#discussion_r114147981
  
    --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java ---
    @@ -67,40 +61,14 @@
      */
     @RequiresInstanceClassLoading(cloneAncestorResources = true)
     public abstract class AbstractHadoopProcessor extends AbstractProcessor {
    -    /**
    -     * Compression Type Enum
    -     */
    -    public enum CompressionType {
    -        NONE,
    -        DEFAULT,
    -        BZIP,
    -        GZIP,
    -        LZ4,
    -        SNAPPY,
    -        AUTOMATIC;
    -
    -        @Override
    -        public String toString() {
    -            switch (this) {
    -                case NONE: return "NONE";
    -                case DEFAULT: return DefaultCodec.class.getName();
    -                case BZIP: return BZip2Codec.class.getName();
    -                case GZIP: return GzipCodec.class.getName();
    -                case LZ4: return Lz4Codec.class.getName();
    -                case SNAPPY: return SnappyCodec.class.getName();
    -                case AUTOMATIC: return "Automatically Detected";
    -            }
    -            return null;
    -        }
    -    }
     
         // properties
         public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
                 .name("Hadoop Configuration Resources")
                 .description("A file or comma separated list of files which contains the Hadoop file system configuration. Without this, Hadoop "
                         + "will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration.")
                 .required(false)
    -            .addValidator(createMultipleFilesExistValidator())
    +            .addValidator(HadoopValidators.MULTIPLE_FILE_EXISTS_VALIDATOR)
    --- End diff --
    
    Minor comment -- until I read the source code for this, my interpretation was that this validator ensured that *multiple files existed* -- i.e. one file provided would fail. Perhaps we can rename this `ONE_OR_MORE_FILES_EXIST_VALIDATOR`? Not a giant issue but potentially confusing. 
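
To make the intended semantics concrete, here is a rough sketch of a "one or more files exist" check using only the JDK. It is not the NiFi validator: `OneOrMoreFilesExist` and `allExist` are hypothetical names, and the existence test is passed in as a predicate purely so the logic can be exercised without touching the file system. A single file is accepted; an empty value or any missing file is rejected.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.function.Predicate;

// Hypothetical illustration of the semantics discussed above; not NiFi code.
class OneOrMoreFilesExist {

    // True when the comma-separated value names at least one file
    // and every named file satisfies the supplied existence check.
    static boolean allExist(final String commaSeparated, final Predicate<String> fileExists) {
        if (commaSeparated == null || commaSeparated.trim().isEmpty()) {
            return false;
        }
        for (final String name : commaSeparated.split(",")) {
            final String trimmed = name.trim();
            if (trimmed.isEmpty() || !fileExists.test(trimmed)) {
                return false;
            }
        }
        return true;
    }

    // Convenience overload that checks the local file system.
    static boolean allExist(final String commaSeparated) {
        return allExist(commaSeparated, name -> Files.isRegularFile(Paths.get(name)));
    }

    public static void main(final String[] args) {
        final String value = args.length > 0 ? args[0] : "core-site.xml";
        System.out.println(value + " -> " + allExist(value));
    }
}
```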


---

[GitHub] nifi issue #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

Posted by nellashapiro123 <gi...@git.apache.org>.
Github user nellashapiro123 commented on the issue:

    https://github.com/apache/nifi/pull/1712
  
    Has anybody been able to use the FetchParquet processor successfully? I am getting a SchemaNotFoundException. I created the file with PutParquet, and Spark can read this Parquet file.


---

[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

Posted by alopresto <gi...@git.apache.org>.
Github user alopresto commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1712#discussion_r114152045
  
    --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.schema.access;
    +
    +import org.apache.nifi.avro.AvroSchemaValidator;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schemaregistry.services.SchemaRegistry;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.List;
    +
    +public class SchemaAccessUtils {
    +
    +    public static final AllowableValue SCHEMA_NAME_PROPERTY = new AllowableValue("schema-name", "Use 'Schema Name' Property",
    +            "The name of the Schema to use is specified by the 'Schema Name' Property. The value of this property is used to lookup the Schema in the configured Schema Registry service.");
    +    public static final AllowableValue SCHEMA_TEXT_PROPERTY = new AllowableValue("schema-text-property", "Use 'Schema Text' Property",
    +            "The text of the Schema itself is specified by the 'Schema Text' Property. The value of this property must be a valid Avro Schema. "
    +                    + "If Expression Language is used, the value of the 'Schema Text' property must be valid after substituting the expressions.");
    +    public static final AllowableValue HWX_CONTENT_ENCODED_SCHEMA = new AllowableValue("hwx-content-encoded-schema", "HWX Content-Encoded Schema Reference",
    +            "The content of the FlowFile contains a reference to a schema in the Schema Registry service. The reference is encoded as a single byte indicating the 'protocol version', "
    +                    + "followed by 8 bytes indicating the schema identifier, and finally 4 bytes indicating the schema version, as per the Hortonworks Schema Registry serializers and deserializers, "
    +                    + "found at https://github.com/hortonworks/registry");
    +    public static final AllowableValue HWX_SCHEMA_REF_ATTRIBUTES = new AllowableValue("hwx-schema-ref-attributes", "HWX Schema Reference Attributes",
    +            "The FlowFile contains 3 Attributes that will be used to lookup a Schema from the configured Schema Registry: 'schema.identifier', 'schema.version', and 'schema.protocol.version'");
    +
    +    public  static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
    +            .name("Schema Registry")
    +            .description("Specifies the Controller Service to use for the Schema Registry")
    +            .identifiesControllerService(SchemaRegistry.class)
    +            .required(false)
    +            .build();
    +
    +    public  static final PropertyDescriptor SCHEMA_ACCESS_STRATEGY = new PropertyDescriptor.Builder()
    +            .name("Schema Access Strategy")
    +            .description("Specifies how to obtain the schema that is to be used for interpreting the data.")
    +            .allowableValues(SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA)
    +            .defaultValue(SCHEMA_NAME_PROPERTY.getValue())
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
    +            .name("Schema Name")
    +            .description("Specifies the name of the schema to lookup in the Schema Registry property")
    --- End diff --
    
    Please use a `name` value without a space and provide a `displayName` field with a human-facing value (i.e. "Schema Name").


---

[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

Posted by alopresto <gi...@git.apache.org>.
Github user alopresto commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1712#discussion_r114148462
  
    --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java ---
    @@ -0,0 +1,496 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.avro;
    +
    +import org.apache.avro.LogicalType;
    +import org.apache.avro.LogicalTypes;
    +import org.apache.avro.Schema;
    +import org.apache.avro.Schema.Field;
    +import org.apache.avro.Schema.Type;
    +import org.apache.avro.generic.GenericData;
    +import org.apache.avro.generic.GenericFixed;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.util.Utf8;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.DataType;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.SchemaIdentifier;
    +import org.apache.nifi.serialization.record.util.DataTypeUtils;
    +import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.time.Duration;
    +import java.time.temporal.ChronoUnit;
    +import java.util.ArrayList;
    +import java.util.Date;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.concurrent.TimeUnit;
    +import java.util.stream.Collectors;
    +
    +public class AvroTypeUtil {
    +    public static final String AVRO_SCHEMA_FORMAT = "avro";
    +
    +    public static Schema extractAvroSchema(final RecordSchema recordSchema) throws SchemaNotFoundException {
    +        final Optional<String> schemaFormatOption = recordSchema.getSchemaFormat();
    --- End diff --
    
    Should we guard against `null` here?
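
One possible shape for such a guard, sketched with the JDK only. The nested `FormatSource` interface is a hypothetical stand-in for the single `RecordSchema` method used at this point in the diff, and the exception types are assumptions chosen for the sketch (the real method declares `SchemaNotFoundException`).

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical illustration of a null guard before reading the schema format; not NiFi code.
class SchemaFormatGuard {

    // Stand-in for the one RecordSchema method this sketch needs.
    interface FormatSource {
        Optional<String> getSchemaFormat();
    }

    // Fails fast with a clear message on a null schema, then requires the format to be present.
    static String requireFormat(final FormatSource schema) {
        Objects.requireNonNull(schema, "recordSchema must not be null");
        return schema.getSchemaFormat()
                .orElseThrow(() -> new IllegalStateException("No schema format present"));
    }

    public static void main(final String[] args) {
        System.out.println(requireFormat(() -> Optional.of("avro")));
    }
}
```

Failing early this way turns an opaque `NullPointerException` deep in the conversion into an error that names the missing argument.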


---

[GitHub] nifi issue #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

Posted by alopresto <gi...@git.apache.org>.
Github user alopresto commented on the issue:

    https://github.com/apache/nifi/pull/1712
  
    Thanks. Merging. 


---

[GitHub] nifi issue #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on the issue:

    https://github.com/apache/nifi/pull/1712
  
    Thanks Andy. I just pushed a commit that addresses your comments, so we should be good to go. I am going to look into the template issue, but I agree that it is not caused by the changes in this PR.


---

[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

Posted by alopresto <gi...@git.apache.org>.
Github user alopresto commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1712#discussion_r114159819
  
    --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.schema.access;
    +
    +import org.apache.nifi.avro.AvroSchemaValidator;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schemaregistry.services.SchemaRegistry;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.List;
    +
    +public class SchemaAccessUtils {
    +
    +    public static final AllowableValue SCHEMA_NAME_PROPERTY = new AllowableValue("schema-name", "Use 'Schema Name' Property",
    +            "The name of the Schema to use is specified by the 'Schema Name' Property. The value of this property is used to lookup the Schema in the configured Schema Registry service.");
    +    public static final AllowableValue SCHEMA_TEXT_PROPERTY = new AllowableValue("schema-text-property", "Use 'Schema Text' Property",
    +            "The text of the Schema itself is specified by the 'Schema Text' Property. The value of this property must be a valid Avro Schema. "
    +                    + "If Expression Language is used, the value of the 'Schema Text' property must be valid after substituting the expressions.");
    +    public static final AllowableValue HWX_CONTENT_ENCODED_SCHEMA = new AllowableValue("hwx-content-encoded-schema", "HWX Content-Encoded Schema Reference",
    +            "The content of the FlowFile contains a reference to a schema in the Schema Registry service. The reference is encoded as a single byte indicating the 'protocol version', "
    +                    + "followed by 8 bytes indicating the schema identifier, and finally 4 bytes indicating the schema version, as per the Hortonworks Schema Registry serializers and deserializers, "
    +                    + "found at https://github.com/hortonworks/registry");
    +    public static final AllowableValue HWX_SCHEMA_REF_ATTRIBUTES = new AllowableValue("hwx-schema-ref-attributes", "HWX Schema Reference Attributes",
    +            "The FlowFile contains 3 Attributes that will be used to lookup a Schema from the configured Schema Registry: 'schema.identifier', 'schema.version', and 'schema.protocol.version'");
    +
    +    public  static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
    +            .name("Schema Registry")
    +            .description("Specifies the Controller Service to use for the Schema Registry")
    +            .identifiesControllerService(SchemaRegistry.class)
    +            .required(false)
    +            .build();
    +
    +    public  static final PropertyDescriptor SCHEMA_ACCESS_STRATEGY = new PropertyDescriptor.Builder()
    +            .name("Schema Access Strategy")
    +            .description("Specifies how to obtain the schema that is to be used for interpreting the data.")
    +            .allowableValues(SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA)
    +            .defaultValue(SCHEMA_NAME_PROPERTY.getValue())
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
    +            .name("Schema Name")
    +            .description("Specifies the name of the schema to lookup in the Schema Registry property")
    --- End diff --
    
    I don't mean it has to be difficult to read, but as this is used as the value identifier when the flow is serialized to XML, some formatters/etc. could break lines on the space or otherwise manipulate it. I think it's safer to avoid spaces (and most of the other examples are `formatted-like-this`). 


---

[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

Posted by alopresto <gi...@git.apache.org>.
Github user alopresto commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1712#discussion_r114151998
  
    --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.schema.access;
    +
    +import org.apache.nifi.avro.AvroSchemaValidator;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schemaregistry.services.SchemaRegistry;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.List;
    +
    +public class SchemaAccessUtils {
    +
    +    public static final AllowableValue SCHEMA_NAME_PROPERTY = new AllowableValue("schema-name", "Use 'Schema Name' Property",
    +            "The name of the Schema to use is specified by the 'Schema Name' Property. The value of this property is used to lookup the Schema in the configured Schema Registry service.");
    +    public static final AllowableValue SCHEMA_TEXT_PROPERTY = new AllowableValue("schema-text-property", "Use 'Schema Text' Property",
    +            "The text of the Schema itself is specified by the 'Schema Text' Property. The value of this property must be a valid Avro Schema. "
    +                    + "If Expression Language is used, the value of the 'Schema Text' property must be valid after substituting the expressions.");
    +    public static final AllowableValue HWX_CONTENT_ENCODED_SCHEMA = new AllowableValue("hwx-content-encoded-schema", "HWX Content-Encoded Schema Reference",
    +            "The content of the FlowFile contains a reference to a schema in the Schema Registry service. The reference is encoded as a single byte indicating the 'protocol version', "
    +                    + "followed by 8 bytes indicating the schema identifier, and finally 4 bytes indicating the schema version, as per the Hortonworks Schema Registry serializers and deserializers, "
    +                    + "found at https://github.com/hortonworks/registry");
    +    public static final AllowableValue HWX_SCHEMA_REF_ATTRIBUTES = new AllowableValue("hwx-schema-ref-attributes", "HWX Schema Reference Attributes",
    +            "The FlowFile contains 3 Attributes that will be used to lookup a Schema from the configured Schema Registry: 'schema.identifier', 'schema.version', and 'schema.protocol.version'");
    +
    +    public  static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
    +            .name("Schema Registry")
    +            .description("Specifies the Controller Service to use for the Schema Registry")
    +            .identifiesControllerService(SchemaRegistry.class)
    +            .required(false)
    +            .build();
    +
    +    public  static final PropertyDescriptor SCHEMA_ACCESS_STRATEGY = new PropertyDescriptor.Builder()
    +            .name("Schema Access Strategy")
    --- End diff --
    
    Please use a `name` value without a space and provide a `displayName` field with a human-facing value (i.e. "Schema Access Strategy").
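
The convention requested above can be sketched roughly as follows. This is only an illustration: `Descriptor` and `Builder` are simplified, hypothetical stand-ins for NiFi's `PropertyDescriptor` and `PropertyDescriptor.Builder` so that the example compiles without NiFi on the classpath, and the key `schema-access-strategy` is an assumed value, not one taken from the PR. The fallback from display name to name is likewise an assumption of this sketch.

```java
import java.util.Objects;

public class PropertyNamingSketch {

    /** Hypothetical, simplified stand-in for NiFi's PropertyDescriptor (not the real class). */
    public static final class Descriptor {
        private final String name;
        private final String displayName;

        private Descriptor(final String name, final String displayName) {
            this.name = name;
            this.displayName = displayName;
        }

        /** Stable machine-facing key, used to identify the property in a saved flow. */
        public String getName() {
            return name;
        }

        /** Human-facing label shown in the UI; can be reworded without changing the key. */
        public String getDisplayName() {
            return displayName;
        }
    }

    /** Hypothetical, simplified stand-in for PropertyDescriptor.Builder. */
    public static final class Builder {
        private String name;
        private String displayName;

        public Builder name(final String name) {
            this.name = name;
            return this;
        }

        public Builder displayName(final String displayName) {
            this.displayName = displayName;
            return this;
        }

        public Descriptor build() {
            Objects.requireNonNull(name, "name is required");
            // Assumption of this sketch: fall back to the name when no display name is set.
            return new Descriptor(name, displayName == null ? name : displayName);
        }
    }

    // Key without spaces, plus a separate human-facing label.
    public static final Descriptor SCHEMA_ACCESS_STRATEGY = new Builder()
            .name("schema-access-strategy")        // assumed key, for illustration only
            .displayName("Schema Access Strategy")
            .build();

    public static void main(final String[] args) {
        System.out.println(SCHEMA_ACCESS_STRATEGY.getName()
                + " -> " + SCHEMA_ACCESS_STRATEGY.getDisplayName());
    }
}
```

Keeping the key separate from the label means the label can later be changed for readability while existing flows, which refer to the property by its key, keep working.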


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

Posted by alopresto <gi...@git.apache.org>.
Github user alopresto commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1712#discussion_r114151958
  
    --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.schema.access;
    +
    +import org.apache.nifi.avro.AvroSchemaValidator;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schemaregistry.services.SchemaRegistry;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.List;
    +
    +public class SchemaAccessUtils {
    +
    +    public static final AllowableValue SCHEMA_NAME_PROPERTY = new AllowableValue("schema-name", "Use 'Schema Name' Property",
    +            "The name of the Schema to use is specified by the 'Schema Name' Property. The value of this property is used to lookup the Schema in the configured Schema Registry service.");
    +    public static final AllowableValue SCHEMA_TEXT_PROPERTY = new AllowableValue("schema-text-property", "Use 'Schema Text' Property",
    +            "The text of the Schema itself is specified by the 'Schema Text' Property. The value of this property must be a valid Avro Schema. "
    +                    + "If Expression Language is used, the value of the 'Schema Text' property must be valid after substituting the expressions.");
    +    public static final AllowableValue HWX_CONTENT_ENCODED_SCHEMA = new AllowableValue("hwx-content-encoded-schema", "HWX Content-Encoded Schema Reference",
    +            "The content of the FlowFile contains a reference to a schema in the Schema Registry service. The reference is encoded as a single byte indicating the 'protocol version', "
    +                    + "followed by 8 bytes indicating the schema identifier, and finally 4 bytes indicating the schema version, as per the Hortonworks Schema Registry serializers and deserializers, "
    +                    + "found at https://github.com/hortonworks/registry");
    +    public static final AllowableValue HWX_SCHEMA_REF_ATTRIBUTES = new AllowableValue("hwx-schema-ref-attributes", "HWX Schema Reference Attributes",
    +            "The FlowFile contains 3 Attributes that will be used to lookup a Schema from the configured Schema Registry: 'schema.identifier', 'schema.version', and 'schema.protocol.version'");
    +
    +    public  static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
    +            .name("Schema Registry")
    --- End diff --
    
    Please use a `name` value without a space and provide a `displayName` field with a human-facing value (i.e. "Schema Registry").


---

[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1712#discussion_r114153914
  
    --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.schema.access;
    +
    +import org.apache.nifi.avro.AvroSchemaValidator;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schemaregistry.services.SchemaRegistry;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.List;
    +
    +public class SchemaAccessUtils {
    +
    +    public static final AllowableValue SCHEMA_NAME_PROPERTY = new AllowableValue("schema-name", "Use 'Schema Name' Property",
    +            "The name of the Schema to use is specified by the 'Schema Name' Property. The value of this property is used to lookup the Schema in the configured Schema Registry service.");
    +    public static final AllowableValue SCHEMA_TEXT_PROPERTY = new AllowableValue("schema-text-property", "Use 'Schema Text' Property",
    +            "The text of the Schema itself is specified by the 'Schema Text' Property. The value of this property must be a valid Avro Schema. "
    +                    + "If Expression Language is used, the value of the 'Schema Text' property must be valid after substituting the expressions.");
    +    public static final AllowableValue HWX_CONTENT_ENCODED_SCHEMA = new AllowableValue("hwx-content-encoded-schema", "HWX Content-Encoded Schema Reference",
    +            "The content of the FlowFile contains a reference to a schema in the Schema Registry service. The reference is encoded as a single byte indicating the 'protocol version', "
    +                    + "followed by 8 bytes indicating the schema identifier, and finally 4 bytes indicating the schema version, as per the Hortonworks Schema Registry serializers and deserializers, "
    +                    + "found at https://github.com/hortonworks/registry");
    +    public static final AllowableValue HWX_SCHEMA_REF_ATTRIBUTES = new AllowableValue("hwx-schema-ref-attributes", "HWX Schema Reference Attributes",
    +            "The FlowFile contains 3 Attributes that will be used to lookup a Schema from the configured Schema Registry: 'schema.identifier', 'schema.version', and 'schema.protocol.version'");
    +
    +    public  static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
    +            .name("Schema Registry")
    +            .description("Specifies the Controller Service to use for the Schema Registry")
    +            .identifiesControllerService(SchemaRegistry.class)
    +            .required(false)
    +            .build();
    +
    +    public  static final PropertyDescriptor SCHEMA_ACCESS_STRATEGY = new PropertyDescriptor.Builder()
    +            .name("Schema Access Strategy")
    +            .description("Specifies how to obtain the schema that is to be used for interpreting the data.")
    +            .allowableValues(SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA)
    +            .defaultValue(SCHEMA_NAME_PROPERTY.getValue())
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
    +            .name("Schema Name")
    +            .description("Specifies the name of the schema to lookup in the Schema Registry property")
    --- End diff --
    
    Just to be clear, the only thing that truly needs to be human-readable is `displayName`. However, the `name` doesn't have to look like something only a computer would like :)


---

[GitHub] nifi issue #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

Posted by alopresto <gi...@git.apache.org>.
Github user alopresto commented on the issue:

    https://github.com/apache/nifi/pull/1712
  
    Reviewing...


---

[GitHub] nifi issue #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on the issue:

    https://github.com/apache/nifi/pull/1712
  
    @nellashapiro123 it would probably be best to ask this on the mailing lists:
    https://nifi.apache.org/mailing_lists.html
    
    If you send an email, please provide more info about your flow: which reader and writer is FetchParquet using? What schema access strategy is each reader and writer using? And if using schema access by name, what is the value of the schema.name attribute coming into FetchParquet?


---

[GitHub] nifi issue #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

Posted by masterlittle <gi...@git.apache.org>.
Github user masterlittle commented on the issue:

    https://github.com/apache/nifi/pull/1712
  
    Hi, does this enable writing the Parquet files to an S3 bucket? If not, is there any way I can achieve the same?


---

[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/1712


---

[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

Posted by alopresto <gi...@git.apache.org>.
Github user alopresto commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1712#discussion_r114179324
  
    --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java ---
    @@ -0,0 +1,505 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hadoop;
    +
    +import org.apache.commons.io.IOUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.permission.FsPermission;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.configuration.DefaultSettings;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.FlowFileAccessException;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.hadoop.exception.FailureException;
    +import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
    +import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
    +import org.apache.nifi.schema.access.SchemaAccessStrategy;
    +import org.apache.nifi.schema.access.SchemaAccessUtils;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.schemaregistry.services.SchemaRegistry;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.WriteResult;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.RecordSet;
    +import org.apache.nifi.util.StopWatch;
    +
    +import java.io.BufferedInputStream;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.security.PrivilegedAction;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_CONTENT_ENCODED_SCHEMA;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_SCHEMA_REF_ATTRIBUTES;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
    +
    +/**
    + * Base class for processors that write Records to HDFS.
    + */
    +@TriggerWhenEmpty // trigger when empty so we have a chance to perform a Kerberos re-login
    +@DefaultSettings(yieldDuration = "100 ms") // decrease the default yield since we are triggering when empty
    +public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
    +
    +
    +    public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
    +            .name("compression-type")
    +            .displayName("Compression Type")
    +            .description("The type of compression for the file being written.")
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor OVERWRITE = new PropertyDescriptor.Builder()
    +            .name("overwrite")
    +            .displayName("Overwrite Files")
    +            .description("Whether or not to overwrite existing files in the same directory with the same name. When set to false, " +
    +                    "flow files will be routed to failure when a file exists in the same directory with the same name.")
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor UMASK = new PropertyDescriptor.Builder()
    +            .name("permissions-umask")
    +            .displayName("Permissions umask")
    +            .description("A umask represented as an octal number which determines the permissions of files written to HDFS. " +
    +                    "This overrides the Hadoop Configuration dfs.umaskmode")
    +            .addValidator(HadoopValidators.UMASK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder()
    +            .name("remote-owner")
    +            .displayName("Remote Owner")
    +            .description("Changes the owner of the HDFS file to this value after it is written. " +
    +                    "This only works if NiFi is running as a user that has HDFS super user privilege to change owner")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder()
    +            .name("remote-group")
    +            .displayName("Remote Group")
    +            .description("Changes the group of the HDFS file to this value after it is written. " +
    +                    "This only works if NiFi is running as a user that has HDFS super user privilege to change group")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +            .name("record-reader")
    +            .displayName("Record Reader")
    +            .description("The service for reading records from incoming flow files.")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("Flow Files that have been successfully processed are transferred to this relationship")
    +            .build();
    +
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description("Flow Files that could not be processed due to issues that can be retried are transferred to this relationship")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("Flow Files that could not be processed due to issues that cannot be retried are transferred to this relationship")
    +            .build();
    +
    +    public static final String RECORD_COUNT_ATTR = "record.count";
    +
    +    private volatile String remoteOwner;
    +    private volatile String remoteGroup;
    +    private volatile SchemaAccessStrategy schemaAccessStrategy;
    +
    +    private volatile Set<Relationship> putHdfsRecordRelationships;
    +    private volatile List<PropertyDescriptor> putHdfsRecordProperties;
    +
    +    private final List<AllowableValue> strategyList = Collections.unmodifiableList(Arrays.asList(
    +            SCHEMA_NAME_PROPERTY,
    +            SCHEMA_TEXT_PROPERTY,
    +            HWX_SCHEMA_REF_ATTRIBUTES,
    +            HWX_CONTENT_ENCODED_SCHEMA
    +    ));
    +
    +
    +    @Override
    +    protected final void init(final ProcessorInitializationContext context) {
    +        super.init(context);
    +
    +        final Set<Relationship> rels = new HashSet<>();
    +        rels.add(REL_SUCCESS);
    +        rels.add(REL_RETRY);
    +        rels.add(REL_FAILURE);
    +        this.putHdfsRecordRelationships = Collections.unmodifiableSet(rels);
    +
    +        final List<PropertyDescriptor> props = new ArrayList<>(properties);
    +        props.add(RECORD_READER);
    +
    +        props.add(new PropertyDescriptor.Builder()
    +                .fromPropertyDescriptor(DIRECTORY)
    +                .description("The parent directory to which files should be written. Will be created if it doesn't exist.")
    +                .build());
    +
    +        final AllowableValue[] strategies = getSchemaAccessStrategyValues().toArray(new AllowableValue[0]);
    +
    +        props.add(new PropertyDescriptor.Builder()
    +                .fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY)
    +                .description("Specifies how to obtain the schema that is to be used for writing the data.")
    +                .allowableValues(strategies)
    +                .defaultValue(getDefaultSchemaAccessStrategy().getValue())
    +                .build());
    +
    +        props.add(SCHEMA_REGISTRY);
    +        props.add(SCHEMA_NAME);
    +        props.add(SCHEMA_TEXT);
    +
    +        final AllowableValue[] compressionTypes = getCompressionTypes(context).toArray(new AllowableValue[0]);
    +
    +        props.add(new PropertyDescriptor.Builder()
    +                .fromPropertyDescriptor(COMPRESSION_TYPE)
    +                .allowableValues(compressionTypes)
    +                .defaultValue(getDefaultCompressionType(context))
    +                .build());
    +
    +        props.add(OVERWRITE);
    +        props.add(UMASK);
    +        props.add(REMOTE_GROUP);
    +        props.add(REMOTE_OWNER);
    +        props.addAll(getAdditionalProperties());
    +        this.putHdfsRecordProperties = Collections.unmodifiableList(props);
    +    }
    +
    +    protected List<AllowableValue> getSchemaAccessStrategyValues() {
    +        return strategyList;
    +    }
    +
    +    protected AllowableValue getDefaultSchemaAccessStrategy() {
    +        return SCHEMA_NAME_PROPERTY;
    +    }
    +
    +    private PropertyDescriptor getSchemaAcessStrategyDescriptor() {
    +        return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
    +    }
    +
    +    /**
    +     * @param context the initialization context
    +     * @return the possible compression types
    +     */
    +    public abstract List<AllowableValue> getCompressionTypes(final ProcessorInitializationContext context);
    +
    +    /**
    +     * @param context the initialization context
    +     * @return the default compression type
    +     */
    +    public abstract String getDefaultCompressionType(final ProcessorInitializationContext context);
    +
    +    /**
    +     * Allows sub-classes to add additional properties, called from initialize.
    +     *
    +     * @return additional properties to add to the overall list
    +     */
    +    public List<PropertyDescriptor> getAdditionalProperties() {
    +        return Collections.emptyList();
    +    }
    +
    +    @Override
    +    public final Set<Relationship> getRelationships() {
    +        return putHdfsRecordRelationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return putHdfsRecordProperties;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    +        final String schemaAccessStrategy = validationContext.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
    +        return SchemaAccessUtils.validateSchemaAccessStrategy(validationContext, schemaAccessStrategy, getSchemaAccessStrategyValues());
    +    }
    +
    +    @OnScheduled
    +    public final void onScheduled(final ProcessContext context) throws IOException {
    +        super.abstractOnScheduled(context);
    +
    +        final SchemaRegistry schemaRegistry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
    +
    +        final PropertyDescriptor descriptor = getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
    +        final String schemaAccess = context.getProperty(descriptor).getValue();
    +        this.schemaAccessStrategy = SchemaAccessUtils.getSchemaAccessStrategy(schemaAccess, schemaRegistry, context);
    +
    +        this.remoteOwner = context.getProperty(REMOTE_OWNER).getValue();
    +        this.remoteGroup = context.getProperty(REMOTE_GROUP).getValue();
    +
    +        // Set umask once, to avoid thread safety issues doing it in onTrigger
    +        final PropertyValue umaskProp = context.getProperty(UMASK);
    +        final short dfsUmask;
    +        if (umaskProp.isSet()) {
    +            dfsUmask = Short.parseShort(umaskProp.getValue(), 8);
    +        } else {
    +            dfsUmask = FsPermission.DEFAULT_UMASK;
    +        }
    +        final Configuration conf = getConfiguration();
    +        FsPermission.setUMask(conf, new FsPermission(dfsUmask));
    +    }
    +
    +    /**
    +     * Sub-classes provide the appropriate HDFSRecordWriter.
    +     *
    +     * @param context the process context to obtain additional configuration
    +     * @param flowFile the flow file being written
    +     * @param conf the Configuration instance
    +     * @param path the path to write to
    +     * @param schema the schema for writing
    +     * @return the HDFSRecordWriter
    +     * @throws IOException if an error occurs creating the writer or processing the schema
    +     */
    +    public abstract HDFSRecordWriter createHDFSRecordWriter(
    +            final ProcessContext context,
    +            final FlowFile flowFile,
    +            final Configuration conf,
    +            final Path path,
    +            final RecordSchema schema) throws IOException, SchemaNotFoundException;
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        // do this before getting a flow file so that we always get a chance to attempt Kerberos relogin
    +        final FileSystem fileSystem = getFileSystem();
    +        final Configuration configuration = getConfiguration();
    +        final UserGroupInformation ugi = getUserGroupInformation();
    +
    +        if (configuration == null || fileSystem == null || ugi == null) {
    +            getLogger().error("Processor not configured properly because Configuration, FileSystem, or UserGroupInformation was null");
    +            context.yield();
    +            return;
    +        }
    +
    +        final FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            context.yield();
    +            return;
    +        }
    +
    +        ugi.doAs((PrivilegedAction<Object>)() -> {
    +            Path tempDotCopyFile = null;
    +            FlowFile putFlowFile = flowFile;
    +            try {
    +                final String filenameValue = putFlowFile.getAttribute(CoreAttributes.FILENAME.key()); // TODO codec extension
    +                final String directoryValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(putFlowFile).getValue();
    +
    +                // create the directory if it doesn't exist
    +                final Path directoryPath = new Path(directoryValue);
    +                createDirectory(fileSystem, directoryPath, remoteOwner, remoteGroup);
    +
    +                // write to tempFile first and on success rename to destFile
    +                final Path tempFile = new Path(directoryPath, "." + filenameValue);
    +                final Path destFile = new Path(directoryPath, filenameValue);
    +
    +                final boolean destinationExists = fileSystem.exists(destFile) || fileSystem.exists(tempFile);
    +                final boolean shouldOverwrite = context.getProperty(OVERWRITE).asBoolean();
    +
    +                // if the tempFile or destFile already exist, and overwrite is set to false, then transfer to failure
    +                if (destinationExists && !shouldOverwrite) {
    +                    session.transfer(session.penalize(putFlowFile), REL_FAILURE);
    +                    getLogger().warn("penalizing {} and routing to failure because file with same name already exists", new Object[]{putFlowFile});
    +                    return null;
    +                }
    +
    +                final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>(null);
    +                final AtomicReference<WriteResult> writeResult = new AtomicReference<>();
    +                final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +
    +                final FlowFile flowFileIn = putFlowFile;
    +                final StopWatch stopWatch = new StopWatch(true);
    +
    +                // Read records from the incoming FlowFile and write them to the tempFile
    +                session.read(putFlowFile, (final InputStream rawIn) -> {
    +                    RecordReader recordReader = null;
    +                    HDFSRecordWriter recordWriter = null;
    +
    +                    try (final BufferedInputStream in = new BufferedInputStream(rawIn)) {
    +                        final RecordSchema destRecordSchema = schemaAccessStrategy.getSchema(flowFile, in);
    +                        recordWriter = createHDFSRecordWriter(context, flowFile, configuration, tempFile, destRecordSchema);
    +
    +                        // if we fail to create the RecordReader then we want to route to failure, so we need to
    +                        // handle this separately from the other IOExceptions which normally route to retry
    +                        try {
    +                            recordReader = recordReaderFactory.createRecordReader(flowFileIn, in, getLogger());
    +                        } catch (Exception e) {
    +                            final RecordReaderFactoryException rrfe = new RecordReaderFactoryException("Unable to create RecordReader", e);
    +                            exceptionHolder.set(rrfe);
    +                            return;
    +                        }
    +
    +                        final RecordSet recordSet = recordReader.createRecordSet();
    +                        writeResult.set(recordWriter.write(recordSet));
    +
    +                    } catch (Exception e) {
    +                        exceptionHolder.set(e);
    +                    } finally {
    +                        IOUtils.closeQuietly(recordReader);
    +                        IOUtils.closeQuietly(recordWriter);
    +                    }
    +                });
    +                stopWatch.stop();
    +
    +                final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize());
    +                final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
    +                tempDotCopyFile = tempFile;
    +
    +                // if any errors happened within the session.read then throw the exception so we jump
    +                // into one of the appropriate catch blocks below
    +                if (exceptionHolder.get() != null) {
    +                    throw exceptionHolder.get();
    +                }
    +
    +                // Attempt to rename from the tempFile to destFile, and change owner if successfully renamed
    +                rename(fileSystem, tempFile, destFile);
    +                changeOwner(fileSystem, destFile, remoteOwner, remoteGroup);
    +
    +                getLogger().info("Wrote {} to {} in {} milliseconds at a rate of {}", new Object[]{putFlowFile, destFile, millis, dataRate});
    +
    +                putFlowFile = postProcess(context, session, putFlowFile, destFile);
    +
    +                final String outputPath = destFile.toString();
    +                final String newFilename = destFile.getName();
    +                final String hdfsPath = destFile.getParent().toString();
    +
    +                // Update the filename and absolute path attributes
    +                final Map<String,String> attributes = new HashMap<>(writeResult.get().getAttributes());
    +                attributes.put(CoreAttributes.FILENAME.key(), newFilename);
    +                attributes.put(ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
    +                attributes.put(RECORD_COUNT_ATTR, String.valueOf(writeResult.get().getRecordCount()));
    +                putFlowFile = session.putAllAttributes(putFlowFile, attributes);
    +
    +                // Send a provenance event and transfer to success
    +                final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath : "hdfs://" + outputPath;
    +                session.getProvenanceReporter().send(putFlowFile, transitUri);
    +                session.transfer(putFlowFile, REL_SUCCESS);
    +
    +            } catch (IOException | FlowFileAccessException e) {
    +                deleteQuietly(fileSystem, tempDotCopyFile);
    +                getLogger().error("Failed to write due to {}", new Object[]{e});
    +                session.transfer(session.penalize(putFlowFile), REL_RETRY);
    +                context.yield();
    +            } catch (Throwable t) {
    +                deleteQuietly(fileSystem, tempDotCopyFile);
    +                getLogger().error("Failed to write due to {}", new Object[]{t});
    +                session.transfer(putFlowFile, REL_FAILURE);
    +            }
    +
    +            return null;
    +        });
    +    }
    +
    +    /**
    +     * This method will be called after successfully writing to the destination file and renaming the file to its final name
    +     * in order to give sub-classes a chance to take action before transferring to success.
    +     *
    +     * @param context the context
    +     * @param session the session
    +     * @param flowFile the flow file being processed
    +     * @param destFile the destination file written to
    +     * @return an updated FlowFile reference
    +     */
    +    protected FlowFile postProcess(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path destFile) {
    +        return flowFile;
    +    }
    +
    +    protected void rename(final FileSystem fileSystem, final Path srcFile, final Path destFile) throws IOException, InterruptedException, FailureException {
    +        boolean renamed = false;
    +        for (int i = 0; i < 10; i++) { // try to rename multiple times.
    +            if (fileSystem.rename(srcFile, destFile)) {
    +                renamed = true;
    +                break;// rename was successful
    +            }
    +            Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
    +        }
    +        if (!renamed) {
    --- End diff --
    
    I think a method Javadoc comment explaining that is fine. 
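
For readers following the rename discussion, the write-to-dot-file-then-rename pattern in the quoted diff can be sketched without Hadoop. This is only an illustration under stated assumptions: it uses `java.nio.file` instead of the Hadoop `FileSystem` API, the class and method names (`TempThenRenameSketch`, `writeThenRename`, `renameWithRetry`) are hypothetical, and `Files.move` signals failure by throwing, whereas the `fileSystem.rename(...)` call in the diff is checked through its boolean return value.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch: local-filesystem analogue of the temp-file-then-rename flow in the diff.
final class TempThenRenameSketch {

    /**
     * Writes content to a hidden "." + name temp file and, only on success, renames it
     * to the final name. On failure the temp file is removed before rethrowing.
     */
    static Path writeThenRename(Path directory, String name, String content, boolean overwrite)
            throws IOException, InterruptedException {
        Files.createDirectories(directory); // create the directory if it doesn't exist
        final Path tempFile = directory.resolve("." + name);
        final Path destFile = directory.resolve(name);

        // As in the diff: refuse when either file exists and overwrite is disabled
        if (!overwrite && (Files.exists(destFile) || Files.exists(tempFile))) {
            throw new IOException("File with the same name already exists: " + destFile);
        }

        try {
            Files.write(tempFile, content.getBytes(StandardCharsets.UTF_8));
            renameWithRetry(tempFile, destFile, 10, 200L);
            return destFile;
        } catch (IOException | InterruptedException e) {
            Files.deleteIfExists(tempFile); // do not leave a partial dot-file behind
            throw e;
        }
    }

    /** Tries the rename several times, sleeping between attempts, then gives up. */
    static void renameWithRetry(Path src, Path dest, int attempts, long sleepMillis)
            throws IOException, InterruptedException {
        IOException last = null;
        for (int i = 0; i < attempts; i++) {
            try {
                Files.move(src, dest, StandardCopyOption.REPLACE_EXISTING);
                return; // rename was successful
            } catch (IOException e) {
                last = e;
                Thread.sleep(sleepMillis); // wait for whatever blocked the rename to clear
            }
        }
        throw new IOException("Could not rename " + src + " to " + dest
                + " after " + attempts + " attempts", last);
    }
}
```

The attempt count of 10 and the 200 ms sleep are taken from the `rename` method in the diff; everything else is a simplification. The point is that readers of the destination directory never see a partially written file under its final name.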


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

Posted by alopresto <gi...@git.apache.org>.
Github user alopresto commented on the issue:

    https://github.com/apache/nifi/pull/1712
  
    I ran `contrib-check` and all tests and both passed fine. I have minor comments on the code above but nothing serious. 
    
    I loaded a template provided by Bryan which generated flowfiles, merged them, and wrote them to Parquet format (on local disk using the `core-site.xml` referenced above), then fetched those files and wrote them out as CSV. 
    
    ```
    hw12203:/Users/alopresto/Workspace/scratch/NIFI-3724 (master) alopresto
    🔓 1s @ 14:59:53 $ ll
    total 24
    drwxr-xr-x    6 alopresto  staff   204B May  1 14:59 ./
    drwxr-xr-x  105 alopresto  staff   3.5K May  1 14:45 ../
    -rw-r--r--@   1 alopresto  staff   6.0K May  1 14:55 .DS_Store
    -rw-r--r--    1 alopresto  staff   129B May  1 14:41 core-site.xml
    drwxr-xr-x    2 alopresto  staff    68B May  1 14:59 csv/
    drwxr-xr-x    2 alopresto  staff    68B May  1 14:59 parquet/
    hw12203:/Users/alopresto/Workspace/scratch/NIFI-3724 (master) alopresto
    🔓 9s @ 15:00:03 $ tl
    .
    ├── [6.0K]  .DS_Store
    ├── [ 129]  core-site.xml
    ├── [ 238]  csv/
    │   ├── [  54]  257951968574779
    │   ├── [3.5M]  257962982705055
    │   ├── [  54]  257981981063720
    │   ├── [3.7M]  257986105785832
    │   └── [  54]  258011981257869
    └── [ 476]  parquet/
        ├── [  16]  .257951968574779.crc
        ├── [6.5K]  .257962982705055.crc
        ├── [  16]  .257981981063720.crc
        ├── [6.6K]  .257986105785832.crc
        ├── [  16]  .258011981257869.crc
        ├── [6.5K]  .258013234789061.crc
        ├── [ 758]  257951968574779
        ├── [829K]  257962982705055
        ├── [ 758]  257981981063720
        ├── [842K]  257986105785832
        ├── [ 758]  258011981257869
        └── [833K]  258013234789061
    
    2 directories, 19 files
    hw12203:/Users/alopresto/Workspace/scratch/NIFI-3724 (master) alopresto
    🔓 86s @ 15:01:30 $ more csv/258011981257869
    name,favorite_number,favorite_color
    Bryan,693421,blue
    ```
    
    If the `displayName` comments are fixed, I am +1 and ready to merge. Thanks Bryan.  
    
    One minor issue:
    * On template import, the processors which referenced a controller service were invalid. Configuring each (they showed "Incompatible Controller Service Configured") by selecting the same option from the list fixed the issue. This doesn't seem like an issue introduced by any code in this PR, however.  



[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1712#discussion_r114170838
  
    --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java ---
    @@ -0,0 +1,505 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hadoop;
    +
    +import org.apache.commons.io.IOUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.permission.FsPermission;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.configuration.DefaultSettings;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.FlowFileAccessException;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.hadoop.exception.FailureException;
    +import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
    +import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
    +import org.apache.nifi.schema.access.SchemaAccessStrategy;
    +import org.apache.nifi.schema.access.SchemaAccessUtils;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.schemaregistry.services.SchemaRegistry;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.WriteResult;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.RecordSet;
    +import org.apache.nifi.util.StopWatch;
    +
    +import java.io.BufferedInputStream;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.security.PrivilegedAction;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_CONTENT_ENCODED_SCHEMA;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_SCHEMA_REF_ATTRIBUTES;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
    +
    +/**
    + * Base class for processors that write Records to HDFS.
    + */
    +@TriggerWhenEmpty // trigger when empty so we have a chance to perform a Kerberos re-login
    +@DefaultSettings(yieldDuration = "100 ms") // decrease the default yield since we are triggering when empty
    +public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
    +
    +
    +    public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
    +            .name("compression-type")
    +            .displayName("Compression Type")
    +            .description("The type of compression for the file being written.")
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor OVERWRITE = new PropertyDescriptor.Builder()
    +            .name("overwrite")
    +            .displayName("Overwrite Files")
    +            .description("Whether or not to overwrite existing files in the same directory with the same name. When set to false, " +
    +                    "flow files will be routed to failure when a file exists in the same directory with the same name.")
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor UMASK = new PropertyDescriptor.Builder()
    +            .name("permissions-umask")
    +            .displayName("Permissions umask")
    +            .description("A umask represented as an octal number which determines the permissions of files written to HDFS. " +
    +                    "This overrides the Hadoop Configuration dfs.umaskmode")
    +            .addValidator(HadoopValidators.UMASK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder()
    +            .name("remote-owner")
    +            .displayName("Remote Owner")
    +            .description("Changes the owner of the HDFS file to this value after it is written. " +
    +                    "This only works if NiFi is running as a user that has HDFS super user privilege to change owner")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder()
    +            .name("remote-group")
    +            .displayName("Remote Group")
    +            .description("Changes the group of the HDFS file to this value after it is written. " +
    +                    "This only works if NiFi is running as a user that has HDFS super user privilege to change group")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +            .name("record-reader")
    +            .displayName("Record Reader")
    +            .description("The service for reading records from incoming flow files.")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("Flow Files that have been successfully processed are transferred to this relationship")
    +            .build();
    +
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description("Flow Files that could not be processed due to issues that can be retried are transferred to this relationship")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("Flow Files that could not be processed due to issues that cannot be retried are transferred to this relationship")
    +            .build();
    +
    +    public static final String RECORD_COUNT_ATTR = "record.count";
    +
    +    private volatile String remoteOwner;
    +    private volatile String remoteGroup;
    +    private volatile SchemaAccessStrategy schemaAccessStrategy;
    +
    +    private volatile Set<Relationship> putHdfsRecordRelationships;
    +    private volatile List<PropertyDescriptor> putHdfsRecordProperties;
    +
    +    private final List<AllowableValue> strategyList = Collections.unmodifiableList(Arrays.asList(
    +            SCHEMA_NAME_PROPERTY,
    +            SCHEMA_TEXT_PROPERTY,
    +            HWX_SCHEMA_REF_ATTRIBUTES,
    +            HWX_CONTENT_ENCODED_SCHEMA
    +    ));
    +
    +
    +    @Override
    +    protected final void init(final ProcessorInitializationContext context) {
    +        super.init(context);
    +
    +        final Set<Relationship> rels = new HashSet<>();
    +        rels.add(REL_SUCCESS);
    +        rels.add(REL_RETRY);
    +        rels.add(REL_FAILURE);
    +        this.putHdfsRecordRelationships = Collections.unmodifiableSet(rels);
    +
    +        final List<PropertyDescriptor> props = new ArrayList<>(properties);
    +        props.add(RECORD_READER);
    +
    +        props.add(new PropertyDescriptor.Builder()
    +                .fromPropertyDescriptor(DIRECTORY)
    +                .description("The parent directory to which files should be written. Will be created if it doesn't exist.")
    +                .build());
    +
    +        final AllowableValue[] strategies = getSchemaAccessStrategyValues().toArray(new AllowableValue[0]);
    +
    +        props.add(new PropertyDescriptor.Builder()
    +                .fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY)
    +                .description("Specifies how to obtain the schema that is to be used for writing the data.")
    +                .allowableValues(strategies)
    +                .defaultValue(getDefaultSchemaAccessStrategy().getValue())
    +                .build());
    +
    +        props.add(SCHEMA_REGISTRY);
    +        props.add(SCHEMA_NAME);
    +        props.add(SCHEMA_TEXT);
    +
    +        final AllowableValue[] compressionTypes = getCompressionTypes(context).toArray(new AllowableValue[0]);
    +
    +        props.add(new PropertyDescriptor.Builder()
    +                .fromPropertyDescriptor(COMPRESSION_TYPE)
    +                .allowableValues(compressionTypes)
    +                .defaultValue(getDefaultCompressionType(context))
    +                .build());
    +
    +        props.add(OVERWRITE);
    +        props.add(UMASK);
    +        props.add(REMOTE_GROUP);
    +        props.add(REMOTE_OWNER);
    +        props.addAll(getAdditionalProperties());
    +        this.putHdfsRecordProperties = Collections.unmodifiableList(props);
    +    }
    +
    +    protected List<AllowableValue> getSchemaAccessStrategyValues() {
    +        return strategyList;
    +    }
    +
    +    protected AllowableValue getDefaultSchemaAccessStrategy() {
    +        return SCHEMA_NAME_PROPERTY;
    +    }
    +
    +    private PropertyDescriptor getSchemaAcessStrategyDescriptor() {
    +        return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
    +    }
    +
    +    /**
    +     * @param context the initialization context
    +     * @return the possible compression types
    +     */
    +    public abstract List<AllowableValue> getCompressionTypes(final ProcessorInitializationContext context);
    +
    +    /**
    +     * @param context the initialization context
    +     * @return the default compression type
    +     */
    +    public abstract String getDefaultCompressionType(final ProcessorInitializationContext context);
    +
    +    /**
    +     * Allows sub-classes to add additional properties, called from initialize.
    +     *
    +     * @return additional properties to add to the overall list
    +     */
    +    public List<PropertyDescriptor> getAdditionalProperties() {
    +        return Collections.emptyList();
    +    }
    +
    +    @Override
    +    public final Set<Relationship> getRelationships() {
    +        return putHdfsRecordRelationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +       return putHdfsRecordProperties;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    +        final String schemaAccessStrategy = validationContext.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
    +        return SchemaAccessUtils.validateSchemaAccessStrategy(validationContext, schemaAccessStrategy, getSchemaAccessStrategyValues());
    +    }
    +
    +    @OnScheduled
    +    public final void onScheduled(final ProcessContext context) throws IOException {
    +        super.abstractOnScheduled(context);
    +
    +        final SchemaRegistry schemaRegistry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
    +
    +        final PropertyDescriptor descriptor = getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
    +        final String schemaAccess = context.getProperty(descriptor).getValue();
    +        this.schemaAccessStrategy = SchemaAccessUtils.getSchemaAccessStrategy(schemaAccess, schemaRegistry, context);
    +
    +        this.remoteOwner = context.getProperty(REMOTE_OWNER).getValue();
    +        this.remoteGroup = context.getProperty(REMOTE_GROUP).getValue();
    +
    +        // Set umask once, to avoid thread safety issues doing it in onTrigger
    +        final PropertyValue umaskProp = context.getProperty(UMASK);
    +        final short dfsUmask;
    +        if (umaskProp.isSet()) {
    +            dfsUmask = Short.parseShort(umaskProp.getValue(), 8);
    +        } else {
    +            dfsUmask = FsPermission.DEFAULT_UMASK;
    +        }
    +        final Configuration conf = getConfiguration();
    +        FsPermission.setUMask(conf, new FsPermission(dfsUmask));
    +    }
    +
    +    /**
    +     * Sub-classes provide the appropriate HDFSRecordWriter.
    +     *
    +     * @param context the process context to obtain additional configuration
    +     * @param flowFile the flow file being written
    +     * @param conf the Configuration instance
    +     * @param path the path to write to
    +     * @param schema the schema for writing
    +     * @return the HDFSRecordWriter
    +     * @throws IOException if an error occurs creating the writer or processing the schema
    +     */
    +    public abstract HDFSRecordWriter createHDFSRecordWriter(
    +            final ProcessContext context,
    +            final FlowFile flowFile,
    +            final Configuration conf,
    +            final Path path,
    +            final RecordSchema schema) throws IOException, SchemaNotFoundException;
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        // do this before getting a flow file so that we always get a chance to attempt Kerberos relogin
    +        final FileSystem fileSystem = getFileSystem();
    +        final Configuration configuration = getConfiguration();
    +        final UserGroupInformation ugi = getUserGroupInformation();
    +
    +        if (configuration == null || fileSystem == null || ugi == null) {
    +            getLogger().error("Processor not configured properly because Configuration, FileSystem, or UserGroupInformation was null");
    +            context.yield();
    +            return;
    +        }
    +
    +        final FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            context.yield();
    +            return;
    +        }
    +
    +        ugi.doAs((PrivilegedAction<Object>)() -> {
    +            Path tempDotCopyFile = null;
    +            FlowFile putFlowFile = flowFile;
    +            try {
    +                final String filenameValue = putFlowFile.getAttribute(CoreAttributes.FILENAME.key()); // TODO codec extension
    +                final String directoryValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(putFlowFile).getValue();
    +
    +                // create the directory if it doesn't exist
    +                final Path directoryPath = new Path(directoryValue);
    +                createDirectory(fileSystem, directoryPath, remoteOwner, remoteGroup);
    +
    +                // write to tempFile first and on success rename to destFile
    +                final Path tempFile = new Path(directoryPath, "." + filenameValue);
    +                final Path destFile = new Path(directoryPath, filenameValue);
    +
    +                final boolean destinationExists = fileSystem.exists(destFile) || fileSystem.exists(tempFile);
    +                final boolean shouldOverwrite = context.getProperty(OVERWRITE).asBoolean();
    +
    +                // if the tempFile or destFile already exist, and overwrite is set to false, then transfer to failure
    +                if (destinationExists && !shouldOverwrite) {
    +                    session.transfer(session.penalize(putFlowFile), REL_FAILURE);
    +                    getLogger().warn("penalizing {} and routing to failure because file with same name already exists", new Object[]{putFlowFile});
    +                    return null;
    +                }
    +
    +                final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>(null);
    +                final AtomicReference<WriteResult> writeResult = new AtomicReference<>();
    +                final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +
    +                final FlowFile flowFileIn = putFlowFile;
    +                final StopWatch stopWatch = new StopWatch(true);
    +
    +                // Read records from the incoming FlowFile and write them to the tempFile
    +                session.read(putFlowFile, (final InputStream rawIn) -> {
    +                    RecordReader recordReader = null;
    +                    HDFSRecordWriter recordWriter = null;
    +
    +                    try (final BufferedInputStream in = new BufferedInputStream(rawIn)) {
    +                        final RecordSchema destRecordSchema = schemaAccessStrategy.getSchema(flowFile, in);
    +                        recordWriter = createHDFSRecordWriter(context, flowFile, configuration, tempFile, destRecordSchema);
    +
    +                        // if we fail to create the RecordReader then we want to route to failure, so we need to
    +                        // handle this separately from the other IOExceptions which normally route to retry
    +                        try {
    +                            recordReader = recordReaderFactory.createRecordReader(flowFileIn, in, getLogger());
    +                        } catch (Exception e) {
    +                            final RecordReaderFactoryException rrfe = new RecordReaderFactoryException("Unable to create RecordReader", e);
    +                            exceptionHolder.set(rrfe);
    +                            return;
    +                        }
    +
    +                        final RecordSet recordSet = recordReader.createRecordSet();
    +                        writeResult.set(recordWriter.write(recordSet));
    +
    +                    } catch (Exception e) {
    +                        exceptionHolder.set(e);
    +                    } finally {
    +                        IOUtils.closeQuietly(recordReader);
    +                        IOUtils.closeQuietly(recordWriter);
    +                    }
    +                });
    +                stopWatch.stop();
    +
    +                final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize());
    +                final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
    +                tempDotCopyFile = tempFile;
    +
    +                // if any errors happened within the session.read then throw the exception so we jump
    +                // into one of the appropriate catch blocks below
    +                if (exceptionHolder.get() != null) {
    +                    throw exceptionHolder.get();
    +                }
    +
    +                // Attempt to rename from the tempFile to destFile, and change owner if successfully renamed
    +                rename(fileSystem, tempFile, destFile);
    +                changeOwner(fileSystem, destFile, remoteOwner, remoteGroup);
    +
    +                getLogger().info("Wrote {} to {} in {} milliseconds at a rate of {}", new Object[]{putFlowFile, destFile, millis, dataRate});
    +
    +                putFlowFile = postProcess(context, session, putFlowFile, destFile);
    +
    +                final String outputPath = destFile.toString();
    +                final String newFilename = destFile.getName();
    +                final String hdfsPath = destFile.getParent().toString();
    +
    +                // Update the filename and absolute path attributes
    +                final Map<String,String> attributes = new HashMap<>(writeResult.get().getAttributes());
    +                attributes.put(CoreAttributes.FILENAME.key(), newFilename);
    +                attributes.put(ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
    +                attributes.put(RECORD_COUNT_ATTR, String.valueOf(writeResult.get().getRecordCount()));
    +                putFlowFile = session.putAllAttributes(putFlowFile, attributes);
    +
    +                // Send a provenance event and transfer to success
    +                final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath : "hdfs://" + outputPath;
    +                session.getProvenanceReporter().send(putFlowFile, transitUri);
    +                session.transfer(putFlowFile, REL_SUCCESS);
    +
    +            } catch (IOException | FlowFileAccessException e) {
    +                deleteQuietly(fileSystem, tempDotCopyFile);
    +                getLogger().error("Failed to write due to {}", new Object[]{e});
    +                session.transfer(session.penalize(putFlowFile), REL_RETRY);
    +                context.yield();
    +            } catch (Throwable t) {
    +                deleteQuietly(fileSystem, tempDotCopyFile);
    +                getLogger().error("Failed to write due to {}", new Object[]{t});
    +                session.transfer(putFlowFile, REL_FAILURE);
    +            }
    +
    +            return null;
    +        });
    +    }
    +
    +    /**
    +     * This method will be called after successfully writing to the destination file and renaming the file to it's final name
    +     * in order to give sub-classes a chance to take action before transferring to success.
    +     *
    +     * @param context the context
    +     * @param session the session
    +     * @param flowFile the flow file being processed
    +     * @param destFile the destination file written to
    +     * @return an updated FlowFile reference
    +     */
    +    protected FlowFile postProcess(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path destFile) {
    +        return flowFile;
    +    }
    +
    +    protected void rename(final FileSystem fileSystem, final Path srcFile, final Path destFile) throws IOException, InterruptedException, FailureException {
    +        boolean renamed = false;
    +        for (int i = 0; i < 10; i++) { // try to rename multiple times.
    +            if (fileSystem.rename(srcFile, destFile)) {
    +                renamed = true;
    +                break;// rename was successful
    +            }
    +            Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
    +        }
    +        if (!renamed) {
    --- End diff --
    
    It tries up to 10 times to rename the dot file to its final name; if the rename still hasn't succeeded after that, it deletes the dot file and routes the flow file to failure. This came from the existing PutHDFS, so I kept it for consistency, but I can add something to the capability description of PutParquet describing it.
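
As a rough illustration of the retry behavior described above (not the code from the PR), the sketch below retries a rename a fixed number of times, sleeps after each failed attempt, and throws once the attempts are exhausted so that the caller can delete the temporary file and route to failure. `Renamer` and `RenameFailedException` are hypothetical stand-ins for Hadoop's `FileSystem.rename` and the PR's `FailureException`. The body of the `if (!renamed)` branch is cut off in the quoted diff, so what it throws is an assumption.

```java
import java.io.IOException;

public class RenameRetrySketch {

    /** Hypothetical stand-in for the one FileSystem call this sketch needs. */
    public interface Renamer {
        boolean rename(String src, String dest) throws IOException;
    }

    /** Hypothetical stand-in for the FailureException referenced in the diff. */
    public static class RenameFailedException extends Exception {
        public RenameFailedException(final String message) {
            super(message);
        }
    }

    /**
     * Tries the rename up to maxAttempts times, sleeping after each failed attempt.
     * Returns the 1-based attempt number that succeeded; throws if none did.
     */
    public static int renameWithRetry(final Renamer renamer, final String src, final String dest,
                                      final int maxAttempts, final long sleepMillis)
            throws IOException, InterruptedException, RenameFailedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (renamer.rename(src, dest)) {
                return attempt; // rename was successful
            }
            // wait to let whatever caused the failure resolve itself
            Thread.sleep(sleepMillis);
        }
        // Assumption: the caller reacts to this by deleting the temp file and routing to failure
        throw new RenameFailedException("Could not rename " + src + " to " + dest
                + " after " + maxAttempts + " attempts");
    }

    public static void main(final String[] args) throws Exception {
        // Simulated file system whose rename only succeeds on the third call
        final int[] calls = {0};
        final Renamer flaky = (src, dest) -> ++calls[0] >= 3;

        final int attempts = renameWithRetry(flaky, "/data/.part-0", "/data/part-0", 10, 200L);
        System.out.println("renamed after " + attempts + " attempt(s)");
    }
}
```

Throwing a dedicated exception type, rather than an `IOException`, is what would let a caller tell a rename that never succeeded (failure) apart from a transient I/O error (retry); whether the PR draws the line exactly there is not visible in the quoted portion of the diff.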


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

Posted by alopresto <gi...@git.apache.org>.
Github user alopresto commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1712#discussion_r114153996
  
    --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java ---
    @@ -0,0 +1,505 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hadoop;
    +
    +import org.apache.commons.io.IOUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.permission.FsPermission;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.configuration.DefaultSettings;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.FlowFileAccessException;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.hadoop.exception.FailureException;
    +import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
    +import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
    +import org.apache.nifi.schema.access.SchemaAccessStrategy;
    +import org.apache.nifi.schema.access.SchemaAccessUtils;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.schemaregistry.services.SchemaRegistry;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.WriteResult;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.RecordSet;
    +import org.apache.nifi.util.StopWatch;
    +
    +import java.io.BufferedInputStream;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.security.PrivilegedAction;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_CONTENT_ENCODED_SCHEMA;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_SCHEMA_REF_ATTRIBUTES;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
    +
    +/**
    + * Base class for processors that write Records to HDFS.
    + */
    +@TriggerWhenEmpty // trigger when empty so we have a chance to perform a Kerberos re-login
    +@DefaultSettings(yieldDuration = "100 ms") // decrease the default yield since we are triggering when empty
    +public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
    +
    +
    +    public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
    +            .name("compression-type")
    +            .displayName("Compression Type")
    +            .description("The type of compression for the file being written.")
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor OVERWRITE = new PropertyDescriptor.Builder()
    +            .name("overwrite")
    +            .displayName("Overwrite Files")
    +            .description("Whether or not to overwrite existing files in the same directory with the same name. When set to false, " +
    +                    "flow files will be routed to failure when a file exists in the same directory with the same name.")
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor UMASK = new PropertyDescriptor.Builder()
    +            .name("permissions-umask")
    +            .displayName("Permissions umask")
    +            .description("A umask represented as an octal number which determines the permissions of files written to HDFS. " +
    +                    "This overrides the Hadoop Configuration dfs.umaskmode")
    +            .addValidator(HadoopValidators.UMASK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder()
    +            .name("remote-owner")
    +            .displayName("Remote Owner")
    +            .description("Changes the owner of the HDFS file to this value after it is written. " +
    +                    "This only works if NiFi is running as a user that has HDFS super user privilege to change owner")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder()
    +            .name("remote-group")
    +            .displayName("Remote Group")
    +            .description("Changes the group of the HDFS file to this value after it is written. " +
    +                    "This only works if NiFi is running as a user that has HDFS super user privilege to change group")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +            .name("record-reader")
    +            .displayName("Record Reader")
    +            .description("The service for reading records from incoming flow files.")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("Flow Files that have been successfully processed are transferred to this relationship")
    +            .build();
    +
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description("Flow Files that could not be processed due to issues that can be retried are transferred to this relationship")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("Flow Files that could not be processed due to issue that cannot be retried are transferred to this relationship")
    +            .build();
    +
    +    public static final String RECORD_COUNT_ATTR = "record.count";
    +
    +    private volatile String remoteOwner;
    +    private volatile String remoteGroup;
    +    private volatile SchemaAccessStrategy schemaAccessStrategy;
    +
    +    private volatile Set<Relationship> putHdfsRecordRelationships;
    +    private volatile List<PropertyDescriptor> putHdfsRecordProperties;
    +
    +    private final List<AllowableValue> strategyList = Collections.unmodifiableList(Arrays.asList(
    +            SCHEMA_NAME_PROPERTY,
    +            SCHEMA_TEXT_PROPERTY,
    +            HWX_SCHEMA_REF_ATTRIBUTES,
    +            HWX_CONTENT_ENCODED_SCHEMA
    +    ));
    +
    +
    +    @Override
    +    protected final void init(final ProcessorInitializationContext context) {
    +        super.init(context);
    +
    +        final Set<Relationship> rels = new HashSet<>();
    +        rels.add(REL_SUCCESS);
    +        rels.add(REL_RETRY);
    +        rels.add(REL_FAILURE);
    +        this.putHdfsRecordRelationships = Collections.unmodifiableSet(rels);
    +
    +        final List<PropertyDescriptor> props = new ArrayList<>(properties);
    +        props.add(RECORD_READER);
    +
    +        props.add(new PropertyDescriptor.Builder()
    +                .fromPropertyDescriptor(DIRECTORY)
    +                .description("The parent directory to which files should be written. Will be created if it doesn't exist.")
    +                .build());
    +
    +        final AllowableValue[] strategies = getSchemaAccessStrategyValues().toArray(new AllowableValue[0]);
    +
    +        props.add(new PropertyDescriptor.Builder()
    +                .fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY)
    +                .description("Specifies how to obtain the schema that is to be used for writing the data.")
    +                .allowableValues(strategies)
    +                .defaultValue(getDefaultSchemaAccessStrategy().getValue())
    +                .build());
    +
    +        props.add(SCHEMA_REGISTRY);
    +        props.add(SCHEMA_NAME);
    +        props.add(SCHEMA_TEXT);
    +
    +        final AllowableValue[] compressionTypes = getCompressionTypes(context).toArray(new AllowableValue[0]);
    +
    +        props.add(new PropertyDescriptor.Builder()
    +                .fromPropertyDescriptor(COMPRESSION_TYPE)
    +                .allowableValues(compressionTypes)
    +                .defaultValue(getDefaultCompressionType(context))
    +                .build());
    +
    +        props.add(OVERWRITE);
    +        props.add(UMASK);
    +        props.add(REMOTE_GROUP);
    +        props.add(REMOTE_OWNER);
    +        props.addAll(getAdditionalProperties());
    +        this.putHdfsRecordProperties = Collections.unmodifiableList(props);
    +    }
    +
    +    protected List<AllowableValue> getSchemaAccessStrategyValues() {
    +        return strategyList;
    +    }
    +
    +    protected AllowableValue getDefaultSchemaAccessStrategy() {
    +        return SCHEMA_NAME_PROPERTY;
    +    }
    +
    +    private PropertyDescriptor getSchemaAcessStrategyDescriptor() {
    +        return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
    +    }
    +
    +    /**
    +     * @param context the initialization context
    +     * @return the possible compression types
    +     */
    +    public abstract List<AllowableValue> getCompressionTypes(final ProcessorInitializationContext context);
    +
    +    /**
    +     * @param context the initialization context
    +     * @return the default compression type
    +     */
    +    public abstract String getDefaultCompressionType(final ProcessorInitializationContext context);
    +
    +    /**
    +     * Allows sub-classes to add additional properties, called from initialize.
    +     *
    +     * @return additional properties to add to the overall list
    +     */
    +    public List<PropertyDescriptor> getAdditionalProperties() {
    +        return Collections.emptyList();
    +    }
    +
    +    @Override
    +    public final Set<Relationship> getRelationships() {
    +        return putHdfsRecordRelationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +       return putHdfsRecordProperties;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    +        final String schemaAccessStrategy = validationContext.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
    +        return SchemaAccessUtils.validateSchemaAccessStrategy(validationContext, schemaAccessStrategy, getSchemaAccessStrategyValues());
    +    }
    +
    +    @OnScheduled
    +    public final void onScheduled(final ProcessContext context) throws IOException {
    +        super.abstractOnScheduled(context);
    +
    +        final SchemaRegistry schemaRegistry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
    +
    +        final PropertyDescriptor descriptor = getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
    +        final String schemaAccess = context.getProperty(descriptor).getValue();
    +        this.schemaAccessStrategy = SchemaAccessUtils.getSchemaAccessStrategy(schemaAccess, schemaRegistry, context);
    +
    +        this.remoteOwner = context.getProperty(REMOTE_OWNER).getValue();
    +        this.remoteGroup = context.getProperty(REMOTE_GROUP).getValue();
    +
    +        // Set umask once, to avoid thread safety issues doing it in onTrigger
    +        final PropertyValue umaskProp = context.getProperty(UMASK);
    +        final short dfsUmask;
    +        if (umaskProp.isSet()) {
    +            dfsUmask = Short.parseShort(umaskProp.getValue(), 8);
    +        } else {
    +            dfsUmask = FsPermission.DEFAULT_UMASK;
    +        }
    +        final Configuration conf = getConfiguration();
    +        FsPermission.setUMask(conf, new FsPermission(dfsUmask));
    +    }
    +
    +    /**
    +     * Sub-classes provide the appropriate HDFSRecordWriter.
    +     *
    +     * @param context the process context to obtain additional configuration
    +     * @param flowFile the flow file being written
    +     * @param conf the Configuration instance
    +     * @param path the path to write to
    +     * @param schema the schema for writing
    +     * @return the HDFSRecordWriter
    +     * @throws IOException if an error occurs creating the writer or processing the schema
    +     */
    +    public abstract HDFSRecordWriter createHDFSRecordWriter(
    +            final ProcessContext context,
    +            final FlowFile flowFile,
    +            final Configuration conf,
    +            final Path path,
    +            final RecordSchema schema) throws IOException, SchemaNotFoundException;
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        // do this before getting a flow file so that we always get a chance to attempt Kerberos relogin
    +        final FileSystem fileSystem = getFileSystem();
    +        final Configuration configuration = getConfiguration();
    +        final UserGroupInformation ugi = getUserGroupInformation();
    +
    +        if (configuration == null || fileSystem == null || ugi == null) {
    +            getLogger().error("Processor not configured properly because Configuration, FileSystem, or UserGroupInformation was null");
    +            context.yield();
    +            return;
    +        }
    +
    +        final FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            context.yield();
    +            return;
    +        }
    +
    +        ugi.doAs((PrivilegedAction<Object>)() -> {
    +            Path tempDotCopyFile = null;
    +            FlowFile putFlowFile = flowFile;
    +            try {
    +                final String filenameValue = putFlowFile.getAttribute(CoreAttributes.FILENAME.key()); // TODO codec extension
    +                final String directoryValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(putFlowFile).getValue();
    +
    +                // create the directory if it doesn't exist
    +                final Path directoryPath = new Path(directoryValue);
    +                createDirectory(fileSystem, directoryPath, remoteOwner, remoteGroup);
    +
    +                // write to tempFile first and on success rename to destFile
    +                final Path tempFile = new Path(directoryPath, "." + filenameValue);
    +                final Path destFile = new Path(directoryPath, filenameValue);
    +
    +                final boolean destinationExists = fileSystem.exists(destFile) || fileSystem.exists(tempFile);
    +                final boolean shouldOverwrite = context.getProperty(OVERWRITE).asBoolean();
    +
    +                // if the tempFile or destFile already exist, and overwrite is set to false, then transfer to failure
    +                if (destinationExists && !shouldOverwrite) {
    +                    session.transfer(session.penalize(putFlowFile), REL_FAILURE);
    +                    getLogger().warn("penalizing {} and routing to failure because file with same name already exists", new Object[]{putFlowFile});
    +                    return null;
    +                }
    +
    +                final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>(null);
    +                final AtomicReference<WriteResult> writeResult = new AtomicReference<>();
    +                final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +
    +                final FlowFile flowFileIn = putFlowFile;
    +                final StopWatch stopWatch = new StopWatch(true);
    +
    +                // Read records from the incoming FlowFile and write them the tempFile
    +                session.read(putFlowFile, (final InputStream rawIn) -> {
    +                    RecordReader recordReader = null;
    +                    HDFSRecordWriter recordWriter = null;
    +
    +                    try (final BufferedInputStream in = new BufferedInputStream(rawIn)) {
    +                        final RecordSchema destRecordSchema = schemaAccessStrategy.getSchema(flowFile, in);
    +                        recordWriter = createHDFSRecordWriter(context, flowFile, configuration, tempFile, destRecordSchema);
    +
    +                        // if we fail to create the RecordReader then we want to route to failure, so we need to
    +                        // handle this separately from the other IOExceptions which normally rout to retry
    --- End diff --
    
    *rout* -> *route*. 



[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

Posted by alopresto <gi...@git.apache.org>.
Github user alopresto commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1712#discussion_r114150872
  
    --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java ---
    @@ -0,0 +1,496 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.avro;
    +
    +import org.apache.avro.LogicalType;
    +import org.apache.avro.LogicalTypes;
    +import org.apache.avro.Schema;
    +import org.apache.avro.Schema.Field;
    +import org.apache.avro.Schema.Type;
    +import org.apache.avro.generic.GenericData;
    +import org.apache.avro.generic.GenericFixed;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.util.Utf8;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.DataType;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.SchemaIdentifier;
    +import org.apache.nifi.serialization.record.util.DataTypeUtils;
    +import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.time.Duration;
    +import java.time.temporal.ChronoUnit;
    +import java.util.ArrayList;
    +import java.util.Date;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.concurrent.TimeUnit;
    +import java.util.stream.Collectors;
    +
    +public class AvroTypeUtil {
    +    public static final String AVRO_SCHEMA_FORMAT = "avro";
    +
    +    public static Schema extractAvroSchema(final RecordSchema recordSchema) throws SchemaNotFoundException {
    +        final Optional<String> schemaFormatOption = recordSchema.getSchemaFormat();
    +        if (!schemaFormatOption.isPresent()) {
    +            throw new SchemaNotFoundException("No Schema Format was present in the RecordSchema");
    +        }
    +
    +        final String schemaFormat = schemaFormatOption.get();
    +        if (!schemaFormat.equals(AVRO_SCHEMA_FORMAT)) {
    +            throw new SchemaNotFoundException("Schema provided is not in Avro format");
    +        }
    +
    +        final Optional<String> textOption = recordSchema.getSchemaText();
    +        if (!textOption.isPresent()) {
    +            throw new SchemaNotFoundException("No Schema text was present in the RecordSchema");
    +        }
    +
    +        final String text = textOption.get();
    +        return new Schema.Parser().parse(text);
    +    }
    +
    +    public static DataType determineDataType(final Schema avroSchema) {
    +        final Type avroType = avroSchema.getType();
    --- End diff --
    
    Same comment for `null` check. 

