You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/05/01 20:11:59 UTC
[06/17] nifi git commit: NIFI-3724 - Initial commit of Parquet bundle
with PutParquet and FetchParquet - Creating nifi-records-utils to share
utility code from record services - Refactoring Parquet tests to use
MockRecordParser and MockRecordWriter - R
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java
new file mode 100644
index 0000000..d5ab8c5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+public class DateTimeUtils {
+ public static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder()
+ .name("Date Format")
+ .description("Specifies the format to use when reading/writing Date fields")
+ .expressionLanguageSupported(false)
+ .defaultValue(RecordFieldType.DATE.getDefaultFormat())
+ .addValidator(new SimpleDateFormatValidator())
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor TIME_FORMAT = new PropertyDescriptor.Builder()
+ .name("Time Format")
+ .description("Specifies the format to use when reading/writing Time fields")
+ .expressionLanguageSupported(false)
+ .defaultValue(RecordFieldType.TIME.getDefaultFormat())
+ .addValidator(new SimpleDateFormatValidator())
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor TIMESTAMP_FORMAT = new PropertyDescriptor.Builder()
+ .name("Timestamp Format")
+ .description("Specifies the format to use when reading/writing Timestamp fields")
+ .expressionLanguageSupported(false)
+ .defaultValue(RecordFieldType.TIMESTAMP.getDefaultFormat())
+ .addValidator(new SimpleDateFormatValidator())
+ .required(true)
+ .build();
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/SimpleDateFormatValidator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/SimpleDateFormatValidator.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/SimpleDateFormatValidator.java
new file mode 100644
index 0000000..f25749b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/SimpleDateFormatValidator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization;
+
+import java.text.SimpleDateFormat;
+
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+
+public class SimpleDateFormatValidator implements Validator {
+
+ @Override
+ public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+ try {
+ new SimpleDateFormat(input);
+ } catch (final Exception e) {
+ return new ValidationResult.Builder()
+ .input(input)
+ .subject(subject)
+ .valid(false)
+ .explanation("Invalid Date format: " + e.getMessage())
+ .build();
+ }
+
+ return new ValidationResult.Builder()
+ .input(input)
+ .subject(subject)
+ .valid(true)
+ .build();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/pom.xml
new file mode 100644
index 0000000..fea0920
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/pom.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-extension-utils</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ </parent>
+ <packaging>pom</packaging>
+ <artifactId>nifi-record-utils</artifactId>
+
+ <modules>
+ <module>nifi-avro-record-utils</module>
+ <module>nifi-standard-record-utils</module>
+ <module>nifi-hadoop-record-utils</module>
+ <module>nifi-mock-record-utils</module>
+ </modules>
+
+</project>
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/pom.xml
new file mode 100644
index 0000000..4e1a0f5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/pom.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-nar-bundles</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ </parent>
+ <packaging>pom</packaging>
+ <artifactId>nifi-extension-utils</artifactId>
+ <description>
+ This module contains reusable utilities related to extensions that can be shared across NARs.
+ </description>
+
+ <modules>
+ <module>nifi-record-utils</module>
+ <module>nifi-hadoop-utils</module>
+ <module>nifi-processor-utils</module>
+ </modules>
+
+</project>
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
index 7429ea8..493caf0 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
@@ -33,12 +33,6 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hadoop-utils</artifactId>
- <exclusions>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
@@ -50,16 +44,16 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-distributed-cache-client-service-api</artifactId>
- </dependency>
- <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-distributed-cache-client-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
deleted file mode 100644
index dac2b30..0000000
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ /dev/null
@@ -1,580 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.hadoop;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.BZip2Codec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.io.compress.Lz4Codec;
-import org.apache.hadoop.io.compress.SnappyCodec;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.hadoop.KerberosProperties;
-import org.apache.nifi.hadoop.SecurityUtil;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.util.StringUtils;
-
-import javax.net.SocketFactory;
-import java.io.File;
-import java.io.IOException;
-import java.lang.ref.WeakReference;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.URI;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.WeakHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * This is a base class that is helpful when building processors interacting with HDFS.
- */
-@RequiresInstanceClassLoading(cloneAncestorResources = true)
-public abstract class AbstractHadoopProcessor extends AbstractProcessor {
- /**
- * Compression Type Enum
- */
- public enum CompressionType {
- NONE,
- DEFAULT,
- BZIP,
- GZIP,
- LZ4,
- SNAPPY,
- AUTOMATIC;
-
- @Override
- public String toString() {
- switch (this) {
- case NONE: return "NONE";
- case DEFAULT: return DefaultCodec.class.getName();
- case BZIP: return BZip2Codec.class.getName();
- case GZIP: return GzipCodec.class.getName();
- case LZ4: return Lz4Codec.class.getName();
- case SNAPPY: return SnappyCodec.class.getName();
- case AUTOMATIC: return "Automatically Detected";
- }
- return null;
- }
- }
-
- // properties
- public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
- .name("Hadoop Configuration Resources")
- .description("A file or comma separated list of files which contains the Hadoop file system configuration. Without this, Hadoop "
- + "will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration.")
- .required(false)
- .addValidator(createMultipleFilesExistValidator())
- .build();
-
- public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
- .name("Directory")
- .description("The HDFS directory from which files should be read")
- .required(true)
- .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
- .expressionLanguageSupported(true)
- .build();
-
- public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
- .name("Compression codec")
- .required(true)
- .allowableValues(CompressionType.values())
- .defaultValue(CompressionType.NONE.toString())
- .build();
-
- public static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder()
- .name("Kerberos Relogin Period").required(false)
- .description("Period of time which should pass before attempting a kerberos relogin")
- .defaultValue("4 hours")
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor ADDITIONAL_CLASSPATH_RESOURCES = new PropertyDescriptor.Builder()
- .name("Additional Classpath Resources")
- .description("A comma-separated list of paths to files and/or directories that will be added to the classpath. When specifying a " +
- "directory, all files with in the directory will be added to the classpath, but further sub-directories will not be included.")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .dynamicallyModifiesClasspath(true)
- .build();
-
- private static final Object RESOURCES_LOCK = new Object();
-
- private long kerberosReloginThreshold;
- private long lastKerberosReloginTime;
- protected KerberosProperties kerberosProperties;
- protected List<PropertyDescriptor> properties;
- private volatile File kerberosConfigFile = null;
-
- // variables shared by all threads of this processor
- // Hadoop Configuration, Filesystem, and UserGroupInformation (optional)
- private final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>();
-
- // Holder of cached Configuration information so validation does not reload the same config over and over
- private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
-
- @Override
- protected void init(ProcessorInitializationContext context) {
- hdfsResources.set(new HdfsResources(null, null, null));
-
- kerberosConfigFile = context.getKerberosConfigurationFile();
- kerberosProperties = getKerberosProperties(kerberosConfigFile);
-
- List<PropertyDescriptor> props = new ArrayList<>();
- props.add(HADOOP_CONFIGURATION_RESOURCES);
- props.add(kerberosProperties.getKerberosPrincipal());
- props.add(kerberosProperties.getKerberosKeytab());
- props.add(KERBEROS_RELOGIN_PERIOD);
- props.add(ADDITIONAL_CLASSPATH_RESOURCES);
- properties = Collections.unmodifiableList(props);
- }
-
- protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
- return new KerberosProperties(kerberosConfigFile);
- }
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return properties;
- }
-
- @Override
- protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
- final String configResources = validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue();
- final String principal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
- final String keytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
-
- final List<ValidationResult> results = new ArrayList<>();
-
- if (!StringUtils.isBlank(configResources)) {
- try {
- ValidationResources resources = validationResourceHolder.get();
-
- // if no resources in the holder, or if the holder has different resources loaded,
- // then load the Configuration and set the new resources in the holder
- if (resources == null || !configResources.equals(resources.getConfigResources())) {
- getLogger().debug("Reloading validation resources");
- final Configuration config = new ExtendedConfiguration(getLogger());
- config.setClassLoader(Thread.currentThread().getContextClassLoader());
- resources = new ValidationResources(configResources, getConfigurationFromResources(config, configResources));
- validationResourceHolder.set(resources);
- }
-
- final Configuration conf = resources.getConfiguration();
- results.addAll(KerberosProperties.validatePrincipalAndKeytab(
- this.getClass().getSimpleName(), conf, principal, keytab, getLogger()));
-
- } catch (IOException e) {
- results.add(new ValidationResult.Builder()
- .valid(false)
- .subject(this.getClass().getSimpleName())
- .explanation("Could not load Hadoop Configuration resources")
- .build());
- }
- }
-
- return results;
- }
-
- /*
- * If your subclass also has an @OnScheduled annotated method and you need hdfsResources in that method, then be sure to call super.abstractOnScheduled(context)
- */
- @OnScheduled
- public final void abstractOnScheduled(ProcessContext context) throws IOException {
- try {
- // This value will be null when called from ListHDFS, because it overrides all of the default
- // properties this processor sets. TODO: re-work ListHDFS to utilize Kerberos
- if (context.getProperty(KERBEROS_RELOGIN_PERIOD).getValue() != null) {
- kerberosReloginThreshold = context.getProperty(KERBEROS_RELOGIN_PERIOD).asTimePeriod(TimeUnit.SECONDS);
- }
- HdfsResources resources = hdfsResources.get();
- if (resources.getConfiguration() == null) {
- final String configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue();
- resources = resetHDFSResources(configResources, context);
- hdfsResources.set(resources);
- }
- } catch (IOException ex) {
- getLogger().error("HDFS Configuration error - {}", new Object[] { ex });
- hdfsResources.set(new HdfsResources(null, null, null));
- throw ex;
- }
- }
-
- @OnStopped
- public final void abstractOnStopped() {
- hdfsResources.set(new HdfsResources(null, null, null));
- }
-
- private static Configuration getConfigurationFromResources(final Configuration config, String configResources) throws IOException {
- boolean foundResources = false;
- if (null != configResources) {
- String[] resources = configResources.split(",");
- for (String resource : resources) {
- config.addResource(new Path(resource.trim()));
- foundResources = true;
- }
- }
-
- if (!foundResources) {
- // check that at least 1 non-default resource is available on the classpath
- String configStr = config.toString();
- for (String resource : configStr.substring(configStr.indexOf(":") + 1).split(",")) {
- if (!resource.contains("default") && config.getResource(resource.trim()) != null) {
- foundResources = true;
- break;
- }
- }
- }
-
- if (!foundResources) {
- throw new IOException("Could not find any of the " + HADOOP_CONFIGURATION_RESOURCES.getName() + " on the classpath");
- }
- return config;
- }
-
- /*
- * Reset Hadoop Configuration and FileSystem based on the supplied configuration resources.
- */
- HdfsResources resetHDFSResources(String configResources, ProcessContext context) throws IOException {
- Configuration config = new ExtendedConfiguration(getLogger());
- config.setClassLoader(Thread.currentThread().getContextClassLoader());
-
- getConfigurationFromResources(config, configResources);
-
- // first check for timeout on HDFS connection, because FileSystem has a hard coded 15 minute timeout
- checkHdfsUriForTimeout(config);
-
- // disable caching of Configuration and FileSystem objects, else we cannot reconfigure the processor without a complete
- // restart
- String disableCacheName = String.format("fs.%s.impl.disable.cache", FileSystem.getDefaultUri(config).getScheme());
- config.set(disableCacheName, "true");
-
- // If kerberos is enabled, create the file system as the kerberos principal
- // -- use RESOURCE_LOCK to guarantee UserGroupInformation is accessed by only a single thread at at time
- FileSystem fs;
- UserGroupInformation ugi;
- synchronized (RESOURCES_LOCK) {
- if (SecurityUtil.isSecurityEnabled(config)) {
- String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
- String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
- ugi = SecurityUtil.loginKerberos(config, principal, keyTab);
- fs = getFileSystemAsUser(config, ugi);
- lastKerberosReloginTime = System.currentTimeMillis() / 1000;
- } else {
- config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
- config.set("hadoop.security.authentication", "simple");
- ugi = SecurityUtil.loginSimple(config);
- fs = getFileSystemAsUser(config, ugi);
- }
- }
- getLogger().debug("resetHDFSResources UGI {}", new Object[]{ugi});
-
- final Path workingDir = fs.getWorkingDirectory();
- getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
- new Object[]{workingDir, fs.getDefaultBlockSize(workingDir), fs.getDefaultReplication(workingDir), config.toString()});
-
- return new HdfsResources(config, fs, ugi);
- }
-
- /**
- * This exists in order to allow unit tests to override it so that they don't take several minutes waiting for UDP packets to be received
- *
- * @param config
- * the configuration to use
- * @return the FileSystem that is created for the given Configuration
- * @throws IOException
- * if unable to create the FileSystem
- */
- protected FileSystem getFileSystem(final Configuration config) throws IOException {
- return FileSystem.get(config);
- }
-
- protected FileSystem getFileSystemAsUser(final Configuration config, UserGroupInformation ugi) throws IOException {
- try {
- return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
- @Override
- public FileSystem run() throws Exception {
- return FileSystem.get(config);
- }
- });
- } catch (InterruptedException e) {
- throw new IOException("Unable to create file system: " + e.getMessage());
- }
- }
-
- /*
- * Drastically reduce the timeout of a socket connection from the default in FileSystem.get()
- */
- protected void checkHdfsUriForTimeout(Configuration config) throws IOException {
- URI hdfsUri = FileSystem.getDefaultUri(config);
- String address = hdfsUri.getAuthority();
- int port = hdfsUri.getPort();
- if (address == null || address.isEmpty() || port < 0) {
- return;
- }
- InetSocketAddress namenode = NetUtils.createSocketAddr(address, port);
- SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(config);
- Socket socket = null;
- try {
- socket = socketFactory.createSocket();
- NetUtils.connect(socket, namenode, 1000); // 1 second timeout
- } finally {
- IOUtils.closeQuietly(socket);
- }
- }
-
- /*
- * Validates that one or more files exist, as specified in a single property.
- */
- public static final Validator createMultipleFilesExistValidator() {
- return new Validator() {
-
- @Override
- public ValidationResult validate(String subject, String input, ValidationContext context) {
- final String[] files = input.split(",");
- for (String filename : files) {
- try {
- final File file = new File(filename.trim());
- final boolean valid = file.exists() && file.isFile();
- if (!valid) {
- final String message = "File " + file + " does not exist or is not a file";
- return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
- }
- } catch (SecurityException e) {
- final String message = "Unable to access " + filename + " due to " + e.getMessage();
- return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
- }
- }
- return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
- }
-
- };
- }
-
- /**
- * Returns the configured CompressionCodec, or null if none is configured.
- *
- * @param context
- * the ProcessContext
- * @param configuration
- * the Hadoop Configuration
- * @return CompressionCodec or null
- */
- protected org.apache.hadoop.io.compress.CompressionCodec getCompressionCodec(ProcessContext context, Configuration configuration) {
- org.apache.hadoop.io.compress.CompressionCodec codec = null;
- if (context.getProperty(COMPRESSION_CODEC).isSet()) {
- String compressionClassname = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).getValue()).toString();
- CompressionCodecFactory ccf = new CompressionCodecFactory(configuration);
- codec = ccf.getCodecByClassName(compressionClassname);
- }
-
- return codec;
- }
-
- /**
- * Returns the relative path of the child that does not include the filename or the root path.
- *
- * @param root
- * the path to relativize from
- * @param child
- * the path to relativize
- * @return the relative path
- */
- public static String getPathDifference(final Path root, final Path child) {
- final int depthDiff = child.depth() - root.depth();
- if (depthDiff <= 1) {
- return "".intern();
- }
- String lastRoot = root.getName();
- Path childsParent = child.getParent();
- final StringBuilder builder = new StringBuilder();
- builder.append(childsParent.getName());
- for (int i = (depthDiff - 3); i >= 0; i--) {
- childsParent = childsParent.getParent();
- String name = childsParent.getName();
- if (name.equals(lastRoot) && childsParent.toString().endsWith(root.toString())) {
- break;
- }
- builder.insert(0, Path.SEPARATOR).insert(0, name);
- }
- return builder.toString();
- }
-
- protected Configuration getConfiguration() {
- return hdfsResources.get().getConfiguration();
- }
-
- protected FileSystem getFileSystem() {
- // trigger Relogin if necessary
- getUserGroupInformation();
- return hdfsResources.get().getFileSystem();
- }
-
- protected UserGroupInformation getUserGroupInformation() {
- // if kerberos is enabled, check if the ticket should be renewed before returning
- UserGroupInformation userGroupInformation = hdfsResources.get().getUserGroupInformation();
- if (userGroupInformation != null && isTicketOld()) {
- tryKerberosRelogin(userGroupInformation);
- }
- return userGroupInformation;
- }
-
- protected void tryKerberosRelogin(UserGroupInformation ugi) {
- try {
- getLogger().info("Kerberos ticket age exceeds threshold [{} seconds] " +
- "attempting to renew ticket for user {}", new Object[]{
- kerberosReloginThreshold, ugi.getUserName()});
- ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
- ugi.checkTGTAndReloginFromKeytab();
- return null;
- });
- lastKerberosReloginTime = System.currentTimeMillis() / 1000;
- getLogger().info("Kerberos relogin successful or ticket still valid");
- } catch (IOException e) {
- // Most likely case of this happening is ticket is expired and error getting a new one,
- // meaning dfs operations would fail
- getLogger().error("Kerberos relogin failed", e);
- throw new ProcessException("Unable to renew kerberos ticket", e);
- } catch (InterruptedException e) {
- getLogger().error("Interrupted while attempting Kerberos relogin", e);
- throw new ProcessException("Unable to renew kerberos ticket", e);
- }
- }
-
- protected boolean isTicketOld() {
- return (System.currentTimeMillis() / 1000 - lastKerberosReloginTime) > kerberosReloginThreshold;
- }
-
-
- static protected class HdfsResources {
- private final Configuration configuration;
- private final FileSystem fileSystem;
- private final UserGroupInformation userGroupInformation;
-
- public HdfsResources(Configuration configuration, FileSystem fileSystem, UserGroupInformation userGroupInformation) {
- this.configuration = configuration;
- this.fileSystem = fileSystem;
- this.userGroupInformation = userGroupInformation;
- }
-
- public Configuration getConfiguration() {
- return configuration;
- }
-
- public FileSystem getFileSystem() {
- return fileSystem;
- }
-
- public UserGroupInformation getUserGroupInformation() {
- return userGroupInformation;
- }
- }
-
- static protected class ValidationResources {
- private final String configResources;
- private final Configuration configuration;
-
- public ValidationResources(String configResources, Configuration configuration) {
- this.configResources = configResources;
- this.configuration = configuration;
- }
-
- public String getConfigResources() {
- return configResources;
- }
-
- public Configuration getConfiguration() {
- return configuration;
- }
- }
-
- /**
- * Extending Hadoop Configuration to prevent it from caching classes that can't be found. Since users may be
- * adding additional JARs to the classpath we don't want them to have to restart the JVM to be able to load
- * something that was previously not found, but might now be available.
- *
- * Reference the original getClassByNameOrNull from Configuration.
- */
- static class ExtendedConfiguration extends Configuration {
-
- private final ComponentLog logger;
- private final Map<ClassLoader, Map<String, WeakReference<Class<?>>>> CACHE_CLASSES = new WeakHashMap<>();
-
- public ExtendedConfiguration(final ComponentLog logger) {
- this.logger = logger;
- }
-
- public Class<?> getClassByNameOrNull(String name) {
- final ClassLoader classLoader = getClassLoader();
-
- Map<String, WeakReference<Class<?>>> map;
- synchronized (CACHE_CLASSES) {
- map = CACHE_CLASSES.get(classLoader);
- if (map == null) {
- map = Collections.synchronizedMap(new WeakHashMap<>());
- CACHE_CLASSES.put(classLoader, map);
- }
- }
-
- Class<?> clazz = null;
- WeakReference<Class<?>> ref = map.get(name);
- if (ref != null) {
- clazz = ref.get();
- }
-
- if (clazz == null) {
- try {
- clazz = Class.forName(name, true, classLoader);
- } catch (ClassNotFoundException e) {
- logger.error(e.getMessage(), e);
- return null;
- }
- // two putters can race here, but they'll put the same class
- map.put(name, new WeakReference<>(clazz));
- return clazz;
- } else {
- // cache hit
- return clazz;
- }
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 6232936..69b5b77 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -36,9 +36,6 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
@@ -96,8 +93,6 @@ public class PutHDFS extends AbstractHadoopProcessor {
public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
public static final int BUFFER_SIZE_DEFAULT = 4096;
- public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path";
-
// relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@@ -135,14 +130,14 @@ public class PutHDFS extends AbstractHadoopProcessor {
public static final PropertyDescriptor REPLICATION_FACTOR = new PropertyDescriptor.Builder()
.name("Replication")
.description("Number of times that HDFS will replicate each file. This overrides the Hadoop Configuration")
- .addValidator(createPositiveShortValidator())
+ .addValidator(HadoopValidators.POSITIVE_SHORT_VALIDATOR)
.build();
public static final PropertyDescriptor UMASK = new PropertyDescriptor.Builder()
.name("Permissions umask")
.description(
"A umask represented as an octal number which determines the permissions of files written to HDFS. This overrides the Hadoop Configuration dfs.umaskmode")
- .addValidator(createUmaskValidator())
+ .addValidator(HadoopValidators.UMASK_VALIDATOR)
.build();
public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder()
@@ -404,51 +399,4 @@ public class PutHDFS extends AbstractHadoopProcessor {
}
}
- /*
- * Validates that a property is a valid short number greater than 0.
- */
- static Validator createPositiveShortValidator() {
- return new Validator() {
- @Override
- public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
- String reason = null;
- try {
- final short shortVal = Short.parseShort(value);
- if (shortVal <= 0) {
- reason = "short integer must be greater than zero";
- }
- } catch (final NumberFormatException e) {
- reason = "[" + value + "] is not a valid short integer";
- }
- return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null)
- .build();
- }
- };
- }
-
- /*
- * Validates that a property is a valid umask, i.e. a short octal number that is not negative.
- */
- static Validator createUmaskValidator() {
- return new Validator() {
- @Override
- public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
- String reason = null;
- try {
- final short shortVal = Short.parseShort(value, 8);
- if (shortVal < 0) {
- reason = "octal umask [" + value + "] cannot be negative";
- } else if (shortVal > 511) {
- // HDFS umask has 9 bits: rwxrwxrwx ; the sticky bit cannot be umasked
- reason = "octal umask [" + value + "] is not a valid umask";
- }
- } catch (final NumberFormatException e) {
- reason = "[" + value + "] is not a valid short octal number";
- }
- return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null)
- .build();
- }
- };
- }
-
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java
index 1ac62af..bbb050a 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java
@@ -206,7 +206,7 @@ public class TestCreateHadoopSequenceFile {
@Test
public void testSequenceFileBzipCompressionCodec() throws UnsupportedEncodingException, IOException {
- controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, AbstractHadoopProcessor.CompressionType.BZIP.name());
+ controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, CompressionType.BZIP.name());
controller.setProperty(CreateHadoopSequenceFile.COMPRESSION_TYPE, SequenceFile.CompressionType.BLOCK.name());
File inFile = inFiles[0];
@@ -253,7 +253,7 @@ public class TestCreateHadoopSequenceFile {
@Test
public void testSequenceFileDefaultCompressionCodec() throws UnsupportedEncodingException, IOException {
- controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, AbstractHadoopProcessor.CompressionType.DEFAULT.name());
+ controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, CompressionType.DEFAULT.name());
controller.setProperty(CreateHadoopSequenceFile.COMPRESSION_TYPE, SequenceFile.CompressionType.BLOCK.name());
File inFile = inFiles[0];
@@ -300,7 +300,7 @@ public class TestCreateHadoopSequenceFile {
@Test
public void testSequenceFileNoneCompressionCodec() throws UnsupportedEncodingException, IOException {
- controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, AbstractHadoopProcessor.CompressionType.NONE.name());
+ controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, CompressionType.NONE.name());
controller.setProperty(CreateHadoopSequenceFile.COMPRESSION_TYPE, SequenceFile.CompressionType.BLOCK.name());
File inFile = inFiles[0];
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
index 661180e..2c50ceb 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
@@ -84,12 +84,6 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hadoop-utils</artifactId>
- <exclusions>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>com.github.stephenc.findbugs</groupId>
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/pom.xml b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/pom.xml
new file mode 100644
index 0000000..dfe67a3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/pom.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-parquet-bundle</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-parquet-nar</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ <packaging>nar</packaging>
+ <properties>
+ <maven.javadoc.skip>true</maven.javadoc.skip>
+ <source.skip>true</source.skip>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-parquet-processors</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-hadoop-libraries-nar</artifactId>
+ <type>nar</type>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/src/main/resources/META-INF/LICENSE
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/src/main/resources/META-INF/LICENSE
new file mode 100644
index 0000000..958de4d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,239 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+APACHE NIFI SUBCOMPONENTS:
+
+The Apache NiFi project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for these
+subcomponents is subject to the terms and conditions of the following
+licenses.
+
+ The binary distribution of this product bundles 'ParaNamer' and 'Paranamer Core'
+ which is available under a BSD style license.
+
+ Copyright (c) 2006 Paul Hammant & ThoughtWorks Inc
+ All rights reserved.
+
+ Redistribution and use in source and binary forms, with or without
+ modification, are permitted provided that the following conditions
+ are met:
+ 1. Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+ 2. Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+ 3. Neither the name of the copyright holders nor the names of its
+ contributors may be used to endorse or promote products derived from
+ this software without specific prior written permission.
+
+ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..4c40843
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,105 @@
+nifi-parquet-nar
+Copyright 2014-2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+The following binary components are provided under the Apache Software License v2
+
+ (ASLv2) Apache Avro
+ The following NOTICE information applies:
+ Apache Avro
+ Copyright 2009-2013 The Apache Software Foundation
+
+ (ASLv2) Apache Commons Codec
+ The following NOTICE information applies:
+ Apache Commons Codec
+ Copyright 2002-2014 The Apache Software Foundation
+
+ src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
+ contains test data from http://aspell.net/test/orig/batch0.tab.
+ Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
+
+ ===============================================================================
+
+ The content of package org.apache.commons.codec.language.bm has been translated
+ from the original php source code available at http://stevemorse.org/phoneticinfo.htm
+ with permission from the original authors.
+ Original source copyright:
+ Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
+
+ (ASLv2) Apache Commons Compress
+ The following NOTICE information applies:
+ Apache Commons Compress
+ Copyright 2002-2014 The Apache Software Foundation
+
+ The files in the package org.apache.commons.compress.archivers.sevenz
+ were derived from the LZMA SDK, version 9.20 (C/ and CPP/7zip/),
+ which has been placed in the public domain:
+
+ "LZMA SDK is placed in the public domain." (http://www.7-zip.org/sdk.html)
+
+ (ASLv2) Apache Commons Lang
+ The following NOTICE information applies:
+ Apache Commons Lang
+ Copyright 2001-2015 The Apache Software Foundation
+
+ This product includes software from the Spring Framework,
+ under the Apache License 2.0 (see: StringUtils.containsWhitespace())
+
+ (ASLv2) Apache Parquet
+ The following NOTICE information applies:
+ Apache Parquet MR (Incubating)
+ Copyright 2014 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+ (ASLv2) Jackson JSON processor
+ The following NOTICE information applies:
+ # Jackson JSON processor
+
+ Jackson is a high-performance, Free/Open Source JSON processing library.
+ It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
+ been in development since 2007.
+ It is currently developed by a community of developers, as well as supported
+ commercially by FasterXML.com.
+
+ ## Licensing
+
+ Jackson core and extension components may licensed under different licenses.
+ To find the details that apply to this artifact see the accompanying LICENSE file.
+ For more information, including possible other licensing options, contact
+ FasterXML.com (http://fasterxml.com).
+
+ ## Credits
+
+ A list of contributors may be found from CREDITS file, which is included
+ in some artifacts (usually source distributions); but is always available
+ from the source code management (SCM) system project uses.
+
+ (ASLv2) Snappy Java
+ The following NOTICE information applies:
+ This product includes software developed by Google
+ Snappy: http://code.google.com/p/snappy/ (New BSD License)
+
+ This product includes software developed by Apache
+ PureJavaCrc32C from apache-hadoop-common http://hadoop.apache.org/
+ (Apache 2.0 license)
+
+ This library contains statically linked libstdc++. This inclusion is allowed by
+ "GCC Runtime Library Exception"
+ http://gcc.gnu.org/onlinedocs/libstdc++/manual/license.html
+
+*****************
+Public Domain
+*****************
+
+The following binary components are provided to the 'Public Domain'. See project link for details.
+
+ (Public Domain) XZ for Java (org.tukaani:xz:jar:1.5 - http://tukaani.org/xz/java.html)
+
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
new file mode 100644
index 0000000..c8c1078
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
@@ -0,0 +1,100 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-parquet-bundle</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-parquet-processors</artifactId>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-avro</artifactId>
+ <version>1.8.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-hadoop-record-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-schema-registry-service-api</artifactId>
+ </dependency>
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock-record-utils</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <excludes combine.children="append">
+ <exclude>src/test/resources/avro/user.avsc</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
new file mode 100644
index 0000000..3536944
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.parquet;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.hadoop.AbstractFetchHDFSRecord;
+import org.apache.nifi.processors.hadoop.record.HDFSRecordReader;
+import org.apache.nifi.processors.parquet.record.AvroParquetHDFSRecordReader;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetReader;
+
+import java.io.IOException;
+
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"parquet", "hadoop", "HDFS", "get", "ingest", "fetch", "source", "restricted"})
+@CapabilityDescription("Reads from a given Parquet file and writes records to the content of the flow file using " +
+ "the selected record writer. The original Parquet file will remain unchanged, and the content of the flow file " +
+ "will be replaced with records of the selected type. This processor can be used with ListHDFS or ListFile to obtain " +
+ "a listing of files to fetch.")
+@WritesAttributes({
+ @WritesAttribute(attribute="fetch.failure.reason", description="When a FlowFile is routed to 'failure', this attribute is added " +
+ "indicating why the file could not be fetched from the given filesystem."),
+ @WritesAttribute(attribute = "record.count", description = "The number of records in the resulting flow file")
+})
+@SeeAlso({PutParquet.class})
+@Restricted("Provides operator the ability to retrieve any file that NiFi has access to in HDFS or the local filesystem.")
+public class FetchParquet extends AbstractFetchHDFSRecord {
+
+ /**
+  * Creates the record reader used by the base class to pull records out of the Parquet file at the given path.
+  *
+  * @param context the current process context (unused here; available for subclass configuration)
+  * @param flowFile the FlowFile being processed (unused here; available for per-file configuration)
+  * @param conf the Hadoop configuration applied to the underlying Parquet reader
+  * @param path the location of the Parquet file to read
+  * @return an AvroParquetHDFSRecordReader wrapping an Avro-based ParquetReader of GenericRecords
+  * @throws IOException if the Parquet reader cannot be created for the given path
+  */
+ @Override
+ public HDFSRecordReader createHDFSRecordReader(final ProcessContext context, final FlowFile flowFile, final Configuration conf, final Path path)
+ throws IOException {
+ final ParquetReader.Builder<GenericRecord> readerBuilder = AvroParquetReader.<GenericRecord>builder(path).withConf(conf);
+ return new AvroParquetHDFSRecordReader(readerBuilder.build());
+ }
+
+}