Posted to commits@nifi.apache.org by al...@apache.org on 2017/05/01 20:11:59 UTC

[06/17] nifi git commit: NIFI-3724 - Initial commit of Parquet bundle with PutParquet and FetchParquet - Creating nifi-record-utils to share utility code from record services - Refactoring Parquet tests to use MockRecordParser and MockRecordWriter - R

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java
new file mode 100644
index 0000000..d5ab8c5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+public class DateTimeUtils {
+    public static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder()
+        .name("Date Format")
+        .description("Specifies the format to use when reading/writing Date fields")
+        .expressionLanguageSupported(false)
+        .defaultValue(RecordFieldType.DATE.getDefaultFormat())
+        .addValidator(new SimpleDateFormatValidator())
+        .required(true)
+        .build();
+
+    public static final PropertyDescriptor TIME_FORMAT = new PropertyDescriptor.Builder()
+        .name("Time Format")
+        .description("Specifies the format to use when reading/writing Time fields")
+        .expressionLanguageSupported(false)
+        .defaultValue(RecordFieldType.TIME.getDefaultFormat())
+        .addValidator(new SimpleDateFormatValidator())
+        .required(true)
+        .build();
+
+    public static final PropertyDescriptor TIMESTAMP_FORMAT = new PropertyDescriptor.Builder()
+        .name("Timestamp Format")
+        .description("Specifies the format to use when reading/writing Timestamp fields")
+        .expressionLanguageSupported(false)
+        .defaultValue(RecordFieldType.TIMESTAMP.getDefaultFormat())
+        .addValidator(new SimpleDateFormatValidator())
+        .required(true)
+        .build();
+}
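
For context, a minimal sketch of how a record reader or writer implementation
might reuse these shared descriptors rather than redeclaring them per
component. The surrounding class is hypothetical; only DateTimeUtils itself
comes from this commit.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.serialization.DateTimeUtils;

    // Hypothetical configuration holder reusing the shared date/time descriptors.
    public class ExampleRecordWriterConfig {
        public static List<PropertyDescriptor> supportedProperties() {
            return Arrays.asList(
                    DateTimeUtils.DATE_FORMAT,
                    DateTimeUtils.TIME_FORMAT,
                    DateTimeUtils.TIMESTAMP_FORMAT);
        }
    }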

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/SimpleDateFormatValidator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/SimpleDateFormatValidator.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/SimpleDateFormatValidator.java
new file mode 100644
index 0000000..f25749b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/SimpleDateFormatValidator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization;
+
+import java.text.SimpleDateFormat;
+
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+
+public class SimpleDateFormatValidator implements Validator {
+
+    @Override
+    public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+        try {
+            new SimpleDateFormat(input);
+        } catch (final Exception e) {
+            return new ValidationResult.Builder()
+                .input(input)
+                .subject(subject)
+                .valid(false)
+                .explanation("Invalid Date format: " + e.getMessage())
+                .build();
+        }
+
+        return new ValidationResult.Builder()
+            .input(input)
+            .subject(subject)
+            .valid(true)
+            .build();
+    }
+
+}
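
The validator's contract is simply "the value must construct a
java.text.SimpleDateFormat without throwing". A standalone sketch of that
behavior; the demo class is illustrative, not part of the commit, and passing
a null ValidationContext is safe here only because this validator never reads
it.

    import org.apache.nifi.components.ValidationResult;
    import org.apache.nifi.serialization.SimpleDateFormatValidator;

    public class SimpleDateFormatValidatorDemo {
        public static void main(String[] args) {
            final SimpleDateFormatValidator validator = new SimpleDateFormatValidator();

            // A legal pattern constructs cleanly and is reported valid.
            ValidationResult ok = validator.validate("Date Format", "yyyy-MM-dd", null);
            System.out.println(ok.isValid()); // true

            // 'q' is not a SimpleDateFormat pattern letter, so the constructor
            // throws IllegalArgumentException and the result is invalid.
            ValidationResult bad = validator.validate("Date Format", "qqqq", null);
            System.out.println(bad.isValid()); // false
        }
    }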

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/pom.xml
new file mode 100644
index 0000000..fea0920
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/pom.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-extension-utils</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+    </parent>
+    <packaging>pom</packaging>
+    <artifactId>nifi-record-utils</artifactId>
+
+    <modules>
+        <module>nifi-avro-record-utils</module>
+        <module>nifi-standard-record-utils</module>
+        <module>nifi-hadoop-record-utils</module>
+        <module>nifi-mock-record-utils</module>
+    </modules>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/pom.xml
new file mode 100644
index 0000000..4e1a0f5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/pom.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+    </parent>
+    <packaging>pom</packaging>
+    <artifactId>nifi-extension-utils</artifactId>
+    <description>
+        This module contains reusable utilities related to extensions that can be shared across NARs.
+    </description>
+
+    <modules>
+        <module>nifi-record-utils</module>
+        <module>nifi-hadoop-utils</module>
+        <module>nifi-processor-utils</module>
+    </modules>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
index 7429ea8..493caf0 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
@@ -33,12 +33,6 @@
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-hadoop-utils</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-common</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
@@ -50,16 +44,16 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-distributed-cache-client-service-api</artifactId>
-        </dependency>
-        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-hdfs</artifactId>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-distributed-cache-client-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mock</artifactId>
             <scope>test</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
deleted file mode 100644
index dac2b30..0000000
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ /dev/null
@@ -1,580 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.hadoop;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.BZip2Codec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.io.compress.Lz4Codec;
-import org.apache.hadoop.io.compress.SnappyCodec;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.hadoop.KerberosProperties;
-import org.apache.nifi.hadoop.SecurityUtil;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.util.StringUtils;
-
-import javax.net.SocketFactory;
-import java.io.File;
-import java.io.IOException;
-import java.lang.ref.WeakReference;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.URI;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.WeakHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * This is a base class that is helpful when building processors interacting with HDFS.
- */
-@RequiresInstanceClassLoading(cloneAncestorResources = true)
-public abstract class AbstractHadoopProcessor extends AbstractProcessor {
-    /**
-     * Compression Type Enum
-     */
-    public enum CompressionType {
-        NONE,
-        DEFAULT,
-        BZIP,
-        GZIP,
-        LZ4,
-        SNAPPY,
-        AUTOMATIC;
-
-        @Override
-        public String toString() {
-            switch (this) {
-                case NONE: return "NONE";
-                case DEFAULT: return DefaultCodec.class.getName();
-                case BZIP: return BZip2Codec.class.getName();
-                case GZIP: return GzipCodec.class.getName();
-                case LZ4: return Lz4Codec.class.getName();
-                case SNAPPY: return SnappyCodec.class.getName();
-                case AUTOMATIC: return "Automatically Detected";
-            }
-            return null;
-        }
-    }
-
-    // properties
-    public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
-            .name("Hadoop Configuration Resources")
-            .description("A file or comma separated list of files which contains the Hadoop file system configuration. Without this, Hadoop "
-                    + "will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration.")
-            .required(false)
-            .addValidator(createMultipleFilesExistValidator())
-            .build();
-
-    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
-            .name("Directory")
-            .description("The HDFS directory from which files should be read")
-            .required(true)
-            .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
-
-    public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
-            .name("Compression codec")
-            .required(true)
-            .allowableValues(CompressionType.values())
-            .defaultValue(CompressionType.NONE.toString())
-            .build();
-
-    public static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder()
-            .name("Kerberos Relogin Period").required(false)
-            .description("Period of time which should pass before attempting a kerberos relogin")
-            .defaultValue("4 hours")
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor ADDITIONAL_CLASSPATH_RESOURCES = new PropertyDescriptor.Builder()
-            .name("Additional Classpath Resources")
-            .description("A comma-separated list of paths to files and/or directories that will be added to the classpath. When specifying a " +
-                    "directory, all files within the directory will be added to the classpath, but further sub-directories will not be included.")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .dynamicallyModifiesClasspath(true)
-            .build();
-
-    private static final Object RESOURCES_LOCK = new Object();
-
-    private long kerberosReloginThreshold;
-    private long lastKerberosReloginTime;
-    protected KerberosProperties kerberosProperties;
-    protected List<PropertyDescriptor> properties;
-    private volatile File kerberosConfigFile = null;
-
-    // variables shared by all threads of this processor
-    // Hadoop Configuration, Filesystem, and UserGroupInformation (optional)
-    private final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>();
-
-    // Holder of cached Configuration information so validation does not reload the same config over and over
-    private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
-
-    @Override
-    protected void init(ProcessorInitializationContext context) {
-        hdfsResources.set(new HdfsResources(null, null, null));
-
-        kerberosConfigFile = context.getKerberosConfigurationFile();
-        kerberosProperties = getKerberosProperties(kerberosConfigFile);
-
-        List<PropertyDescriptor> props = new ArrayList<>();
-        props.add(HADOOP_CONFIGURATION_RESOURCES);
-        props.add(kerberosProperties.getKerberosPrincipal());
-        props.add(kerberosProperties.getKerberosKeytab());
-        props.add(KERBEROS_RELOGIN_PERIOD);
-        props.add(ADDITIONAL_CLASSPATH_RESOURCES);
-        properties = Collections.unmodifiableList(props);
-    }
-
-    protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
-        return new KerberosProperties(kerberosConfigFile);
-    }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return properties;
-    }
-
-    @Override
-    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
-        final String configResources = validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue();
-        final String principal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
-        final String keytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
-
-        final List<ValidationResult> results = new ArrayList<>();
-
-        if (!StringUtils.isBlank(configResources)) {
-            try {
-                ValidationResources resources = validationResourceHolder.get();
-
-                // if no resources in the holder, or if the holder has different resources loaded,
-                // then load the Configuration and set the new resources in the holder
-                if (resources == null || !configResources.equals(resources.getConfigResources())) {
-                    getLogger().debug("Reloading validation resources");
-                    final Configuration config = new ExtendedConfiguration(getLogger());
-                    config.setClassLoader(Thread.currentThread().getContextClassLoader());
-                    resources = new ValidationResources(configResources, getConfigurationFromResources(config, configResources));
-                    validationResourceHolder.set(resources);
-                }
-
-                final Configuration conf = resources.getConfiguration();
-                results.addAll(KerberosProperties.validatePrincipalAndKeytab(
-                        this.getClass().getSimpleName(), conf, principal, keytab, getLogger()));
-
-            } catch (IOException e) {
-                results.add(new ValidationResult.Builder()
-                        .valid(false)
-                        .subject(this.getClass().getSimpleName())
-                        .explanation("Could not load Hadoop Configuration resources")
-                        .build());
-            }
-        }
-
-        return results;
-    }
-
-    /*
-     * If your subclass also has an @OnScheduled annotated method and you need hdfsResources in that method, then be sure to call super.abstractOnScheduled(context)
-     */
-    @OnScheduled
-    public final void abstractOnScheduled(ProcessContext context) throws IOException {
-        try {
-            // This value will be null when called from ListHDFS, because it overrides all of the default
-            // properties this processor sets. TODO: re-work ListHDFS to utilize Kerberos
-            if (context.getProperty(KERBEROS_RELOGIN_PERIOD).getValue() != null) {
-                kerberosReloginThreshold = context.getProperty(KERBEROS_RELOGIN_PERIOD).asTimePeriod(TimeUnit.SECONDS);
-            }
-            HdfsResources resources = hdfsResources.get();
-            if (resources.getConfiguration() == null) {
-                final String configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue();
-                resources = resetHDFSResources(configResources, context);
-                hdfsResources.set(resources);
-            }
-        } catch (IOException ex) {
-            getLogger().error("HDFS Configuration error - {}", new Object[] { ex });
-            hdfsResources.set(new HdfsResources(null, null, null));
-            throw ex;
-        }
-    }
-
-    @OnStopped
-    public final void abstractOnStopped() {
-        hdfsResources.set(new HdfsResources(null, null, null));
-    }
-
-    private static Configuration getConfigurationFromResources(final Configuration config, String configResources) throws IOException {
-        boolean foundResources = false;
-        if (null != configResources) {
-            String[] resources = configResources.split(",");
-            for (String resource : resources) {
-                config.addResource(new Path(resource.trim()));
-                foundResources = true;
-            }
-        }
-
-        if (!foundResources) {
-            // check that at least 1 non-default resource is available on the classpath
-            String configStr = config.toString();
-            for (String resource : configStr.substring(configStr.indexOf(":") + 1).split(",")) {
-                if (!resource.contains("default") && config.getResource(resource.trim()) != null) {
-                    foundResources = true;
-                    break;
-                }
-            }
-        }
-
-        if (!foundResources) {
-            throw new IOException("Could not find any of the " + HADOOP_CONFIGURATION_RESOURCES.getName() + " on the classpath");
-        }
-        return config;
-    }
-
-    /*
-     * Reset Hadoop Configuration and FileSystem based on the supplied configuration resources.
-     */
-    HdfsResources resetHDFSResources(String configResources, ProcessContext context) throws IOException {
-        Configuration config = new ExtendedConfiguration(getLogger());
-        config.setClassLoader(Thread.currentThread().getContextClassLoader());
-
-        getConfigurationFromResources(config, configResources);
-
-        // first check for timeout on HDFS connection, because FileSystem has a hard coded 15 minute timeout
-        checkHdfsUriForTimeout(config);
-
-        // disable caching of Configuration and FileSystem objects, else we cannot reconfigure the processor without a complete
-        // restart
-        String disableCacheName = String.format("fs.%s.impl.disable.cache", FileSystem.getDefaultUri(config).getScheme());
-        config.set(disableCacheName, "true");
-
-        // If kerberos is enabled, create the file system as the kerberos principal
-        // -- use RESOURCES_LOCK to guarantee UserGroupInformation is accessed by only a single thread at a time
-        FileSystem fs;
-        UserGroupInformation ugi;
-        synchronized (RESOURCES_LOCK) {
-            if (SecurityUtil.isSecurityEnabled(config)) {
-                String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
-                String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
-                ugi = SecurityUtil.loginKerberos(config, principal, keyTab);
-                fs = getFileSystemAsUser(config, ugi);
-                lastKerberosReloginTime = System.currentTimeMillis() / 1000;
-            } else {
-                config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
-                config.set("hadoop.security.authentication", "simple");
-                ugi = SecurityUtil.loginSimple(config);
-                fs = getFileSystemAsUser(config, ugi);
-            }
-        }
-        getLogger().debug("resetHDFSResources UGI {}", new Object[]{ugi});
-
-        final Path workingDir = fs.getWorkingDirectory();
-        getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
-                new Object[]{workingDir, fs.getDefaultBlockSize(workingDir), fs.getDefaultReplication(workingDir), config.toString()});
-
-        return new HdfsResources(config, fs, ugi);
-    }
-
-    /**
-     * This exists in order to allow unit tests to override it so that they don't take several minutes waiting for UDP packets to be received
-     *
-     * @param config
-     *            the configuration to use
-     * @return the FileSystem that is created for the given Configuration
-     * @throws IOException
-     *             if unable to create the FileSystem
-     */
-    protected FileSystem getFileSystem(final Configuration config) throws IOException {
-        return FileSystem.get(config);
-    }
-
-    protected FileSystem getFileSystemAsUser(final Configuration config, UserGroupInformation ugi) throws IOException {
-        try {
-            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
-                @Override
-                public FileSystem run() throws Exception {
-                    return FileSystem.get(config);
-                }
-            });
-        } catch (InterruptedException e) {
-            throw new IOException("Unable to create file system: " + e.getMessage());
-        }
-    }
-
-    /*
-     * Drastically reduce the timeout of a socket connection from the default in FileSystem.get()
-     */
-    protected void checkHdfsUriForTimeout(Configuration config) throws IOException {
-        URI hdfsUri = FileSystem.getDefaultUri(config);
-        String address = hdfsUri.getAuthority();
-        int port = hdfsUri.getPort();
-        if (address == null || address.isEmpty() || port < 0) {
-            return;
-        }
-        InetSocketAddress namenode = NetUtils.createSocketAddr(address, port);
-        SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(config);
-        Socket socket = null;
-        try {
-            socket = socketFactory.createSocket();
-            NetUtils.connect(socket, namenode, 1000); // 1 second timeout
-        } finally {
-            IOUtils.closeQuietly(socket);
-        }
-    }
-
-    /*
-     * Validates that one or more files exist, as specified in a single property.
-     */
-    public static final Validator createMultipleFilesExistValidator() {
-        return new Validator() {
-
-            @Override
-            public ValidationResult validate(String subject, String input, ValidationContext context) {
-                final String[] files = input.split(",");
-                for (String filename : files) {
-                    try {
-                        final File file = new File(filename.trim());
-                        final boolean valid = file.exists() && file.isFile();
-                        if (!valid) {
-                            final String message = "File " + file + " does not exist or is not a file";
-                            return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
-                        }
-                    } catch (SecurityException e) {
-                        final String message = "Unable to access " + filename + " due to " + e.getMessage();
-                        return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
-                    }
-                }
-                return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
-            }
-
-        };
-    }
-
-    /**
-     * Returns the configured CompressionCodec, or null if none is configured.
-     *
-     * @param context
-     *            the ProcessContext
-     * @param configuration
-     *            the Hadoop Configuration
-     * @return CompressionCodec or null
-     */
-    protected org.apache.hadoop.io.compress.CompressionCodec getCompressionCodec(ProcessContext context, Configuration configuration) {
-        org.apache.hadoop.io.compress.CompressionCodec codec = null;
-        if (context.getProperty(COMPRESSION_CODEC).isSet()) {
-            String compressionClassname = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).getValue()).toString();
-            CompressionCodecFactory ccf = new CompressionCodecFactory(configuration);
-            codec = ccf.getCodecByClassName(compressionClassname);
-        }
-
-        return codec;
-    }
-
-    /**
-     * Returns the relative path of the child that does not include the filename or the root path.
-     *
-     * @param root
-     *            the path to relativize from
-     * @param child
-     *            the path to relativize
-     * @return the relative path
-     */
-    public static String getPathDifference(final Path root, final Path child) {
-        final int depthDiff = child.depth() - root.depth();
-        if (depthDiff <= 1) {
-            return "".intern();
-        }
-        String lastRoot = root.getName();
-        Path childsParent = child.getParent();
-        final StringBuilder builder = new StringBuilder();
-        builder.append(childsParent.getName());
-        for (int i = (depthDiff - 3); i >= 0; i--) {
-            childsParent = childsParent.getParent();
-            String name = childsParent.getName();
-            if (name.equals(lastRoot) && childsParent.toString().endsWith(root.toString())) {
-                break;
-            }
-            builder.insert(0, Path.SEPARATOR).insert(0, name);
-        }
-        return builder.toString();
-    }
-
-    protected Configuration getConfiguration() {
-        return hdfsResources.get().getConfiguration();
-    }
-
-    protected FileSystem getFileSystem() {
-        // trigger Relogin if necessary
-        getUserGroupInformation();
-        return hdfsResources.get().getFileSystem();
-    }
-
-    protected UserGroupInformation getUserGroupInformation() {
-        // if kerberos is enabled, check if the ticket should be renewed before returning
-        UserGroupInformation userGroupInformation = hdfsResources.get().getUserGroupInformation();
-        if (userGroupInformation != null && isTicketOld()) {
-            tryKerberosRelogin(userGroupInformation);
-        }
-        return userGroupInformation;
-    }
-
-    protected void tryKerberosRelogin(UserGroupInformation ugi) {
-        try {
-            getLogger().info("Kerberos ticket age exceeds threshold [{} seconds] " +
-                "attempting to renew ticket for user {}", new Object[]{
-              kerberosReloginThreshold, ugi.getUserName()});
-            ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
-                ugi.checkTGTAndReloginFromKeytab();
-                return null;
-            });
-            lastKerberosReloginTime = System.currentTimeMillis() / 1000;
-            getLogger().info("Kerberos relogin successful or ticket still valid");
-        } catch (IOException e) {
-            // Most likely case of this happening is ticket is expired and error getting a new one,
-            // meaning dfs operations would fail
-            getLogger().error("Kerberos relogin failed", e);
-            throw new ProcessException("Unable to renew kerberos ticket", e);
-        } catch (InterruptedException e) {
-            getLogger().error("Interrupted while attempting Kerberos relogin", e);
-            throw new ProcessException("Unable to renew kerberos ticket", e);
-        }
-    }
-
-    protected boolean isTicketOld() {
-        return (System.currentTimeMillis() / 1000 - lastKerberosReloginTime) > kerberosReloginThreshold;
-    }
-
-
-    static protected class HdfsResources {
-        private final Configuration configuration;
-        private final FileSystem fileSystem;
-        private final UserGroupInformation userGroupInformation;
-
-        public HdfsResources(Configuration configuration, FileSystem fileSystem, UserGroupInformation userGroupInformation) {
-            this.configuration = configuration;
-            this.fileSystem = fileSystem;
-            this.userGroupInformation = userGroupInformation;
-        }
-
-        public Configuration getConfiguration() {
-            return configuration;
-        }
-
-        public FileSystem getFileSystem() {
-            return fileSystem;
-        }
-
-        public UserGroupInformation getUserGroupInformation() {
-            return userGroupInformation;
-        }
-    }
-
-    static protected class ValidationResources {
-        private final String configResources;
-        private final Configuration configuration;
-
-        public ValidationResources(String configResources, Configuration configuration) {
-            this.configResources = configResources;
-            this.configuration = configuration;
-        }
-
-        public String getConfigResources() {
-            return configResources;
-        }
-
-        public Configuration getConfiguration() {
-            return configuration;
-        }
-    }
-
-    /**
-     * Extending Hadoop Configuration to prevent it from caching classes that can't be found. Since users may be
-     * adding additional JARs to the classpath we don't want them to have to restart the JVM to be able to load
-     * something that was previously not found, but might now be available.
-     *
-     * Reference the original getClassByNameOrNull from Configuration.
-     */
-    static class ExtendedConfiguration extends Configuration {
-
-        private final ComponentLog logger;
-        private final Map<ClassLoader, Map<String, WeakReference<Class<?>>>> CACHE_CLASSES = new WeakHashMap<>();
-
-        public ExtendedConfiguration(final ComponentLog logger) {
-            this.logger = logger;
-        }
-
-        public Class<?> getClassByNameOrNull(String name) {
-            final ClassLoader classLoader = getClassLoader();
-
-            Map<String, WeakReference<Class<?>>> map;
-            synchronized (CACHE_CLASSES) {
-                map = CACHE_CLASSES.get(classLoader);
-                if (map == null) {
-                    map = Collections.synchronizedMap(new WeakHashMap<>());
-                    CACHE_CLASSES.put(classLoader, map);
-                }
-            }
-
-            Class<?> clazz = null;
-            WeakReference<Class<?>> ref = map.get(name);
-            if (ref != null) {
-                clazz = ref.get();
-            }
-
-            if (clazz == null) {
-                try {
-                    clazz = Class.forName(name, true, classLoader);
-                } catch (ClassNotFoundException e) {
-                    logger.error(e.getMessage(), e);
-                    return null;
-                }
-                // two putters can race here, but they'll put the same class
-                map.put(name, new WeakReference<>(clazz));
-                return clazz;
-            } else {
-                // cache hit
-                return clazz;
-            }
-        }
-
-    }
-
-}
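
Of the helpers leaving this file (the class is deleted here and relocated
elsewhere in this commit; the diff above only shows the deletion),
getPathDifference has the least obvious contract: it returns the directory
segments between the root and the child's filename, or an empty string when
the child sits directly under the root. A worked example against the
implementation deleted above; the demo class itself is illustrative only.

    import org.apache.hadoop.fs.Path;
    import org.apache.nifi.processors.hadoop.AbstractHadoopProcessor;

    public class PathDifferenceDemo {
        public static void main(String[] args) {
            final Path root = new Path("/data");

            // Prints "year/month": the segments between the root and the filename.
            System.out.println(AbstractHadoopProcessor.getPathDifference(
                    root, new Path("/data/year/month/file.txt")));

            // A file directly under the root yields the empty string.
            System.out.println(AbstractHadoopProcessor.getPathDifference(
                    root, new Path("/data/file.txt")));
        }
    }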

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 6232936..69b5b77 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -36,9 +36,6 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.DataUnit;
@@ -96,8 +93,6 @@ public class PutHDFS extends AbstractHadoopProcessor {
     public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
     public static final int BUFFER_SIZE_DEFAULT = 4096;
 
-    public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path";
-
     // relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -135,14 +130,14 @@ public class PutHDFS extends AbstractHadoopProcessor {
     public static final PropertyDescriptor REPLICATION_FACTOR = new PropertyDescriptor.Builder()
             .name("Replication")
             .description("Number of times that HDFS will replicate each file. This overrides the Hadoop Configuration")
-            .addValidator(createPositiveShortValidator())
+            .addValidator(HadoopValidators.POSITIVE_SHORT_VALIDATOR)
             .build();
 
     public static final PropertyDescriptor UMASK = new PropertyDescriptor.Builder()
             .name("Permissions umask")
             .description(
                     "A umask represented as an octal number which determines the permissions of files written to HDFS. This overrides the Hadoop Configuration dfs.umaskmode")
-            .addValidator(createUmaskValidator())
+            .addValidator(HadoopValidators.UMASK_VALIDATOR)
             .build();
 
     public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder()
@@ -404,51 +399,4 @@ public class PutHDFS extends AbstractHadoopProcessor {
         }
     }
 
-    /*
-     * Validates that a property is a valid short number greater than 0.
-     */
-    static Validator createPositiveShortValidator() {
-        return new Validator() {
-            @Override
-            public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
-                String reason = null;
-                try {
-                    final short shortVal = Short.parseShort(value);
-                    if (shortVal <= 0) {
-                        reason = "short integer must be greater than zero";
-                    }
-                } catch (final NumberFormatException e) {
-                    reason = "[" + value + "] is not a valid short integer";
-                }
-                return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null)
-                        .build();
-            }
-        };
-    }
-
-    /*
-     * Validates that a property is a valid umask, i.e. a short octal number that is not negative.
-     */
-    static Validator createUmaskValidator() {
-        return new Validator() {
-            @Override
-            public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
-                String reason = null;
-                try {
-                    final short shortVal = Short.parseShort(value, 8);
-                    if (shortVal < 0) {
-                        reason = "octal umask [" + value + "] cannot be negative";
-                    } else if (shortVal > 511) {
-                        // HDFS umask has 9 bits: rwxrwxrwx ; the sticky bit cannot be umasked
-                        reason = "octal umask [" + value + "] is not a valid umask";
-                    }
-                } catch (final NumberFormatException e) {
-                    reason = "[" + value + "] is not a valid short octal number";
-                }
-                return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null)
-                        .build();
-            }
-        };
-    }
-
 }
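
The two inlined validators removed here are replaced by shared constants on
HadoopValidators. The umask rule they encode: parse the value as a base-8
short and accept only 0 through 0777 (511 decimal), because the sticky bit
cannot be umasked. A quick standalone arithmetic sketch, not taken from the
commit:

    public class UmaskDemo {
        public static void main(String[] args) {
            // "022" parsed base-8 is 18 decimal -- a valid umask.
            System.out.println(Short.parseShort("022", 8));  // 18

            // "777" is the largest permitted value: 511 decimal (rwxrwxrwx).
            System.out.println(Short.parseShort("777", 8));  // 511

            // "1777" parses to 1023, which exceeds 511 and would be rejected.
            System.out.println(Short.parseShort("1777", 8)); // 1023
        }
    }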

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java
index 1ac62af..bbb050a 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java
@@ -206,7 +206,7 @@ public class TestCreateHadoopSequenceFile {
     @Test
     public void testSequenceFileBzipCompressionCodec() throws UnsupportedEncodingException, IOException {
 
-        controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, AbstractHadoopProcessor.CompressionType.BZIP.name());
+        controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, CompressionType.BZIP.name());
         controller.setProperty(CreateHadoopSequenceFile.COMPRESSION_TYPE, SequenceFile.CompressionType.BLOCK.name());
 
         File inFile = inFiles[0];
@@ -253,7 +253,7 @@ public class TestCreateHadoopSequenceFile {
     @Test
     public void testSequenceFileDefaultCompressionCodec() throws UnsupportedEncodingException, IOException {
 
-        controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, AbstractHadoopProcessor.CompressionType.DEFAULT.name());
+        controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, CompressionType.DEFAULT.name());
         controller.setProperty(CreateHadoopSequenceFile.COMPRESSION_TYPE, SequenceFile.CompressionType.BLOCK.name());
 
         File inFile = inFiles[0];
@@ -300,7 +300,7 @@ public class TestCreateHadoopSequenceFile {
     @Test
     public void testSequenceFileNoneCompressionCodec() throws UnsupportedEncodingException, IOException {
 
-        controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, AbstractHadoopProcessor.CompressionType.NONE.name());
+        controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, CompressionType.NONE.name());
         controller.setProperty(CreateHadoopSequenceFile.COMPRESSION_TYPE, SequenceFile.CompressionType.BLOCK.name());
 
         File inFile = inFiles[0];
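
These three test edits drop the AbstractHadoopProcessor qualifier, which
implies CompressionType is now importable directly -- either as a type
extracted to the top level or via a nested-type import. The test presumably
gains an import along these lines; the exact package is an assumption based
on the old nesting:

    // Presumed new import in TestCreateHadoopSequenceFile (package assumed):
    import org.apache.nifi.processors.hadoop.CompressionType;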

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
index 661180e..2c50ceb 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
@@ -84,12 +84,6 @@
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-hadoop-utils</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-client</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>com.github.stephenc.findbugs</groupId>

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/pom.xml b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/pom.xml
new file mode 100644
index 0000000..dfe67a3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/pom.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-parquet-bundle</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-parquet-nar</artifactId>
+    <version>1.2.0-SNAPSHOT</version>
+    <packaging>nar</packaging>
+    <properties>
+        <maven.javadoc.skip>true</maven.javadoc.skip>
+        <source.skip>true</source.skip>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-parquet-processors</artifactId>
+            <version>1.2.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hadoop-libraries-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/src/main/resources/META-INF/LICENSE
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/src/main/resources/META-INF/LICENSE
new file mode 100644
index 0000000..958de4d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,239 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+APACHE NIFI SUBCOMPONENTS:
+
+The Apache NiFi project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for these
+subcomponents is subject to the terms and conditions of the following
+licenses.
+
+  The binary distribution of this product bundles 'ParaNamer' and 'Paranamer Core',
+  which are available under a BSD-style license.
+
+    Copyright (c) 2006 Paul Hammant & ThoughtWorks Inc
+     All rights reserved.
+
+     Redistribution and use in source and binary forms, with or without
+     modification, are permitted provided that the following conditions
+     are met:
+     1. Redistributions of source code must retain the above copyright
+        notice, this list of conditions and the following disclaimer.
+     2. Redistributions in binary form must reproduce the above copyright
+        notice, this list of conditions and the following disclaimer in the
+        documentation and/or other materials provided with the distribution.
+     3. Neither the name of the copyright holders nor the names of its
+        contributors may be used to endorse or promote products derived from
+        this software without specific prior written permission.
+
+     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+     ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+     LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+     CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+     SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+     INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+     CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+     ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+     THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..4c40843
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,105 @@
+nifi-parquet-nar
+Copyright 2014-2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+The following binary components are provided under the Apache Software License v2
+
+  (ASLv2) Apache Avro
+    The following NOTICE information applies:
+      Apache Avro
+      Copyright 2009-2013 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Codec
+    The following NOTICE information applies:
+      Apache Commons Codec
+      Copyright 2002-2014 The Apache Software Foundation
+
+      src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
+      contains test data from http://aspell.net/test/orig/batch0.tab.
+      Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
+
+      ===============================================================================
+
+      The content of package org.apache.commons.codec.language.bm has been translated
+      from the original php source code available at http://stevemorse.org/phoneticinfo.htm
+      with permission from the original authors.
+      Original source copyright:
+      Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
+
+  (ASLv2) Apache Commons Compress
+    The following NOTICE information applies:
+      Apache Commons Compress
+      Copyright 2002-2014 The Apache Software Foundation
+
+      The files in the package org.apache.commons.compress.archivers.sevenz
+      were derived from the LZMA SDK, version 9.20 (C/ and CPP/7zip/),
+      which has been placed in the public domain:
+
+      "LZMA SDK is placed in the public domain." (http://www.7-zip.org/sdk.html)
+
+  (ASLv2) Apache Commons Lang
+    The following NOTICE information applies:
+      Apache Commons Lang
+      Copyright 2001-2015 The Apache Software Foundation
+
+      This product includes software from the Spring Framework,
+      under the Apache License 2.0 (see: StringUtils.containsWhitespace())
+
+  (ASLv2) Apache Parquet
+    The following NOTICE information applies:
+      Apache Parquet MR (Incubating)
+      Copyright 2014 The Apache Software Foundation
+
+      This product includes software developed at
+      The Apache Software Foundation (http://www.apache.org/).
+
+  (ASLv2) Jackson JSON processor
+    The following NOTICE information applies:
+      # Jackson JSON processor
+
+      Jackson is a high-performance, Free/Open Source JSON processing library.
+      It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
+      been in development since 2007.
+      It is currently developed by a community of developers, as well as supported
+      commercially by FasterXML.com.
+
+      ## Licensing
+
+      Jackson core and extension components may be licensed under different licenses.
+      To find the details that apply to this artifact see the accompanying LICENSE file.
+      For more information, including possible other licensing options, contact
+      FasterXML.com (http://fasterxml.com).
+
+      ## Credits
+
+      A list of contributors may be found in the CREDITS file, which is included
+      in some artifacts (usually source distributions); but it is always available
+      from the source code management (SCM) system the project uses.
+
+  (ASLv2) Snappy Java
+    The following NOTICE information applies:
+      This product includes software developed by Google
+       Snappy: http://code.google.com/p/snappy/ (New BSD License)
+
+      This product includes software developed by Apache
+       PureJavaCrc32C from apache-hadoop-common http://hadoop.apache.org/
+       (Apache 2.0 license)
+
+      This library contains statically linked libstdc++. This inclusion is allowed by the
+      "GCC Runtime Library Exception"
+      http://gcc.gnu.org/onlinedocs/libstdc++/manual/license.html
+
+*****************
+Public Domain
+*****************
+
+The following binary components are provided to the 'Public Domain'.  See project link for details.
+
+    (Public Domain) XZ for Java (org.tukaani:xz:jar:1.5 - http://tukaani.org/xz/java.html)
+

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
new file mode 100644
index 0000000..c8c1078
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-parquet-bundle</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-parquet-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <scope>provided</scope>
+        </dependency>
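+        <!-- parquet-avro bridges Avro GenericRecords and Parquet files for the Put/FetchParquet processors -->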
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-avro</artifactId>
+            <version>1.8.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hadoop-record-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-registry-service-api</artifactId>
+        </dependency>
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes combine.children="append">
+                        <exclude>src/test/resources/avro/user.avsc</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
new file mode 100644
index 0000000..3536944
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.parquet;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.hadoop.AbstractFetchHDFSRecord;
+import org.apache.nifi.processors.hadoop.record.HDFSRecordReader;
+import org.apache.nifi.processors.parquet.record.AvroParquetHDFSRecordReader;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetReader;
+
+import java.io.IOException;
+
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"parquet", "hadoop", "HDFS", "get", "ingest", "fetch", "source", "restricted"})
+@CapabilityDescription("Reads from a given Parquet file and writes records to the content of the flow file using " +
+        "the selected record writer. The original Parquet file will remain unchanged, and the content of the flow file " +
+        "will be replaced with records of the selected type. This processor can be used with ListHDFS or ListFile to obtain " +
+        "a listing of files to fetch.")
+@WritesAttributes({
+        @WritesAttribute(attribute="fetch.failure.reason", description="When a FlowFile is routed to 'failure', this attribute is added " +
+                "indicating why the file could not be fetched from the given filesystem."),
+        @WritesAttribute(attribute = "record.count", description = "The number of records in the resulting flow file")
+})
+@SeeAlso({PutParquet.class})
+@Restricted("Provides an operator the ability to retrieve any file that NiFi has access to in HDFS or the local filesystem.")
+public class FetchParquet extends AbstractFetchHDFSRecord {
+
+    @Override
+    public HDFSRecordReader createHDFSRecordReader(final ProcessContext context, final FlowFile flowFile, final Configuration conf, final Path path)
+            throws IOException {
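+        // Build an Avro-backed Parquet reader for the given path, applying the supplied Hadoop configuration,
+        // then wrap it in the record adapter that AbstractFetchHDFSRecord iterates over.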
+        final ParquetReader.Builder<GenericRecord> readerBuilder = AvroParquetReader.<GenericRecord>builder(path).withConf(conf);
+        return new AvroParquetHDFSRecordReader(readerBuilder.build());
+    }
+
+}
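
For a sense of how this processor is driven end to end, a minimal test sketch against NiFi's mock
framework follows. The no-arg MockRecordWriter constructor, the "record-writer" property name, and the
"success" relationship name are assumptions inferred from the AbstractFetchHDFSRecord base class and the
mock record utilities introduced by this commit, not guaranteed by this diff; the Parquet path below is a
placeholder that must point at a real file:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.nifi.processors.parquet.FetchParquet;
    import org.apache.nifi.serialization.record.MockRecordWriter; // assumed package in nifi-mock-record-utils
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    public class FetchParquetSketch {

        public static void main(final String[] args) throws Exception {
            final TestRunner runner = TestRunners.newTestRunner(FetchParquet.class);

            // Register a record writer so fetched records can be serialized into the FlowFile content.
            final MockRecordWriter writerService = new MockRecordWriter();
            runner.addControllerService("writer", writerService);
            runner.enableControllerService(writerService);
            runner.setProperty("record-writer", "writer"); // assumed property name from the base class

            // The base class resolves the file to fetch from incoming FlowFile attributes
            // (by default something like ${path}/${filename}).
            final Map<String, String> attributes = new HashMap<>();
            attributes.put("path", "/tmp");               // placeholder directory
            attributes.put("filename", "users.parquet");  // placeholder Parquet file
            runner.enqueue(new byte[0], attributes);

            runner.run();
            runner.assertAllFlowFilesTransferred("success", 1); // assumed relationship name
        }
    }

On success, each output FlowFile should also carry the record.count attribute documented in the
@WritesAttributes annotation above.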