You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/04/20 05:12:51 UTC

[14/50] [abbrv] incubator-nifi git commit: 504: Initial import of geo enrich processors

504: Initial import of geo enrich processors


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/ff0bd2c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/ff0bd2c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/ff0bd2c6

Branch: refs/heads/NIFI-271
Commit: ff0bd2c6696e73d3157eb2d0116913e337410f38
Parents: 45416dc
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Apr 9 17:54:59 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Apr 9 17:54:59 2015 -0400

----------------------------------------------------------------------
 .../nifi-geo-bundle/nifi-geo-nar/pom.xml        |  33 +++
 .../nifi-geo-processors/.gitignore              |   1 +
 .../nifi-geo-bundle/nifi-geo-processors/pom.xml |  43 +++
 .../org/apache/nifi/processors/GeoEnrichIP.java | 210 ++++++++++++++
 .../nifi/processors/maxmind/DatabaseReader.java | 286 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |  16 ++
 nifi/nifi-nar-bundles/nifi-geo-bundle/pom.xml   |  42 +++
 7 files changed, 631 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ff0bd2c6/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-nar/pom.xml b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-nar/pom.xml
new file mode 100644
index 0000000..484e291
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-nar/pom.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-geo-bundle</artifactId>
+        <version>0.1.0-incubating-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-geo-nar</artifactId>
+    <packaging>nar</packaging>
+    <description>NiFi Geo Enrichment NAR</description>
+	
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-geo-processors</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ff0bd2c6/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/.gitignore
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/.gitignore b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/.gitignore
new file mode 100644
index 0000000..b83d222
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/.gitignore
@@ -0,0 +1 @@
+/target/

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ff0bd2c6/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/pom.xml
new file mode 100644
index 0000000..67bc253
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-geo-bundle</artifactId>
+        <version>0.1.0-incubating-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-geo-processors</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.maxmind.geoip2</groupId>
+            <artifactId>geoip2</artifactId>
+            <version>2.1.0</version>
+        </dependency>        
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ff0bd2c6/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java
new file mode 100644
index 0000000..fed0e7e
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.maxmind.DatabaseReader;
+import org.apache.nifi.util.StopWatch;
+
+import com.maxmind.geoip2.exception.GeoIp2Exception;
+import com.maxmind.geoip2.model.CityResponse;
+import com.maxmind.geoip2.record.Subdivision;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "enrich", "ip", "maxmind"})
+@CapabilityDescription("Looks up geolocation information for an IP address and adds the geo information to FlowFile attributes. The "
+		+ "geo data is provided as a MaxMind database. The attribute that contains the IP address to lookup is provided by the "
+		+ "'IP Address Attribute' property. If the name of the attribute provided is 'X', then the the attributes added by enrichment "
+		+ "will take the form X.geo.<fieldName>")
+@WritesAttributes({
+	@WritesAttribute(attribute="X.geo.lookup.micros", description="The number of microseconds that the geo lookup took"),
+	@WritesAttribute(attribute="X.geo.city", description="The city identified for the IP address"),
+	@WritesAttribute(attribute="X.geo.latitude", description="The latitude identified for this IP address"),
+	@WritesAttribute(attribute="X.geo.longitude", description="The longitude identified for this IP address"),
+	@WritesAttribute(attribute="X.geo.subdivision.N", description="Each subdivision that is identified for this IP address is added with a one-up number appended to the attribute name, starting with 0"),
+	@WritesAttribute(attribute="X.geo.subdivision.isocode.N", description="The ISO code for the subdivision that is identified by X.geo.subdivision.N"),
+	@WritesAttribute(attribute="X.geo.country", description="The country identified for this IP address"),
+	@WritesAttribute(attribute="X.geo.country.isocode", description="The ISO Code for the country identified"),
+	@WritesAttribute(attribute="X.geo.postalcode", description="The postal code for the country identified"),
+})
+public class GeoEnrichIP extends AbstractProcessor {
+
+    public static final PropertyDescriptor GEO_DATABASE_FILE = new PropertyDescriptor.Builder()
+            .name("Geo Database File")
+            .description("Path to Maxmind Geo Enrichment Database File")
+            .required(true)
+            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor IP_ADDRESS_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("IP Address Attribute")
+            .required(true)
+            .description("The name of an attribute whose value is a dotted decimal IP address for which enrichment should occur")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_FOUND = new Relationship.Builder()
+            .name("found")
+            .description("Where to route flow files after successfully enriching attributes with geo data")
+            .build();
+
+    public static final Relationship REL_NOT_FOUND = new Relationship.Builder()
+            .name("not found")
+            .description("Where to route flow files after unsuccessfully enriching attributes because no geo data was found")
+            .build();
+
+    private Set<Relationship> relationships;
+    private List<PropertyDescriptor> propertyDescriptors;
+    private final AtomicReference<DatabaseReader> databaseReaderRef = new AtomicReference<>(null);
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @OnScheduled
+    public final void onScheduled(final ProcessContext context) throws IOException {
+        final String dbFileString = context.getProperty(GEO_DATABASE_FILE).getValue();
+        final File dbFile = new File(dbFileString);
+        final StopWatch stopWatch = new StopWatch(true);
+        final DatabaseReader reader = new DatabaseReader.Builder(dbFile).build();
+        stopWatch.stop();
+        getLogger().info("Completed loading of Maxmind Geo Database.  Elapsed time was {} milliseconds.", new Object[]{stopWatch.getDuration(TimeUnit.MILLISECONDS)});
+        databaseReaderRef.set(reader);
+    }
+
+    @OnStopped
+    public void closeReader() throws IOException {
+    	final DatabaseReader reader = databaseReaderRef.get();
+    	if ( reader != null ) {
+    		reader.close();
+    	}
+    }
+    
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_FOUND);
+        rels.add(REL_NOT_FOUND);
+        this.relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(GEO_DATABASE_FILE);
+        props.add(IP_ADDRESS_ATTRIBUTE);
+        this.propertyDescriptors = Collections.unmodifiableList(props);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final DatabaseReader dbReader = databaseReaderRef.get();
+        final String ipAttributeName = context.getProperty(IP_ADDRESS_ATTRIBUTE).getValue();
+        final String ipAttributeValue = flowFile.getAttribute(ipAttributeName);
+        if (StringUtils.isEmpty(ipAttributeName)) { //TODO need to add additional validation - should look like an IPv4 or IPv6 addr for instance
+            session.transfer(flowFile, REL_NOT_FOUND);
+            getLogger().warn("Unable to find ip address for {}", new Object[]{flowFile});
+            return;
+        }
+        InetAddress inetAddress = null;
+        CityResponse response = null;
+
+        try {
+            inetAddress = InetAddress.getByName(ipAttributeValue);
+        } catch (final IOException ioe) {
+            session.transfer(flowFile, REL_NOT_FOUND);
+            getLogger().warn("Could not resolve {} to ip address for {}", new Object[]{ipAttributeValue, flowFile}, ioe);
+            return;
+        }
+        final StopWatch stopWatch = new StopWatch(true);
+        try {
+            response = dbReader.city(inetAddress);
+            stopWatch.stop();
+        } catch (final IOException | GeoIp2Exception ex) {
+            session.transfer(flowFile, REL_NOT_FOUND);
+            getLogger().warn("Failure while trying to find enrichment data for {} due to {}", new Object[]{flowFile, ex}, ex);
+            return;
+        }
+        
+        if (response == null) {
+            session.transfer(flowFile, REL_NOT_FOUND);
+            getLogger().warn("No enrichment data found for ip {} of {}", new Object[]{ipAttributeValue, flowFile});
+            return;
+        }
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put(new StringBuilder(ipAttributeName).append(".geo.lookup.micros").toString(), String.valueOf(stopWatch.getDuration(TimeUnit.MICROSECONDS)));
+        attrs.put(new StringBuilder(ipAttributeName).append(".geo.city").toString(), response.getCity().getName());
+        attrs.put(new StringBuilder(ipAttributeName).append(".geo.latitude").toString(), response.getLocation().getLatitude().toString());
+        attrs.put(new StringBuilder(ipAttributeName).append(".geo.longitude").toString(), response.getLocation().getLongitude().toString());
+        int i = 0;
+        for (final Subdivision subd : response.getSubdivisions()) {
+            attrs.put(new StringBuilder(ipAttributeName).append(".geo.subdivision.").append(i).toString(), subd.getName());
+            attrs.put(new StringBuilder(ipAttributeName).append(".geo.subdivision.isocode.").append(i).toString(), subd.getIsoCode());
+            i++;
+        }
+        attrs.put(new StringBuilder(ipAttributeName).append(".geo.country").toString(), response.getCountry().getName());
+        attrs.put(new StringBuilder(ipAttributeName).append(".geo.country.isocode").toString(), response.getCountry().getIsoCode());
+        attrs.put(new StringBuilder(ipAttributeName).append(".geo.postalcode").toString(), response.getPostal().getCode());
+        flowFile = session.putAllAttributes(flowFile, attrs);
+
+        session.transfer(flowFile, REL_FOUND);
+        getLogger().info("Completed lookup of IP geo information for {}", new Object[]{flowFile});
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ff0bd2c6/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/src/main/java/org/apache/nifi/processors/maxmind/DatabaseReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/src/main/java/org/apache/nifi/processors/maxmind/DatabaseReader.java b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/src/main/java/org/apache/nifi/processors/maxmind/DatabaseReader.java
new file mode 100644
index 0000000..796a7af
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/src/main/java/org/apache/nifi/processors/maxmind/DatabaseReader.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.maxmind;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.List;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.maxmind.db.Metadata;
+import com.maxmind.db.Reader;
+import com.maxmind.db.Reader.FileMode;
+import com.maxmind.geoip2.GeoIp2Provider;
+import com.maxmind.geoip2.exception.AddressNotFoundException;
+import com.maxmind.geoip2.exception.GeoIp2Exception;
+import com.maxmind.geoip2.model.AnonymousIpResponse;
+import com.maxmind.geoip2.model.CityResponse;
+import com.maxmind.geoip2.model.ConnectionTypeResponse;
+import com.maxmind.geoip2.model.CountryResponse;
+import com.maxmind.geoip2.model.DomainResponse;
+import com.maxmind.geoip2.model.IspResponse;
+
+/**
+ * <p>
+ * This class was copied from 
+ * https://raw.githubusercontent.com/maxmind/GeoIP2-java/master/src/main/java/com/maxmind/geoip2/DatabaseReader.java
+ * It is written by Maxmind and it is available under Apache Software License V2
+ * Copyright (c) 2013 by MaxMind, Inc.
+ * The modification we're making to the code below is to stop using exceptions for
+ * mainline flow control.  Specifically we don't want to throw an exception
+ * simply because an address was not found.
+ * </p>
+ * 
+ * Instances of this class provide a reader for the GeoIP2 database format. IP
+ * addresses can be looked up using the <code>get</code> method.
+ */
+public class DatabaseReader implements GeoIp2Provider, Closeable {
+
+    private final Reader reader;
+
+    private final ObjectMapper om;
+
+    private DatabaseReader(Builder builder) throws IOException {
+        if (builder.stream != null) {
+            this.reader = new Reader(builder.stream);
+        } else if (builder.database != null) {
+            this.reader = new Reader(builder.database, builder.mode);
+        } else {
+            // This should never happen. If it does, review the Builder class
+            // constructors for errors.
+            throw new IllegalArgumentException(
+                    "Unsupported Builder configuration: expected either File or URL");
+        }
+        this.om = new ObjectMapper();
+        this.om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
+                false);
+        this.om.configure(
+                DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, true);
+        InjectableValues inject = new InjectableValues.Std().addValue(
+                "locales", builder.locales);
+        this.om.setInjectableValues(inject);
+    }
+
+    /**
+     * <p>
+     * Constructs a Builder for the DatabaseReader. The file passed to it must
+     * be a valid GeoIP2 database file.
+     * </p>
+     * <p>
+     * <code>Builder</code> creates instances of <code>DatabaseReader</code>
+     * from values set by the methods.
+     * </p>
+     * <p>
+     * Only the values set in the <code>Builder</code> constructor are required.
+     * </p>
+     */
+    public final static class Builder {
+        final File database;
+        final InputStream stream;
+
+        List<String> locales = Arrays.asList("en");
+        FileMode mode = FileMode.MEMORY_MAPPED;
+
+        /**
+         * @param stream the stream containing the GeoIP2 database to use.
+         */
+        public Builder(InputStream stream) {
+            this.stream = stream;
+            this.database = null;
+        }
+
+        /**
+         * @param database the GeoIP2 database file to use.
+         */
+        public Builder(File database) {
+            this.database = database;
+            this.stream = null;
+        }
+
+        /**
+         * @param val List of locale codes to use in name property from most
+         *            preferred to least preferred.
+         * @return Builder object
+         */
+        public Builder locales(List<String> val) {
+            this.locales = val;
+            return this;
+        }
+
+        /**
+         * @param val The file mode used to open the GeoIP2 database
+         * @return Builder object
+         * @throws java.lang.IllegalArgumentException if you initialized the Builder with a URL, which uses
+         *                                            {@link FileMode#MEMORY}, but you provided a different
+         *                                            FileMode to this method.
+         */
+        public Builder fileMode(FileMode val) {
+            if (this.stream != null && !FileMode.MEMORY.equals(val)) {
+                throw new IllegalArgumentException(
+                        "Only FileMode.MEMORY is supported when using an InputStream.");
+            }
+            this.mode = val;
+            return this;
+        }
+
+        /**
+         * @return an instance of <code>DatabaseReader</code> created from the
+         * fields set on this builder.
+         * @throws IOException if there is an error reading the database
+         */
+        public DatabaseReader build() throws IOException {
+            return new DatabaseReader(this);
+        }
+    }
+
+    /**
+     * @param ipAddress IPv4 or IPv6 address to lookup.
+     * @return A <T> object with the data for the IP address or null if no 
+     * information could be found for the given IP address
+     * @throws IOException              if there is an error opening or reading from the file.
+     */
+    private <T> T get(InetAddress ipAddress, Class<T> cls, boolean hasTraits,
+                      String type) throws IOException, AddressNotFoundException {
+
+        String databaseType = this.getMetadata().getDatabaseType();
+        if (!databaseType.contains(type)) {
+            String caller = Thread.currentThread().getStackTrace()[2]
+                    .getMethodName();
+            throw new UnsupportedOperationException(
+                    "Invalid attempt to open a " + databaseType
+                            + " database using the " + caller + " method");
+        }
+
+        ObjectNode node = (ObjectNode) this.reader.get(ipAddress);
+
+        if (node == null) {
+            return null;
+        }
+
+        ObjectNode ipNode;
+        if (hasTraits) {
+            if (!node.has("traits")) {
+                node.set("traits", this.om.createObjectNode());
+            }
+            ipNode = (ObjectNode) node.get("traits");
+        } else {
+            ipNode = node;
+        }
+        ipNode.put("ip_address", ipAddress.getHostAddress());
+
+        return this.om.treeToValue(node, cls);
+    }
+
+    /**
+     * <p>
+     * Closes the database.
+     * </p>
+     * <p>
+     * If you are using <code>FileMode.MEMORY_MAPPED</code>, this will
+     * <em>not</em> unmap the underlying file due to a limitation in Java's
+     * <code>MappedByteBuffer</code>. It will however set the reference to
+     * the buffer to <code>null</code>, allowing the garbage collector to
+     * collect it.
+     * </p>
+     *
+     * @throws IOException if an I/O error occurs.
+     */
+    @Override
+    public void close() throws IOException {
+        this.reader.close();
+    }
+
+    @Override
+    public CountryResponse country(InetAddress ipAddress) throws IOException,
+            GeoIp2Exception {
+        return this.get(ipAddress, CountryResponse.class, true, "Country");
+    }
+
+    @Override
+    public CityResponse city(InetAddress ipAddress) throws IOException,
+            GeoIp2Exception {
+        return this.get(ipAddress, CityResponse.class, true, "City");
+    }
+
+    /**
+     * Look up an IP address in a GeoIP2 Anonymous IP.
+     *
+     * @param ipAddress IPv4 or IPv6 address to lookup.
+     * @return a AnonymousIpResponse for the requested IP address.
+     * @throws GeoIp2Exception if there is an error looking up the IP
+     * @throws IOException     if there is an IO error
+     */
+    public AnonymousIpResponse anonymousIp(InetAddress ipAddress) throws IOException,
+            GeoIp2Exception {
+        return this.get(ipAddress, AnonymousIpResponse.class, false, "GeoIP2-Anonymous-IP");
+    }
+
+    /**
+     * Look up an IP address in a GeoIP2 Connection Type database.
+     *
+     * @param ipAddress IPv4 or IPv6 address to lookup.
+     * @return a ConnectTypeResponse for the requested IP address.
+     * @throws GeoIp2Exception if there is an error looking up the IP
+     * @throws IOException     if there is an IO error
+     */
+    public ConnectionTypeResponse connectionType(InetAddress ipAddress)
+            throws IOException, GeoIp2Exception {
+        return this.get(ipAddress, ConnectionTypeResponse.class, false,
+                "GeoIP2-Connection-Type");
+    }
+
+    /**
+     * Look up an IP address in a GeoIP2 Domain database.
+     *
+     * @param ipAddress IPv4 or IPv6 address to lookup.
+     * @return a DomainResponse for the requested IP address.
+     * @throws GeoIp2Exception if there is an error looking up the IP
+     * @throws IOException     if there is an IO error
+     */
+    public DomainResponse domain(InetAddress ipAddress) throws IOException,
+            GeoIp2Exception {
+        return this
+                .get(ipAddress, DomainResponse.class, false, "GeoIP2-Domain");
+    }
+
+    /**
+     * Look up an IP address in a GeoIP2 ISP database.
+     *
+     * @param ipAddress IPv4 or IPv6 address to lookup.
+     * @return an IspResponse for the requested IP address.
+     * @throws GeoIp2Exception if there is an error looking up the IP
+     * @throws IOException     if there is an IO error
+     */
+    public IspResponse isp(InetAddress ipAddress) throws IOException,
+            GeoIp2Exception {
+        return this.get(ipAddress, IspResponse.class, false, "GeoIP2-ISP");
+    }
+
+    /**
+     * @return the metadata for the open MaxMind DB file.
+     */
+    public Metadata getMetadata() {
+        return this.reader.getMetadata();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ff0bd2c6/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..9b1be71
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.nifi.processors.GeoEnrichIP
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ff0bd2c6/nifi/nifi-nar-bundles/nifi-geo-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-geo-bundle/pom.xml b/nifi/nifi-nar-bundles/nifi-geo-bundle/pom.xml
new file mode 100644
index 0000000..2dbd32f
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-geo-bundle/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>0.1.0-incubating-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-geo-bundle</artifactId>
+    <packaging>pom</packaging>
+    <description>NiFi Geo Enrichment Capability Set</description>
+	
+    <modules>
+        <module>nifi-geo-processors</module>
+        <module>nifi-geo-nar</module>
+    </modules>
+	
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-geo-processors</artifactId>
+                <version>0.1.0-incubating-SNAPSHOT</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+</project>