You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/10/27 21:05:28 UTC

[GitHub] [nifi] markap14 commented on a change in pull request #5476: NIFI-9259: Adding GeohashRecord Processor

markap14 commented on a change in pull request #5476:
URL: https://github.com/apache/nifi/pull/5476#discussion_r737816742



##########
File path: nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+    public static final String ENCODE_MODE = "Encode";
+    public static final String DECODE_MODE = "Decode";
+
+    public static final String BINARY_STRING = "BinaryString";
+    public static final String BASE32 = "Base32";
+    public static final String LONG_INT = "LongInt";
+
+    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ENCODE_MODE, DECODE_MODE)
+            .defaultValue(ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the record reader service to use for reading incoming data")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the record writer service to use for writing data")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("latitude-record-path")
+            .displayName("Latitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the latitude value;"
+                    + "in the decode mode, this property specifies the record path to put the latitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor LONGITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("longitude-record-path")
+            .displayName("Longitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the longitude value;"

Review comment:
       space after semicolon

##########
File path: nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+    public static final String ENCODE_MODE = "Encode";
+    public static final String DECODE_MODE = "Decode";
+
+    public static final String BINARY_STRING = "BinaryString";
+    public static final String BASE32 = "Base32";
+    public static final String LONG_INT = "LongInt";
+
+    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ENCODE_MODE, DECODE_MODE)
+            .defaultValue(ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the record reader service to use for reading incoming data")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the record writer service to use for writing data")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("latitude-record-path")
+            .displayName("Latitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the latitude value;"
+                    + "in the decode mode, this property specifies the record path to put the latitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor LONGITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("longitude-record-path")
+            .displayName("Longitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the longitude value;"
+                    + "in the decode mode, this property specifies the record path to put the longitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)

Review comment:
       Should use `new RecordPathValidator()` here

##########
File path: nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+    public static final String ENCODE_MODE = "Encode";
+    public static final String DECODE_MODE = "Decode";
+
+    public static final String BINARY_STRING = "BinaryString";
+    public static final String BASE32 = "Base32";
+    public static final String LONG_INT = "LongInt";
+
+    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ENCODE_MODE, DECODE_MODE)
+            .defaultValue(ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the record reader service to use for reading incoming data")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the record writer service to use for writing data")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("latitude-record-path")
+            .displayName("Latitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the latitude value;"
+                    + "in the decode mode, this property specifies the record path to put the latitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor LONGITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("longitude-record-path")
+            .displayName("Longitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the longitude value;"
+                    + "in the decode mode, this property specifies the record path to put the longitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_FORMAT = new PropertyDescriptor.Builder()
+            .name("geohash-format")
+            .displayName("Geohash Format")
+            .description("In the encode mode, this property specifies the desired format for encoding geohash;"
+                    + "in the decode mode, this property specifies the format of geohash provided")
+            .required(true)
+            .allowableValues(BASE32, BINARY_STRING, LONG_INT)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_LEVEL = new PropertyDescriptor.Builder()
+            .name("geohash-level")
+            .displayName("Geohash Level")
+            .description("The integer precision level(1-12) desired for encoding geohash")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dependsOn(MODE, ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("geohash-record-path")
+            .displayName("Geohash Record Path")
+            .description("In the encode mode, this property specifies the record path to put the geohash value;"
+                    + "in the decode mode, this property specifies the record path to retrieve the geohash value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Records that are successfully encoded or decoded will be routed to success")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Records that cannot be encoded or decoded will be routed to failure")
+            .build();
+
+    public static final List<PropertyDescriptor> RECORD_PATH_PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            LATITUDE_RECORD_PATH,LONGITUDE_RECORD_PATH,GEOHASH_RECORD_PATH
+    ));
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        descriptors = new ArrayList<>();
+        descriptors.add(MODE);
+        descriptors.add(RECORD_READER);
+        descriptors.add(RECORD_WRITER);
+        descriptors.add(LATITUDE_RECORD_PATH);
+        descriptors.add(LONGITUDE_RECORD_PATH);
+        descriptors.add(GEOHASH_FORMAT);
+        descriptors.add(GEOHASH_LEVEL);
+        descriptors.add(GEOHASH_RECORD_PATH);

Review comment:
       Would recommend adding GEOHASH_RECORD_PATH immediately following LONGITUDE_RECORD_PATH so that the RecordPath properties are provided together

##########
File path: nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+    public static final String ENCODE_MODE = "Encode";
+    public static final String DECODE_MODE = "Decode";
+
+    public static final String BINARY_STRING = "BinaryString";
+    public static final String BASE32 = "Base32";
+    public static final String LONG_INT = "LongInt";
+
+    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ENCODE_MODE, DECODE_MODE)
+            .defaultValue(ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the record reader service to use for reading incoming data")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the record writer service to use for writing data")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("latitude-record-path")
+            .displayName("Latitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the latitude value;"
+                    + "in the decode mode, this property specifies the record path to put the latitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor LONGITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("longitude-record-path")
+            .displayName("Longitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the longitude value;"
+                    + "in the decode mode, this property specifies the record path to put the longitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_FORMAT = new PropertyDescriptor.Builder()
+            .name("geohash-format")
+            .displayName("Geohash Format")
+            .description("In the encode mode, this property specifies the desired format for encoding geohash;"
+                    + "in the decode mode, this property specifies the format of geohash provided")
+            .required(true)
+            .allowableValues(BASE32, BINARY_STRING, LONG_INT)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_LEVEL = new PropertyDescriptor.Builder()
+            .name("geohash-level")
+            .displayName("Geohash Level")
+            .description("The integer precision level(1-12) desired for encoding geohash")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dependsOn(MODE, ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("geohash-record-path")
+            .displayName("Geohash Record Path")
+            .description("In the encode mode, this property specifies the record path to put the geohash value;"
+                    + "in the decode mode, this property specifies the record path to retrieve the geohash value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Records that are successfully encoded or decoded will be routed to success")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Records that cannot be encoded or decoded will be routed to failure")
+            .build();
+
+    public static final List<PropertyDescriptor> RECORD_PATH_PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            LATITUDE_RECORD_PATH,LONGITUDE_RECORD_PATH,GEOHASH_RECORD_PATH
+    ));
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        descriptors = new ArrayList<>();
+        descriptors.add(MODE);
+        descriptors.add(RECORD_READER);
+        descriptors.add(RECORD_WRITER);
+        descriptors.add(LATITUDE_RECORD_PATH);
+        descriptors.add(LONGITUDE_RECORD_PATH);
+        descriptors.add(GEOHASH_FORMAT);
+        descriptors.add(GEOHASH_LEVEL);
+        descriptors.add(GEOHASH_RECORD_PATH);
+        descriptors = Collections.unmodifiableList(descriptors);
+
+        relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile input = session.get();
+        if ( input == null ) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory =  context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
+        final String format = context.getProperty(GEOHASH_FORMAT).getValue();
+
+        FlowFile output = session.create(input);
+        try (InputStream is = session.read(input);
+             OutputStream os = session.write(output)) {
+            RecordPathCache cache = new RecordPathCache(RECORD_PATH_PROPERTIES.size() + 1);
+            Map<PropertyDescriptor, RecordPath> paths = new HashMap<>();
+
+            for (PropertyDescriptor descriptor : RECORD_PATH_PROPERTIES) {
+                String rawRecordPath = context.getProperty(descriptor).evaluateAttributeExpressions(input).getValue();
+                RecordPath compiled = cache.getCompiled(rawRecordPath);
+                paths.put(descriptor, compiled);
+            }
+
+            RecordReader reader = readerFactory.createRecordReader(input, is, getLogger());
+            RecordSchema schema = writerFactory.getSchema(input.getAttributes(), reader.getSchema());
+            RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, os, output);
+
+            Record record;
+            Relationship targetRelationship = REL_FAILURE;
+            writer.beginRecordSet();
+
+            int recordsCount = 0;
+            boolean updated = false;
+            while ((record = reader.nextRecord()) != null) {
+                if(encode) {
+                    final int level = Integer.parseInt(context.getProperty(GEOHASH_LEVEL).getValue());
+                    final String rawLatitudePath = context.getProperty(LATITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath latitudePath = cache.getCompiled(rawLatitudePath);
+                    final String rawLongitudePath = context.getProperty(LONGITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath longitudePath = cache.getCompiled(rawLongitudePath);
+
+                    Object encodedGeohash = getEncodedGeohash(latitudePath, longitudePath, record, format, level);
+
+                    updated = updateRecord(GEOHASH_RECORD_PATH, encodedGeohash, record, paths);
+                }else {
+                    final String rawgeohashPath = context.getProperty(GEOHASH_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath geohashPath = cache.getCompiled(rawgeohashPath);
+
+                    WGS84Point decodedPoint = getDecodedPointFromGeohash(geohashPath, record, format);
+
+                    if(decodedPoint != null) {
+                        updated = updateRecord(LATITUDE_RECORD_PATH, String.valueOf(decodedPoint.getLatitude()), record, paths)
+                                &&  updateRecord(LONGITUDE_RECORD_PATH, String.valueOf(decodedPoint.getLongitude()), record, paths);
+                    }
+                }
+
+                if (updated) {
+                    targetRelationship = REL_SUCCESS;
+                }
+
+                writer.write(record);
+                recordsCount++;
+
+            }
+            writer.finishRecordSet();
+            writer.close();
+
+            is.close();
+            os.close();
+
+            output = session.putAllAttributes(output, buildAttributes(recordsCount, writer.getMimeType()));
+
+            session.transfer(output, targetRelationship);
+            session.remove(input);
+
+            session.getProvenanceReporter().modifyContent(output);
+        } catch (Exception ex) {
+            getLogger().error("Failed to {}; will route to failure", new Object[]{encode ? "encode" : "decode" ,ex});

Review comment:
       No need for the explicit array constructor -- can just give `encode ? "encode" : "decode", ex` as the argument list.

##########
File path: nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+    public static final String ENCODE_MODE = "Encode";
+    public static final String DECODE_MODE = "Decode";
+
+    public static final String BINARY_STRING = "BinaryString";
+    public static final String BASE32 = "Base32";
+    public static final String LONG_INT = "LongInt";
+
+    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ENCODE_MODE, DECODE_MODE)
+            .defaultValue(ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the record reader service to use for reading incoming data")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the record writer service to use for writing data")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("latitude-record-path")
+            .displayName("Latitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the latitude value;"
+                    + "in the decode mode, this property specifies the record path to put the latitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor LONGITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("longitude-record-path")
+            .displayName("Longitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the longitude value;"
+                    + "in the decode mode, this property specifies the record path to put the longitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_FORMAT = new PropertyDescriptor.Builder()
+            .name("geohash-format")
+            .displayName("Geohash Format")
+            .description("In the encode mode, this property specifies the desired format for encoding geohash;"
+                    + "in the decode mode, this property specifies the format of geohash provided")
+            .required(true)
+            .allowableValues(BASE32, BINARY_STRING, LONG_INT)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_LEVEL = new PropertyDescriptor.Builder()
+            .name("geohash-level")
+            .displayName("Geohash Level")
+            .description("The integer precision level(1-12) desired for encoding geohash")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dependsOn(MODE, ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("geohash-record-path")
+            .displayName("Geohash Record Path")
+            .description("In the encode mode, this property specifies the record path to put the geohash value;"
+                    + "in the decode mode, this property specifies the record path to retrieve the geohash value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Records that are successfully encoded or decoded will be routed to success")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Records that cannot be encoded or decoded will be routed to failure")
+            .build();
+
+    public static final List<PropertyDescriptor> RECORD_PATH_PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            LATITUDE_RECORD_PATH,LONGITUDE_RECORD_PATH,GEOHASH_RECORD_PATH
+    ));
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        descriptors = new ArrayList<>();
+        descriptors.add(MODE);
+        descriptors.add(RECORD_READER);
+        descriptors.add(RECORD_WRITER);
+        descriptors.add(LATITUDE_RECORD_PATH);
+        descriptors.add(LONGITUDE_RECORD_PATH);
+        descriptors.add(GEOHASH_FORMAT);
+        descriptors.add(GEOHASH_LEVEL);
+        descriptors.add(GEOHASH_RECORD_PATH);
+        descriptors = Collections.unmodifiableList(descriptors);
+
+        relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile input = session.get();
+        if ( input == null ) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory =  context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
+        final String format = context.getProperty(GEOHASH_FORMAT).getValue();
+
+        FlowFile output = session.create(input);
+        try (InputStream is = session.read(input);
+             OutputStream os = session.write(output)) {
+            RecordPathCache cache = new RecordPathCache(RECORD_PATH_PROPERTIES.size() + 1);
+            Map<PropertyDescriptor, RecordPath> paths = new HashMap<>();
+
+            for (PropertyDescriptor descriptor : RECORD_PATH_PROPERTIES) {
+                String rawRecordPath = context.getProperty(descriptor).evaluateAttributeExpressions(input).getValue();
+                RecordPath compiled = cache.getCompiled(rawRecordPath);
+                paths.put(descriptor, compiled);
+            }
+
+            RecordReader reader = readerFactory.createRecordReader(input, is, getLogger());
+            RecordSchema schema = writerFactory.getSchema(input.getAttributes(), reader.getSchema());
+            RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, os, output);
+
+            Record record;
+            Relationship targetRelationship = REL_FAILURE;
+            writer.beginRecordSet();
+
+            int recordsCount = 0;
+            boolean updated = false;
+            while ((record = reader.nextRecord()) != null) {
+                if(encode) {
+                    final int level = Integer.parseInt(context.getProperty(GEOHASH_LEVEL).getValue());
+                    final String rawLatitudePath = context.getProperty(LATITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath latitudePath = cache.getCompiled(rawLatitudePath);
+                    final String rawLongitudePath = context.getProperty(LONGITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath longitudePath = cache.getCompiled(rawLongitudePath);
+
+                    Object encodedGeohash = getEncodedGeohash(latitudePath, longitudePath, record, format, level);
+
+                    updated = updateRecord(GEOHASH_RECORD_PATH, encodedGeohash, record, paths);
+                }else {
+                    final String rawgeohashPath = context.getProperty(GEOHASH_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath geohashPath = cache.getCompiled(rawgeohashPath);
+
+                    WGS84Point decodedPoint = getDecodedPointFromGeohash(geohashPath, record, format);
+
+                    if(decodedPoint != null) {
+                        updated = updateRecord(LATITUDE_RECORD_PATH, String.valueOf(decodedPoint.getLatitude()), record, paths)
+                                &&  updateRecord(LONGITUDE_RECORD_PATH, String.valueOf(decodedPoint.getLongitude()), record, paths);
+                    }
+                }
+
+                if (updated) {
+                    targetRelationship = REL_SUCCESS;
+                }
+
+                writer.write(record);
+                recordsCount++;
+
+            }
+            writer.finishRecordSet();
+            writer.close();
+
+            is.close();
+            os.close();
+
+            output = session.putAllAttributes(output, buildAttributes(recordsCount, writer.getMimeType()));
+
+            session.transfer(output, targetRelationship);
+            session.remove(input);
+
+            session.getProvenanceReporter().modifyContent(output);
+        } catch (Exception ex) {
+            getLogger().error("Failed to {}; will route to failure", new Object[]{encode ? "encode" : "decode" ,ex});
+            session.rollback();
+            context.yield();
+        }
+    }
+
+    private Object getEncodedGeohash(RecordPath latitudePath, RecordPath longitudePath, Record record, String format, int level) {
+        RecordPathResult latitudeResult = latitudePath.evaluate(record);
+        RecordPathResult longitudeResult = longitudePath.evaluate(record);
+        Optional<FieldValue> latitudeField = latitudeResult.getSelectedFields().findFirst();
+        Optional<FieldValue> longitudeField = longitudeResult.getSelectedFields().findFirst();
+
+        if (latitudeField.isPresent() && longitudeField.isPresent()) {
+            FieldValue latitudeValue = latitudeField.get();
+            FieldValue longitudeValue = longitudeField.get();
+            Object latitudeVal = latitudeValue.getValue();
+            Object longitudeVal = longitudeValue.getValue();
+
+            if (latitudeVal == null || longitudeVal == null) {
+                return null;
+            }
+
+            double realLatValue = Double.parseDouble(latitudeVal.toString());
+            double realLongValue = Double.parseDouble(longitudeVal.toString());
+            GeoHash gh = GeoHash.withCharacterPrecision(realLatValue, realLongValue, level);
+
+            switch (format) {
+                case BINARY_STRING:
+                    return gh.toBinaryString();
+                case LONG_INT:
+                    return gh.longValue();
+                default:
+                    return gh.toBase32();
+            }
+        } else {
+            return null;
+        }
+    }
+
+    private WGS84Point getDecodedPointFromGeohash(RecordPath geohashPath, Record record, String format) {
+        RecordPathResult geohashResult = geohashPath.evaluate(record);
+        Optional<FieldValue> geohashField = geohashResult.getSelectedFields().findFirst();
+        if (geohashField.isPresent()) {
+            FieldValue geohashFieldValue = geohashField.get();
+            Object geohashVal = geohashFieldValue.getValue();
+            if(geohashVal == null) {
+                return null;
+            }
+
+            String geohashString = geohashVal.toString();
+            GeoHash decodedHash;
+
+            switch (format) {
+                case BINARY_STRING:
+                    decodedHash = GeoHash.fromBinaryString(geohashString);
+                    break;
+                case LONG_INT:
+                    String binaryString = Long.toBinaryString(Long.parseLong(geohashString));
+                    decodedHash = GeoHash.fromBinaryString(binaryString);
+                    break;
+                default:
+                    decodedHash = GeoHash.fromGeohashString(geohashString);
+            }
+
+            return decodedHash.getBoundingBoxCenter();
+
+        }else {
+            return null;
+        }
+    }
+
+    private boolean updateRecord(PropertyDescriptor descriptor, Object newValue, Record record, Map<PropertyDescriptor, RecordPath> cached) {
+        if (!cached.containsKey(descriptor) || newValue == null) {
+            return false;
+        }
+        RecordPath path = cached.get(descriptor);
+        RecordPathResult result = path.evaluate(record);
+        FieldValue fieldValue = result.getSelectedFields().findFirst().get();
+
+        if (fieldValue.getParent().get().getValue() == null) {

Review comment:
       If `fieldValue` were to point to the root record (i.e., user configured it as `/`) then `getParent()` would return an empty optional and the call here to `get()` will throw `NoSuchElementException`

##########
File path: nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+    public static final String ENCODE_MODE = "Encode";
+    public static final String DECODE_MODE = "Decode";
+
+    public static final String BINARY_STRING = "BinaryString";
+    public static final String BASE32 = "Base32";
+    public static final String LONG_INT = "LongInt";
+
+    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ENCODE_MODE, DECODE_MODE)
+            .defaultValue(ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the record reader service to use for reading incoming data")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the record writer service to use for writing data")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("latitude-record-path")
+            .displayName("Latitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the latitude value;"
+                    + "in the decode mode, this property specifies the record path to put the latitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)

Review comment:
       Should use `new RecordPathValidator()` here

##########
File path: nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+    public static final String ENCODE_MODE = "Encode";
+    public static final String DECODE_MODE = "Decode";
+
+    public static final String BINARY_STRING = "BinaryString";
+    public static final String BASE32 = "Base32";
+    public static final String LONG_INT = "LongInt";
+
+    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ENCODE_MODE, DECODE_MODE)
+            .defaultValue(ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the record reader service to use for reading incoming data")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the record writer service to use for writing data")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("latitude-record-path")
+            .displayName("Latitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the latitude value;"

Review comment:
       should add a space after the semicolon :)

##########
File path: nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+    public static final String ENCODE_MODE = "Encode";
+    public static final String DECODE_MODE = "Decode";
+
+    public static final String BINARY_STRING = "BinaryString";
+    public static final String BASE32 = "Base32";
+    public static final String LONG_INT = "LongInt";
+
+    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ENCODE_MODE, DECODE_MODE)
+            .defaultValue(ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the record reader service to use for reading incoming data")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the record writer service to use for writing data")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("latitude-record-path")
+            .displayName("Latitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the latitude value;"
+                    + "in the decode mode, this property specifies the record path to put the latitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor LONGITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("longitude-record-path")
+            .displayName("Longitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the longitude value;"
+                    + "in the decode mode, this property specifies the record path to put the longitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_FORMAT = new PropertyDescriptor.Builder()
+            .name("geohash-format")
+            .displayName("Geohash Format")
+            .description("In the encode mode, this property specifies the desired format for encoding geohash;"
+                    + "in the decode mode, this property specifies the format of geohash provided")
+            .required(true)
+            .allowableValues(BASE32, BINARY_STRING, LONG_INT)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_LEVEL = new PropertyDescriptor.Builder()
+            .name("geohash-level")
+            .displayName("Geohash Level")
+            .description("The integer precision level(1-12) desired for encoding geohash")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dependsOn(MODE, ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("geohash-record-path")
+            .displayName("Geohash Record Path")
+            .description("In the encode mode, this property specifies the record path to put the geohash value;"
+                    + "in the decode mode, this property specifies the record path to retrieve the geohash value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Records that are successfully encoded or decoded will be routed to success")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Records that cannot be encoded or decoded will be routed to failure")
+            .build();
+
+    public static final List<PropertyDescriptor> RECORD_PATH_PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            LATITUDE_RECORD_PATH,LONGITUDE_RECORD_PATH,GEOHASH_RECORD_PATH
+    ));
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        descriptors = new ArrayList<>();
+        descriptors.add(MODE);
+        descriptors.add(RECORD_READER);
+        descriptors.add(RECORD_WRITER);
+        descriptors.add(LATITUDE_RECORD_PATH);
+        descriptors.add(LONGITUDE_RECORD_PATH);
+        descriptors.add(GEOHASH_FORMAT);
+        descriptors.add(GEOHASH_LEVEL);
+        descriptors.add(GEOHASH_RECORD_PATH);
+        descriptors = Collections.unmodifiableList(descriptors);
+
+        relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile input = session.get();
+        if ( input == null ) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory =  context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
+        final String format = context.getProperty(GEOHASH_FORMAT).getValue();
+
+        FlowFile output = session.create(input);
+        try (InputStream is = session.read(input);
+             OutputStream os = session.write(output)) {
+            RecordPathCache cache = new RecordPathCache(RECORD_PATH_PROPERTIES.size() + 1);

Review comment:
       Should create the cache as a member variable, not an inline variable. This way the compilation of the RecordPath is cached across multiple invocations. Not sure that it's buying us anything as-is.

##########
File path: nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+    public static final String ENCODE_MODE = "Encode";
+    public static final String DECODE_MODE = "Decode";
+
+    public static final String BINARY_STRING = "BinaryString";
+    public static final String BASE32 = "Base32";
+    public static final String LONG_INT = "LongInt";
+
+    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ENCODE_MODE, DECODE_MODE)
+            .defaultValue(ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the record reader service to use for reading incoming data")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the record writer service to use for writing data")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("latitude-record-path")
+            .displayName("Latitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the latitude value;"
+                    + "in the decode mode, this property specifies the record path to put the latitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor LONGITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("longitude-record-path")
+            .displayName("Longitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the longitude value;"
+                    + "in the decode mode, this property specifies the record path to put the longitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_FORMAT = new PropertyDescriptor.Builder()
+            .name("geohash-format")
+            .displayName("Geohash Format")
+            .description("In the encode mode, this property specifies the desired format for encoding geohash;"
+                    + "in the decode mode, this property specifies the format of geohash provided")
+            .required(true)
+            .allowableValues(BASE32, BINARY_STRING, LONG_INT)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_LEVEL = new PropertyDescriptor.Builder()
+            .name("geohash-level")
+            .displayName("Geohash Level")
+            .description("The integer precision level(1-12) desired for encoding geohash")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dependsOn(MODE, ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("geohash-record-path")
+            .displayName("Geohash Record Path")
+            .description("In the encode mode, this property specifies the record path to put the geohash value;"

Review comment:
       Missing space after semicolon

##########
File path: nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+    public static final String ENCODE_MODE = "Encode";
+    public static final String DECODE_MODE = "Decode";
+
+    public static final String BINARY_STRING = "BinaryString";
+    public static final String BASE32 = "Base32";
+    public static final String LONG_INT = "LongInt";
+
+    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ENCODE_MODE, DECODE_MODE)
+            .defaultValue(ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the record reader service to use for reading incoming data")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the record writer service to use for writing data")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("latitude-record-path")
+            .displayName("Latitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the latitude value;"
+                    + "in the decode mode, this property specifies the record path to put the latitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor LONGITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("longitude-record-path")
+            .displayName("Longitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the longitude value;"
+                    + "in the decode mode, this property specifies the record path to put the longitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_FORMAT = new PropertyDescriptor.Builder()
+            .name("geohash-format")
+            .displayName("Geohash Format")
+            .description("In the encode mode, this property specifies the desired format for encoding geohash;"
+                    + "in the decode mode, this property specifies the format of geohash provided")
+            .required(true)
+            .allowableValues(BASE32, BINARY_STRING, LONG_INT)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)

Review comment:
       No need for a validator when using `allowableValues`

##########
File path: nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+    public static final String ENCODE_MODE = "Encode";
+    public static final String DECODE_MODE = "Decode";
+
+    public static final String BINARY_STRING = "BinaryString";
+    public static final String BASE32 = "Base32";
+    public static final String LONG_INT = "LongInt";
+
+    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ENCODE_MODE, DECODE_MODE)
+            .defaultValue(ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the record reader service to use for reading incoming data")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the record writer service to use for writing data")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("latitude-record-path")
+            .displayName("Latitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the latitude value;"
+                    + "in the decode mode, this property specifies the record path to put the latitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor LONGITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("longitude-record-path")
+            .displayName("Longitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the longitude value;"
+                    + "in the decode mode, this property specifies the record path to put the longitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_FORMAT = new PropertyDescriptor.Builder()
+            .name("geohash-format")
+            .displayName("Geohash Format")
+            .description("In the encode mode, this property specifies the desired format for encoding geohash;"
+                    + "in the decode mode, this property specifies the format of geohash provided")
+            .required(true)
+            .allowableValues(BASE32, BINARY_STRING, LONG_INT)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_LEVEL = new PropertyDescriptor.Builder()
+            .name("geohash-level")
+            .displayName("Geohash Level")
+            .description("The integer precision level(1-12) desired for encoding geohash")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dependsOn(MODE, ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("geohash-record-path")
+            .displayName("Geohash Record Path")
+            .description("In the encode mode, this property specifies the record path to put the geohash value;"
+                    + "in the decode mode, this property specifies the record path to retrieve the geohash value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)

Review comment:
       `new RecordPathValidator()`

##########
File path: nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+    public static final String ENCODE_MODE = "Encode";
+    public static final String DECODE_MODE = "Decode";
+
+    public static final String BINARY_STRING = "BinaryString";
+    public static final String BASE32 = "Base32";
+    public static final String LONG_INT = "LongInt";
+
+    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ENCODE_MODE, DECODE_MODE)
+            .defaultValue(ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the record reader service to use for reading incoming data")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the record writer service to use for writing data")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("latitude-record-path")
+            .displayName("Latitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the latitude value;"
+                    + "in the decode mode, this property specifies the record path to put the latitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor LONGITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("longitude-record-path")
+            .displayName("Longitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the longitude value;"
+                    + "in the decode mode, this property specifies the record path to put the longitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_FORMAT = new PropertyDescriptor.Builder()
+            .name("geohash-format")
+            .displayName("Geohash Format")
+            .description("In the encode mode, this property specifies the desired format for encoding geohash;"
+                    + "in the decode mode, this property specifies the format of geohash provided")
+            .required(true)
+            .allowableValues(BASE32, BINARY_STRING, LONG_INT)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_LEVEL = new PropertyDescriptor.Builder()
+            .name("geohash-level")
+            .displayName("Geohash Level")
+            .description("The integer precision level(1-12) desired for encoding geohash")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)

Review comment:
       Should either use `StandardValidators.createLongValidator(1, 12, true)` or use `.allowableValues( "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12")` -- in this case I don't have a strong preference for which one is better. Might also make sense to allow for Expression Language (EL) to be used here, in which case the `createLongValidator` is probably better because there's no way to use EL when using allowable values

##########
File path: nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+    public static final String ENCODE_MODE = "Encode";
+    public static final String DECODE_MODE = "Decode";
+
+    public static final String BINARY_STRING = "BinaryString";
+    public static final String BASE32 = "Base32";
+    public static final String LONG_INT = "LongInt";
+
+    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ENCODE_MODE, DECODE_MODE)
+            .defaultValue(ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the record reader service to use for reading incoming data")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the record writer service to use for writing data")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("latitude-record-path")
+            .displayName("Latitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the latitude value;"
+                    + "in the decode mode, this property specifies the record path to put the latitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor LONGITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("longitude-record-path")
+            .displayName("Longitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the longitude value;"
+                    + "in the decode mode, this property specifies the record path to put the longitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_FORMAT = new PropertyDescriptor.Builder()
+            .name("geohash-format")
+            .displayName("Geohash Format")
+            .description("In the encode mode, this property specifies the desired format for encoding geohash;"
+                    + "in the decode mode, this property specifies the format of geohash provided")
+            .required(true)
+            .allowableValues(BASE32, BINARY_STRING, LONG_INT)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_LEVEL = new PropertyDescriptor.Builder()
+            .name("geohash-level")
+            .displayName("Geohash Level")
+            .description("The integer precision level(1-12) desired for encoding geohash")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dependsOn(MODE, ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("geohash-record-path")
+            .displayName("Geohash Record Path")
+            .description("In the encode mode, this property specifies the record path to put the geohash value;"
+                    + "in the decode mode, this property specifies the record path to retrieve the geohash value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Records that are successfully encoded or decoded will be routed to success")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Records that cannot be encoded or decoded will be routed to failure")
+            .build();
+
+    public static final List<PropertyDescriptor> RECORD_PATH_PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            LATITUDE_RECORD_PATH,LONGITUDE_RECORD_PATH,GEOHASH_RECORD_PATH
+    ));
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        descriptors = new ArrayList<>();
+        descriptors.add(MODE);
+        descriptors.add(RECORD_READER);
+        descriptors.add(RECORD_WRITER);
+        descriptors.add(LATITUDE_RECORD_PATH);
+        descriptors.add(LONGITUDE_RECORD_PATH);
+        descriptors.add(GEOHASH_FORMAT);
+        descriptors.add(GEOHASH_LEVEL);
+        descriptors.add(GEOHASH_RECORD_PATH);
+        descriptors = Collections.unmodifiableList(descriptors);
+
+        relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile input = session.get();
+        if ( input == null ) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory =  context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
+        final String format = context.getProperty(GEOHASH_FORMAT).getValue();
+
+        FlowFile output = session.create(input);
+        try (InputStream is = session.read(input);
+             OutputStream os = session.write(output)) {
+            RecordPathCache cache = new RecordPathCache(RECORD_PATH_PROPERTIES.size() + 1);
+            Map<PropertyDescriptor, RecordPath> paths = new HashMap<>();
+
+            for (PropertyDescriptor descriptor : RECORD_PATH_PROPERTIES) {
+                String rawRecordPath = context.getProperty(descriptor).evaluateAttributeExpressions(input).getValue();
+                RecordPath compiled = cache.getCompiled(rawRecordPath);
+                paths.put(descriptor, compiled);
+            }
+
+            RecordReader reader = readerFactory.createRecordReader(input, is, getLogger());
+            RecordSchema schema = writerFactory.getSchema(input.getAttributes(), reader.getSchema());
+            RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, os, output);

Review comment:
       RecordReader and RecordWriter should be created with a try-with-resources. Need to make sure that we close these.

##########
File path: nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+    public static final String ENCODE_MODE = "Encode";
+    public static final String DECODE_MODE = "Decode";
+
+    public static final String BINARY_STRING = "BinaryString";
+    public static final String BASE32 = "Base32";
+    public static final String LONG_INT = "LongInt";
+
+    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ENCODE_MODE, DECODE_MODE)
+            .defaultValue(ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the record reader service to use for reading incoming data")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the record writer service to use for writing data")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("latitude-record-path")
+            .displayName("Latitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the latitude value;"
+                    + "in the decode mode, this property specifies the record path to put the latitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor LONGITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("longitude-record-path")
+            .displayName("Longitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the longitude value;"
+                    + "in the decode mode, this property specifies the record path to put the longitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_FORMAT = new PropertyDescriptor.Builder()
+            .name("geohash-format")
+            .displayName("Geohash Format")
+            .description("In the encode mode, this property specifies the desired format for encoding geohash;"
+                    + "in the decode mode, this property specifies the format of geohash provided")
+            .required(true)
+            .allowableValues(BASE32, BINARY_STRING, LONG_INT)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_LEVEL = new PropertyDescriptor.Builder()
+            .name("geohash-level")
+            .displayName("Geohash Level")
+            .description("The integer precision level(1-12) desired for encoding geohash")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dependsOn(MODE, ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("geohash-record-path")
+            .displayName("Geohash Record Path")
+            .description("In the encode mode, this property specifies the record path to put the geohash value;"
+                    + "in the decode mode, this property specifies the record path to retrieve the geohash value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Records that are successfully encoded or decoded will be routed to success")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Records that cannot be encoded or decoded will be routed to failure")
+            .build();
+
+    public static final List<PropertyDescriptor> RECORD_PATH_PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            LATITUDE_RECORD_PATH,LONGITUDE_RECORD_PATH,GEOHASH_RECORD_PATH
+    ));
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        descriptors = new ArrayList<>();
+        descriptors.add(MODE);
+        descriptors.add(RECORD_READER);
+        descriptors.add(RECORD_WRITER);
+        descriptors.add(LATITUDE_RECORD_PATH);
+        descriptors.add(LONGITUDE_RECORD_PATH);
+        descriptors.add(GEOHASH_FORMAT);
+        descriptors.add(GEOHASH_LEVEL);
+        descriptors.add(GEOHASH_RECORD_PATH);
+        descriptors = Collections.unmodifiableList(descriptors);
+
+        relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile input = session.get();
+        if ( input == null ) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory =  context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
+        final String format = context.getProperty(GEOHASH_FORMAT).getValue();
+
+        FlowFile output = session.create(input);
+        try (InputStream is = session.read(input);
+             OutputStream os = session.write(output)) {
+            RecordPathCache cache = new RecordPathCache(RECORD_PATH_PROPERTIES.size() + 1);
+            Map<PropertyDescriptor, RecordPath> paths = new HashMap<>();
+
+            for (PropertyDescriptor descriptor : RECORD_PATH_PROPERTIES) {
+                String rawRecordPath = context.getProperty(descriptor).evaluateAttributeExpressions(input).getValue();
+                RecordPath compiled = cache.getCompiled(rawRecordPath);
+                paths.put(descriptor, compiled);
+            }
+
+            RecordReader reader = readerFactory.createRecordReader(input, is, getLogger());
+            RecordSchema schema = writerFactory.getSchema(input.getAttributes(), reader.getSchema());
+            RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, os, output);
+
+            Record record;
+            Relationship targetRelationship = REL_FAILURE;
+            writer.beginRecordSet();
+
+            int recordsCount = 0;
+            boolean updated = false;
+            while ((record = reader.nextRecord()) != null) {
+                if(encode) {
+                    final int level = Integer.parseInt(context.getProperty(GEOHASH_LEVEL).getValue());
+                    final String rawLatitudePath = context.getProperty(LATITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath latitudePath = cache.getCompiled(rawLatitudePath);
+                    final String rawLongitudePath = context.getProperty(LONGITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath longitudePath = cache.getCompiled(rawLongitudePath);

Review comment:
       All of these values will be the same for every Record, so they should be calculated only once, outside of the loop

##########
File path: nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+    public static final String ENCODE_MODE = "Encode";
+    public static final String DECODE_MODE = "Decode";
+
+    public static final String BINARY_STRING = "BinaryString";
+    public static final String BASE32 = "Base32";
+    public static final String LONG_INT = "LongInt";
+
+    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ENCODE_MODE, DECODE_MODE)
+            .defaultValue(ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the record reader service to use for reading incoming data")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the record writer service to use for writing data")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("latitude-record-path")
+            .displayName("Latitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the latitude value;"
+                    + "in the decode mode, this property specifies the record path to put the latitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor LONGITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("longitude-record-path")
+            .displayName("Longitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the longitude value;"
+                    + "in the decode mode, this property specifies the record path to put the longitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_FORMAT = new PropertyDescriptor.Builder()
+            .name("geohash-format")
+            .displayName("Geohash Format")
+            .description("In the encode mode, this property specifies the desired format for encoding geohash;"
+                    + "in the decode mode, this property specifies the format of geohash provided")
+            .required(true)
+            .allowableValues(BASE32, BINARY_STRING, LONG_INT)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_LEVEL = new PropertyDescriptor.Builder()
+            .name("geohash-level")
+            .displayName("Geohash Level")
+            .description("The integer precision level(1-12) desired for encoding geohash")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dependsOn(MODE, ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("geohash-record-path")
+            .displayName("Geohash Record Path")
+            .description("In the encode mode, this property specifies the record path to put the geohash value;"
+                    + "in the decode mode, this property specifies the record path to retrieve the geohash value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Records that are successfully encoded or decoded will be routed to success")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Records that cannot be encoded or decoded will be routed to failure")
+            .build();
+
+    public static final List<PropertyDescriptor> RECORD_PATH_PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            LATITUDE_RECORD_PATH,LONGITUDE_RECORD_PATH,GEOHASH_RECORD_PATH
+    ));
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        descriptors = new ArrayList<>();
+        descriptors.add(MODE);
+        descriptors.add(RECORD_READER);
+        descriptors.add(RECORD_WRITER);
+        descriptors.add(LATITUDE_RECORD_PATH);
+        descriptors.add(LONGITUDE_RECORD_PATH);
+        descriptors.add(GEOHASH_FORMAT);
+        descriptors.add(GEOHASH_LEVEL);
+        descriptors.add(GEOHASH_RECORD_PATH);
+        descriptors = Collections.unmodifiableList(descriptors);
+
+        relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile input = session.get();
+        if ( input == null ) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory =  context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
+        final String format = context.getProperty(GEOHASH_FORMAT).getValue();
+
+        FlowFile output = session.create(input);
+        try (InputStream is = session.read(input);
+             OutputStream os = session.write(output)) {
+            RecordPathCache cache = new RecordPathCache(RECORD_PATH_PROPERTIES.size() + 1);
+            Map<PropertyDescriptor, RecordPath> paths = new HashMap<>();
+
+            for (PropertyDescriptor descriptor : RECORD_PATH_PROPERTIES) {
+                String rawRecordPath = context.getProperty(descriptor).evaluateAttributeExpressions(input).getValue();
+                RecordPath compiled = cache.getCompiled(rawRecordPath);
+                paths.put(descriptor, compiled);
+            }
+
+            RecordReader reader = readerFactory.createRecordReader(input, is, getLogger());
+            RecordSchema schema = writerFactory.getSchema(input.getAttributes(), reader.getSchema());
+            RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, os, output);
+
+            Record record;
+            Relationship targetRelationship = REL_FAILURE;
+            writer.beginRecordSet();
+
+            int recordsCount = 0;
+            boolean updated = false;
+            while ((record = reader.nextRecord()) != null) {
+                if(encode) {
+                    final int level = Integer.parseInt(context.getProperty(GEOHASH_LEVEL).getValue());
+                    final String rawLatitudePath = context.getProperty(LATITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath latitudePath = cache.getCompiled(rawLatitudePath);
+                    final String rawLongitudePath = context.getProperty(LONGITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath longitudePath = cache.getCompiled(rawLongitudePath);
+
+                    Object encodedGeohash = getEncodedGeohash(latitudePath, longitudePath, record, format, level);
+
+                    updated = updateRecord(GEOHASH_RECORD_PATH, encodedGeohash, record, paths);
+                }else {
+                    final String rawgeohashPath = context.getProperty(GEOHASH_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath geohashPath = cache.getCompiled(rawgeohashPath);
+
+                    WGS84Point decodedPoint = getDecodedPointFromGeohash(geohashPath, record, format);
+
+                    if(decodedPoint != null) {
+                        updated = updateRecord(LATITUDE_RECORD_PATH, String.valueOf(decodedPoint.getLatitude()), record, paths)
+                                &&  updateRecord(LONGITUDE_RECORD_PATH, String.valueOf(decodedPoint.getLongitude()), record, paths);
+                    }
+                }
+
+                if (updated) {
+                    targetRelationship = REL_SUCCESS;
+                }
+
+                writer.write(record);
+                recordsCount++;
+
+            }
+            writer.finishRecordSet();
+            writer.close();
+
+            is.close();
+            os.close();
+
+            output = session.putAllAttributes(output, buildAttributes(recordsCount, writer.getMimeType()));
+
+            session.transfer(output, targetRelationship);
+            session.remove(input);
+
+            session.getProvenanceReporter().modifyContent(output);
+        } catch (Exception ex) {
+            getLogger().error("Failed to {}; will route to failure", new Object[]{encode ? "encode" : "decode" ,ex});
+            session.rollback();
+            context.yield();
+        }
+    }
+
+    private Object getEncodedGeohash(RecordPath latitudePath, RecordPath longitudePath, Record record, String format, int level) {
+        RecordPathResult latitudeResult = latitudePath.evaluate(record);
+        RecordPathResult longitudeResult = longitudePath.evaluate(record);
+        Optional<FieldValue> latitudeField = latitudeResult.getSelectedFields().findFirst();
+        Optional<FieldValue> longitudeField = longitudeResult.getSelectedFields().findFirst();
+
+        if (latitudeField.isPresent() && longitudeField.isPresent()) {
+            FieldValue latitudeValue = latitudeField.get();
+            FieldValue longitudeValue = longitudeField.get();
+            Object latitudeVal = latitudeValue.getValue();
+            Object longitudeVal = longitudeValue.getValue();
+
+            if (latitudeVal == null || longitudeVal == null) {
+                return null;
+            }
+
+            double realLatValue = Double.parseDouble(latitudeVal.toString());
+            double realLongValue = Double.parseDouble(longitudeVal.toString());
+            GeoHash gh = GeoHash.withCharacterPrecision(realLatValue, realLongValue, level);
+
+            switch (format) {
+                case BINARY_STRING:
+                    return gh.toBinaryString();
+                case LONG_INT:
+                    return gh.longValue();
+                default:
+                    return gh.toBase32();
+            }
+        } else {
+            return null;
+        }
+    }
+
+    private WGS84Point getDecodedPointFromGeohash(RecordPath geohashPath, Record record, String format) {
+        RecordPathResult geohashResult = geohashPath.evaluate(record);
+        Optional<FieldValue> geohashField = geohashResult.getSelectedFields().findFirst();
+        if (geohashField.isPresent()) {
+            FieldValue geohashFieldValue = geohashField.get();
+            Object geohashVal = geohashFieldValue.getValue();
+            if(geohashVal == null) {
+                return null;
+            }
+
+            String geohashString = geohashVal.toString();
+            GeoHash decodedHash;
+
+            switch (format) {
+                case BINARY_STRING:
+                    decodedHash = GeoHash.fromBinaryString(geohashString);
+                    break;
+                case LONG_INT:
+                    String binaryString = Long.toBinaryString(Long.parseLong(geohashString));
+                    decodedHash = GeoHash.fromBinaryString(binaryString);
+                    break;
+                default:
+                    decodedHash = GeoHash.fromGeohashString(geohashString);
+            }
+
+            return decodedHash.getBoundingBoxCenter();
+
+        }else {
+            return null;
+        }
+    }
+
+    private boolean updateRecord(PropertyDescriptor descriptor, Object newValue, Record record, Map<PropertyDescriptor, RecordPath> cached) {
+        if (!cached.containsKey(descriptor) || newValue == null) {
+            return false;
+        }
+        RecordPath path = cached.get(descriptor);
+        RecordPathResult result = path.evaluate(record);
+        FieldValue fieldValue = result.getSelectedFields().findFirst().get();

Review comment:
       `result.getSelectedFields().findFirst()` may return an empty Optional. Probably best to do:
   ```
   final Optional<FieldValue> fieldValueOptional = result.getSelectedFields().findFirst();
   if (fieldValueOption.isEmpty()) {
     return false;
   }
   
   final FieldValue fieldValue = fieldValueOptional.get();
   ```

##########
File path: nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+    public static final String ENCODE_MODE = "Encode";
+    public static final String DECODE_MODE = "Decode";
+
+    public static final String BINARY_STRING = "BinaryString";
+    public static final String BASE32 = "Base32";
+    public static final String LONG_INT = "LongInt";
+
+    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ENCODE_MODE, DECODE_MODE)
+            .defaultValue(ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the record reader service to use for reading incoming data")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the record writer service to use for writing data")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("latitude-record-path")
+            .displayName("Latitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the latitude value;"
+                    + "in the decode mode, this property specifies the record path to put the latitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor LONGITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("longitude-record-path")
+            .displayName("Longitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the longitude value;"
+                    + "in the decode mode, this property specifies the record path to put the longitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_FORMAT = new PropertyDescriptor.Builder()
+            .name("geohash-format")
+            .displayName("Geohash Format")
+            .description("In the encode mode, this property specifies the desired format for encoding geohash;"
+                    + "in the decode mode, this property specifies the format of geohash provided")
+            .required(true)
+            .allowableValues(BASE32, BINARY_STRING, LONG_INT)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_LEVEL = new PropertyDescriptor.Builder()
+            .name("geohash-level")
+            .displayName("Geohash Level")
+            .description("The integer precision level(1-12) desired for encoding geohash")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dependsOn(MODE, ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("geohash-record-path")
+            .displayName("Geohash Record Path")
+            .description("In the encode mode, this property specifies the record path to put the geohash value;"
+                    + "in the decode mode, this property specifies the record path to retrieve the geohash value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Records that are successfully encoded or decoded will be routed to success")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Records that cannot be encoded or decoded will be routed to failure")
+            .build();
+
+    public static final List<PropertyDescriptor> RECORD_PATH_PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            LATITUDE_RECORD_PATH,LONGITUDE_RECORD_PATH,GEOHASH_RECORD_PATH
+    ));
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        descriptors = new ArrayList<>();
+        descriptors.add(MODE);
+        descriptors.add(RECORD_READER);
+        descriptors.add(RECORD_WRITER);
+        descriptors.add(LATITUDE_RECORD_PATH);
+        descriptors.add(LONGITUDE_RECORD_PATH);
+        descriptors.add(GEOHASH_FORMAT);
+        descriptors.add(GEOHASH_LEVEL);
+        descriptors.add(GEOHASH_RECORD_PATH);
+        descriptors = Collections.unmodifiableList(descriptors);
+
+        relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile input = session.get();
+        if ( input == null ) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory =  context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
+        final String format = context.getProperty(GEOHASH_FORMAT).getValue();
+
+        FlowFile output = session.create(input);
+        try (InputStream is = session.read(input);
+             OutputStream os = session.write(output)) {
+            RecordPathCache cache = new RecordPathCache(RECORD_PATH_PROPERTIES.size() + 1);
+            Map<PropertyDescriptor, RecordPath> paths = new HashMap<>();
+
+            for (PropertyDescriptor descriptor : RECORD_PATH_PROPERTIES) {
+                String rawRecordPath = context.getProperty(descriptor).evaluateAttributeExpressions(input).getValue();
+                RecordPath compiled = cache.getCompiled(rawRecordPath);
+                paths.put(descriptor, compiled);
+            }
+
+            RecordReader reader = readerFactory.createRecordReader(input, is, getLogger());
+            RecordSchema schema = writerFactory.getSchema(input.getAttributes(), reader.getSchema());
+            RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, os, output);
+
+            Record record;
+            Relationship targetRelationship = REL_FAILURE;
+            writer.beginRecordSet();
+
+            int recordsCount = 0;
+            boolean updated = false;
+            while ((record = reader.nextRecord()) != null) {
+                if(encode) {
+                    final int level = Integer.parseInt(context.getProperty(GEOHASH_LEVEL).getValue());
+                    final String rawLatitudePath = context.getProperty(LATITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath latitudePath = cache.getCompiled(rawLatitudePath);
+                    final String rawLongitudePath = context.getProperty(LONGITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath longitudePath = cache.getCompiled(rawLongitudePath);
+
+                    Object encodedGeohash = getEncodedGeohash(latitudePath, longitudePath, record, format, level);
+
+                    updated = updateRecord(GEOHASH_RECORD_PATH, encodedGeohash, record, paths);
+                }else {
+                    final String rawgeohashPath = context.getProperty(GEOHASH_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath geohashPath = cache.getCompiled(rawgeohashPath);
+
+                    WGS84Point decodedPoint = getDecodedPointFromGeohash(geohashPath, record, format);
+
+                    if(decodedPoint != null) {
+                        updated = updateRecord(LATITUDE_RECORD_PATH, String.valueOf(decodedPoint.getLatitude()), record, paths)
+                                &&  updateRecord(LONGITUDE_RECORD_PATH, String.valueOf(decodedPoint.getLongitude()), record, paths);
+                    }
+                }
+
+                if (updated) {
+                    targetRelationship = REL_SUCCESS;
+                }
+
+                writer.write(record);
+                recordsCount++;
+
+            }
+            writer.finishRecordSet();
+            writer.close();
+
+            is.close();
+            os.close();
+
+            output = session.putAllAttributes(output, buildAttributes(recordsCount, writer.getMimeType()));
+
+            session.transfer(output, targetRelationship);
+            session.remove(input);
+
+            session.getProvenanceReporter().modifyContent(output);
+        } catch (Exception ex) {
+            getLogger().error("Failed to {}; will route to failure", new Object[]{encode ? "encode" : "decode" ,ex});
+            session.rollback();
+            context.yield();
+        }
+    }
+
+    private Object getEncodedGeohash(RecordPath latitudePath, RecordPath longitudePath, Record record, String format, int level) {
+        RecordPathResult latitudeResult = latitudePath.evaluate(record);
+        RecordPathResult longitudeResult = longitudePath.evaluate(record);
+        Optional<FieldValue> latitudeField = latitudeResult.getSelectedFields().findFirst();
+        Optional<FieldValue> longitudeField = longitudeResult.getSelectedFields().findFirst();
+
+        if (latitudeField.isPresent() && longitudeField.isPresent()) {
+            FieldValue latitudeValue = latitudeField.get();
+            FieldValue longitudeValue = longitudeField.get();
+            Object latitudeVal = latitudeValue.getValue();
+            Object longitudeVal = longitudeValue.getValue();
+
+            if (latitudeVal == null || longitudeVal == null) {
+                return null;
+            }
+
+            double realLatValue = Double.parseDouble(latitudeVal.toString());
+            double realLongValue = Double.parseDouble(longitudeVal.toString());
+            GeoHash gh = GeoHash.withCharacterPrecision(realLatValue, realLongValue, level);
+
+            switch (format) {
+                case BINARY_STRING:
+                    return gh.toBinaryString();
+                case LONG_INT:
+                    return gh.longValue();
+                default:
+                    return gh.toBase32();
+            }
+        } else {
+            return null;
+        }
+    }
+
+    private WGS84Point getDecodedPointFromGeohash(RecordPath geohashPath, Record record, String format) {
+        RecordPathResult geohashResult = geohashPath.evaluate(record);
+        Optional<FieldValue> geohashField = geohashResult.getSelectedFields().findFirst();
+        if (geohashField.isPresent()) {

Review comment:
       This will tend to be a bit cleaner if we reverse the conditional. I.e., instead of:
   ```
   if (geohashField.isPresent()) {
      ... a bunch of logic here, with indentation ...
   } else {
     return null;
   }
   ```
   It's simpler if we change this to:
   ```
   if (!geohashField.isPresent()) {
     return null;
   }
   
   ... the rest of the logic here...
   ```
   Makes it very clear that the logic is "if it's not present, just return null, nothing else we can do" and also means we avoid lots of lines of nested code.

##########
File path: nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+    public static final String ENCODE_MODE = "Encode";
+    public static final String DECODE_MODE = "Decode";
+
+    public static final String BINARY_STRING = "BinaryString";
+    public static final String BASE32 = "Base32";
+    public static final String LONG_INT = "LongInt";
+
+    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ENCODE_MODE, DECODE_MODE)
+            .defaultValue(ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the record reader service to use for reading incoming data")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the record writer service to use for writing data")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("latitude-record-path")
+            .displayName("Latitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the latitude value;"
+                    + "in the decode mode, this property specifies the record path to put the latitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor LONGITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("longitude-record-path")
+            .displayName("Longitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the longitude value;"
+                    + "in the decode mode, this property specifies the record path to put the longitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_FORMAT = new PropertyDescriptor.Builder()
+            .name("geohash-format")
+            .displayName("Geohash Format")
+            .description("In the encode mode, this property specifies the desired format for encoding geohash;"
+                    + "in the decode mode, this property specifies the format of geohash provided")
+            .required(true)
+            .allowableValues(BASE32, BINARY_STRING, LONG_INT)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_LEVEL = new PropertyDescriptor.Builder()
+            .name("geohash-level")
+            .displayName("Geohash Level")
+            .description("The integer precision level(1-12) desired for encoding geohash")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dependsOn(MODE, ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("geohash-record-path")
+            .displayName("Geohash Record Path")
+            .description("In the encode mode, this property specifies the record path to put the geohash value;"
+                    + "in the decode mode, this property specifies the record path to retrieve the geohash value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Records that are successfully encoded or decoded will be routed to success")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Records that cannot be encoded or decoded will be routed to failure")
+            .build();
+
+    public static final List<PropertyDescriptor> RECORD_PATH_PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            LATITUDE_RECORD_PATH,LONGITUDE_RECORD_PATH,GEOHASH_RECORD_PATH
+    ));
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        descriptors = new ArrayList<>();
+        descriptors.add(MODE);
+        descriptors.add(RECORD_READER);
+        descriptors.add(RECORD_WRITER);
+        descriptors.add(LATITUDE_RECORD_PATH);
+        descriptors.add(LONGITUDE_RECORD_PATH);
+        descriptors.add(GEOHASH_FORMAT);
+        descriptors.add(GEOHASH_LEVEL);
+        descriptors.add(GEOHASH_RECORD_PATH);
+        descriptors = Collections.unmodifiableList(descriptors);
+
+        relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile input = session.get();
+        if ( input == null ) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory =  context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
+        final String format = context.getProperty(GEOHASH_FORMAT).getValue();
+
+        FlowFile output = session.create(input);
+        try (InputStream is = session.read(input);
+             OutputStream os = session.write(output)) {
+            RecordPathCache cache = new RecordPathCache(RECORD_PATH_PROPERTIES.size() + 1);
+            Map<PropertyDescriptor, RecordPath> paths = new HashMap<>();
+
+            for (PropertyDescriptor descriptor : RECORD_PATH_PROPERTIES) {
+                String rawRecordPath = context.getProperty(descriptor).evaluateAttributeExpressions(input).getValue();
+                RecordPath compiled = cache.getCompiled(rawRecordPath);
+                paths.put(descriptor, compiled);
+            }
+
+            RecordReader reader = readerFactory.createRecordReader(input, is, getLogger());
+            RecordSchema schema = writerFactory.getSchema(input.getAttributes(), reader.getSchema());
+            RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, os, output);
+
+            Record record;
+            Relationship targetRelationship = REL_FAILURE;
+            writer.beginRecordSet();
+
+            int recordsCount = 0;
+            boolean updated = false;
+            while ((record = reader.nextRecord()) != null) {
+                if(encode) {
+                    final int level = Integer.parseInt(context.getProperty(GEOHASH_LEVEL).getValue());
+                    final String rawLatitudePath = context.getProperty(LATITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath latitudePath = cache.getCompiled(rawLatitudePath);
+                    final String rawLongitudePath = context.getProperty(LONGITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath longitudePath = cache.getCompiled(rawLongitudePath);
+
+                    Object encodedGeohash = getEncodedGeohash(latitudePath, longitudePath, record, format, level);
+
+                    updated = updateRecord(GEOHASH_RECORD_PATH, encodedGeohash, record, paths);
+                }else {
+                    final String rawgeohashPath = context.getProperty(GEOHASH_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath geohashPath = cache.getCompiled(rawgeohashPath);
+
+                    WGS84Point decodedPoint = getDecodedPointFromGeohash(geohashPath, record, format);
+
+                    if(decodedPoint != null) {
+                        updated = updateRecord(LATITUDE_RECORD_PATH, String.valueOf(decodedPoint.getLatitude()), record, paths)
+                                &&  updateRecord(LONGITUDE_RECORD_PATH, String.valueOf(decodedPoint.getLongitude()), record, paths);
+                    }
+                }
+
+                if (updated) {
+                    targetRelationship = REL_SUCCESS;
+                }
+
+                writer.write(record);
+                recordsCount++;
+
+            }
+            writer.finishRecordSet();
+            writer.close();
+
+            is.close();
+            os.close();
+
+            output = session.putAllAttributes(output, buildAttributes(recordsCount, writer.getMimeType()));
+
+            session.transfer(output, targetRelationship);
+            session.remove(input);

Review comment:
       We should avoid removing the original FlowFile. Instead, we should create an "original" relationship and transfer the FlowFile to that Relationship. Can use
   ```
   new Relationship.Builder()
       .name("original")
       .description("The original, unenriched FlowFile will be routed to this relationship")
       .autoTerminateDefault(true)
       .build();
   ```
   in order to have it auto-terminate the 'original' relationship be default. Most of the time it's not necessary to have the default relationship but it's helpful in some use cases.

##########
File path: nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+    public static final String ENCODE_MODE = "Encode";
+    public static final String DECODE_MODE = "Decode";
+
+    public static final String BINARY_STRING = "BinaryString";
+    public static final String BASE32 = "Base32";
+    public static final String LONG_INT = "LongInt";
+
+    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ENCODE_MODE, DECODE_MODE)
+            .defaultValue(ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the record reader service to use for reading incoming data")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the record writer service to use for writing data")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("latitude-record-path")
+            .displayName("Latitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the latitude value;"
+                    + "in the decode mode, this property specifies the record path to put the latitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor LONGITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("longitude-record-path")
+            .displayName("Longitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the longitude value;"
+                    + "in the decode mode, this property specifies the record path to put the longitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_FORMAT = new PropertyDescriptor.Builder()
+            .name("geohash-format")
+            .displayName("Geohash Format")
+            .description("In the encode mode, this property specifies the desired format for encoding geohash;"
+                    + "in the decode mode, this property specifies the format of geohash provided")
+            .required(true)
+            .allowableValues(BASE32, BINARY_STRING, LONG_INT)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_LEVEL = new PropertyDescriptor.Builder()
+            .name("geohash-level")
+            .displayName("Geohash Level")
+            .description("The integer precision level(1-12) desired for encoding geohash")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dependsOn(MODE, ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("geohash-record-path")
+            .displayName("Geohash Record Path")
+            .description("In the encode mode, this property specifies the record path to put the geohash value;"
+                    + "in the decode mode, this property specifies the record path to retrieve the geohash value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Records that are successfully encoded or decoded will be routed to success")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Records that cannot be encoded or decoded will be routed to failure")
+            .build();
+
+    public static final List<PropertyDescriptor> RECORD_PATH_PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            LATITUDE_RECORD_PATH,LONGITUDE_RECORD_PATH,GEOHASH_RECORD_PATH
+    ));
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        descriptors = new ArrayList<>();
+        descriptors.add(MODE);
+        descriptors.add(RECORD_READER);
+        descriptors.add(RECORD_WRITER);
+        descriptors.add(LATITUDE_RECORD_PATH);
+        descriptors.add(LONGITUDE_RECORD_PATH);
+        descriptors.add(GEOHASH_FORMAT);
+        descriptors.add(GEOHASH_LEVEL);
+        descriptors.add(GEOHASH_RECORD_PATH);
+        descriptors = Collections.unmodifiableList(descriptors);
+
+        relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile input = session.get();
+        if ( input == null ) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory =  context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
+        final String format = context.getProperty(GEOHASH_FORMAT).getValue();
+
+        FlowFile output = session.create(input);
+        try (InputStream is = session.read(input);
+             OutputStream os = session.write(output)) {
+            RecordPathCache cache = new RecordPathCache(RECORD_PATH_PROPERTIES.size() + 1);
+            Map<PropertyDescriptor, RecordPath> paths = new HashMap<>();
+
+            for (PropertyDescriptor descriptor : RECORD_PATH_PROPERTIES) {
+                String rawRecordPath = context.getProperty(descriptor).evaluateAttributeExpressions(input).getValue();
+                RecordPath compiled = cache.getCompiled(rawRecordPath);
+                paths.put(descriptor, compiled);
+            }
+
+            RecordReader reader = readerFactory.createRecordReader(input, is, getLogger());
+            RecordSchema schema = writerFactory.getSchema(input.getAttributes(), reader.getSchema());
+            RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, os, output);
+
+            Record record;
+            Relationship targetRelationship = REL_FAILURE;
+            writer.beginRecordSet();
+
+            int recordsCount = 0;
+            boolean updated = false;
+            while ((record = reader.nextRecord()) != null) {
+                if(encode) {
+                    final int level = Integer.parseInt(context.getProperty(GEOHASH_LEVEL).getValue());
+                    final String rawLatitudePath = context.getProperty(LATITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath latitudePath = cache.getCompiled(rawLatitudePath);
+                    final String rawLongitudePath = context.getProperty(LONGITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath longitudePath = cache.getCompiled(rawLongitudePath);
+
+                    Object encodedGeohash = getEncodedGeohash(latitudePath, longitudePath, record, format, level);
+
+                    updated = updateRecord(GEOHASH_RECORD_PATH, encodedGeohash, record, paths);
+                }else {
+                    final String rawgeohashPath = context.getProperty(GEOHASH_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath geohashPath = cache.getCompiled(rawgeohashPath);
+
+                    WGS84Point decodedPoint = getDecodedPointFromGeohash(geohashPath, record, format);
+
+                    if(decodedPoint != null) {
+                        updated = updateRecord(LATITUDE_RECORD_PATH, String.valueOf(decodedPoint.getLatitude()), record, paths)
+                                &&  updateRecord(LONGITUDE_RECORD_PATH, String.valueOf(decodedPoint.getLongitude()), record, paths);
+                    }
+                }
+
+                if (updated) {
+                    targetRelationship = REL_SUCCESS;
+                }
+
+                writer.write(record);
+                recordsCount++;
+
+            }
+            writer.finishRecordSet();
+            writer.close();
+
+            is.close();
+            os.close();
+
+            output = session.putAllAttributes(output, buildAttributes(recordsCount, writer.getMimeType()));
+
+            session.transfer(output, targetRelationship);
+            session.remove(input);
+
+            session.getProvenanceReporter().modifyContent(output);
+        } catch (Exception ex) {
+            getLogger().error("Failed to {}; will route to failure", new Object[]{encode ? "encode" : "decode" ,ex});
+            session.rollback();
+            context.yield();

Review comment:
       No need to rollback or yield here. Should instead route the incoming FlowFile to 'failure'

##########
File path: nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+    public static final String ENCODE_MODE = "Encode";
+    public static final String DECODE_MODE = "Decode";
+
+    public static final String BINARY_STRING = "BinaryString";
+    public static final String BASE32 = "Base32";
+    public static final String LONG_INT = "LongInt";
+
+    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ENCODE_MODE, DECODE_MODE)
+            .defaultValue(ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the record reader service to use for reading incoming data")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the record writer service to use for writing data")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("latitude-record-path")
+            .displayName("Latitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the latitude value;"
+                    + "in the decode mode, this property specifies the record path to put the latitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor LONGITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("longitude-record-path")
+            .displayName("Longitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the longitude value;"
+                    + "in the decode mode, this property specifies the record path to put the longitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_FORMAT = new PropertyDescriptor.Builder()
+            .name("geohash-format")
+            .displayName("Geohash Format")
+            .description("In the encode mode, this property specifies the desired format for encoding geohash;"
+                    + "in the decode mode, this property specifies the format of geohash provided")
+            .required(true)
+            .allowableValues(BASE32, BINARY_STRING, LONG_INT)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_LEVEL = new PropertyDescriptor.Builder()
+            .name("geohash-level")
+            .displayName("Geohash Level")
+            .description("The integer precision level(1-12) desired for encoding geohash")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dependsOn(MODE, ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("geohash-record-path")
+            .displayName("Geohash Record Path")
+            .description("In the encode mode, this property specifies the record path to put the geohash value;"
+                    + "in the decode mode, this property specifies the record path to retrieve the geohash value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Records that are successfully encoded or decoded will be routed to success")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Records that cannot be encoded or decoded will be routed to failure")
+            .build();
+
+    public static final List<PropertyDescriptor> RECORD_PATH_PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            LATITUDE_RECORD_PATH,LONGITUDE_RECORD_PATH,GEOHASH_RECORD_PATH
+    ));
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        descriptors = new ArrayList<>();
+        descriptors.add(MODE);
+        descriptors.add(RECORD_READER);
+        descriptors.add(RECORD_WRITER);
+        descriptors.add(LATITUDE_RECORD_PATH);
+        descriptors.add(LONGITUDE_RECORD_PATH);
+        descriptors.add(GEOHASH_FORMAT);
+        descriptors.add(GEOHASH_LEVEL);
+        descriptors.add(GEOHASH_RECORD_PATH);
+        descriptors = Collections.unmodifiableList(descriptors);
+
+        relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile input = session.get();
+        if ( input == null ) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory =  context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
+        final String format = context.getProperty(GEOHASH_FORMAT).getValue();
+
+        FlowFile output = session.create(input);
+        try (InputStream is = session.read(input);
+             OutputStream os = session.write(output)) {
+            RecordPathCache cache = new RecordPathCache(RECORD_PATH_PROPERTIES.size() + 1);
+            Map<PropertyDescriptor, RecordPath> paths = new HashMap<>();
+
+            for (PropertyDescriptor descriptor : RECORD_PATH_PROPERTIES) {
+                String rawRecordPath = context.getProperty(descriptor).evaluateAttributeExpressions(input).getValue();
+                RecordPath compiled = cache.getCompiled(rawRecordPath);
+                paths.put(descriptor, compiled);
+            }
+
+            RecordReader reader = readerFactory.createRecordReader(input, is, getLogger());
+            RecordSchema schema = writerFactory.getSchema(input.getAttributes(), reader.getSchema());
+            RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, os, output);
+
+            Record record;
+            Relationship targetRelationship = REL_FAILURE;
+            writer.beginRecordSet();
+
+            int recordsCount = 0;
+            boolean updated = false;
+            while ((record = reader.nextRecord()) != null) {
+                if(encode) {
+                    final int level = Integer.parseInt(context.getProperty(GEOHASH_LEVEL).getValue());
+                    final String rawLatitudePath = context.getProperty(LATITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath latitudePath = cache.getCompiled(rawLatitudePath);
+                    final String rawLongitudePath = context.getProperty(LONGITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath longitudePath = cache.getCompiled(rawLongitudePath);
+
+                    Object encodedGeohash = getEncodedGeohash(latitudePath, longitudePath, record, format, level);
+
+                    updated = updateRecord(GEOHASH_RECORD_PATH, encodedGeohash, record, paths);
+                }else {
+                    final String rawgeohashPath = context.getProperty(GEOHASH_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath geohashPath = cache.getCompiled(rawgeohashPath);
+
+                    WGS84Point decodedPoint = getDecodedPointFromGeohash(geohashPath, record, format);
+
+                    if(decodedPoint != null) {
+                        updated = updateRecord(LATITUDE_RECORD_PATH, String.valueOf(decodedPoint.getLatitude()), record, paths)
+                                &&  updateRecord(LONGITUDE_RECORD_PATH, String.valueOf(decodedPoint.getLongitude()), record, paths);
+                    }
+                }
+
+                if (updated) {
+                    targetRelationship = REL_SUCCESS;
+                }
+
+                writer.write(record);
+                recordsCount++;
+
+            }
+            writer.finishRecordSet();

Review comment:
       This call to `finishRecordSet()` will return a `WriteResult`. It will be important to capture that `WriteResult` and call `getAttributes()` on it and include those values in the call to `buildAttributes` below. This allows the RecordWriter to provide attributes that need to go onto the FlowFile, such as the schema, etc.

##########
File path: nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+    public static final String ENCODE_MODE = "Encode";
+    public static final String DECODE_MODE = "Decode";
+
+    public static final String BINARY_STRING = "BinaryString";
+    public static final String BASE32 = "Base32";
+    public static final String LONG_INT = "LongInt";
+
+    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ENCODE_MODE, DECODE_MODE)
+            .defaultValue(ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the record reader service to use for reading incoming data")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the record writer service to use for writing data")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("latitude-record-path")
+            .displayName("Latitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the latitude value;"
+                    + "in the decode mode, this property specifies the record path to put the latitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor LONGITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("longitude-record-path")
+            .displayName("Longitude Record Path")
+            .description("In the encode mode, this property specifies the record path to retrieve the longitude value;"
+                    + "in the decode mode, this property specifies the record path to put the longitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_FORMAT = new PropertyDescriptor.Builder()
+            .name("geohash-format")
+            .displayName("Geohash Format")
+            .description("In the encode mode, this property specifies the desired format for encoding geohash;"
+                    + "in the decode mode, this property specifies the format of geohash provided")
+            .required(true)
+            .allowableValues(BASE32, BINARY_STRING, LONG_INT)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_LEVEL = new PropertyDescriptor.Builder()
+            .name("geohash-level")
+            .displayName("Geohash Level")
+            .description("The integer precision level(1-12) desired for encoding geohash")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dependsOn(MODE, ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("geohash-record-path")
+            .displayName("Geohash Record Path")
+            .description("In the encode mode, this property specifies the record path to put the geohash value;"
+                    + "in the decode mode, this property specifies the record path to retrieve the geohash value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Records that are successfully encoded or decoded will be routed to success")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Records that cannot be encoded or decoded will be routed to failure")
+            .build();
+
+    public static final List<PropertyDescriptor> RECORD_PATH_PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            LATITUDE_RECORD_PATH,LONGITUDE_RECORD_PATH,GEOHASH_RECORD_PATH
+    ));
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        descriptors = new ArrayList<>();
+        descriptors.add(MODE);
+        descriptors.add(RECORD_READER);
+        descriptors.add(RECORD_WRITER);
+        descriptors.add(LATITUDE_RECORD_PATH);
+        descriptors.add(LONGITUDE_RECORD_PATH);
+        descriptors.add(GEOHASH_FORMAT);
+        descriptors.add(GEOHASH_LEVEL);
+        descriptors.add(GEOHASH_RECORD_PATH);
+        descriptors = Collections.unmodifiableList(descriptors);
+
+        relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile input = session.get();
+        if ( input == null ) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory =  context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
+        final String format = context.getProperty(GEOHASH_FORMAT).getValue();
+
+        FlowFile output = session.create(input);
+        try (InputStream is = session.read(input);
+             OutputStream os = session.write(output)) {
+            RecordPathCache cache = new RecordPathCache(RECORD_PATH_PROPERTIES.size() + 1);
+            Map<PropertyDescriptor, RecordPath> paths = new HashMap<>();
+
+            for (PropertyDescriptor descriptor : RECORD_PATH_PROPERTIES) {
+                String rawRecordPath = context.getProperty(descriptor).evaluateAttributeExpressions(input).getValue();
+                RecordPath compiled = cache.getCompiled(rawRecordPath);
+                paths.put(descriptor, compiled);
+            }
+
+            RecordReader reader = readerFactory.createRecordReader(input, is, getLogger());
+            RecordSchema schema = writerFactory.getSchema(input.getAttributes(), reader.getSchema());
+            RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, os, output);
+
+            Record record;
+            Relationship targetRelationship = REL_FAILURE;
+            writer.beginRecordSet();
+
+            int recordsCount = 0;
+            boolean updated = false;
+            while ((record = reader.nextRecord()) != null) {
+                if(encode) {
+                    final int level = Integer.parseInt(context.getProperty(GEOHASH_LEVEL).getValue());
+                    final String rawLatitudePath = context.getProperty(LATITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath latitudePath = cache.getCompiled(rawLatitudePath);
+                    final String rawLongitudePath = context.getProperty(LONGITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath longitudePath = cache.getCompiled(rawLongitudePath);
+
+                    Object encodedGeohash = getEncodedGeohash(latitudePath, longitudePath, record, format, level);
+
+                    updated = updateRecord(GEOHASH_RECORD_PATH, encodedGeohash, record, paths);
+                }else {
+                    final String rawgeohashPath = context.getProperty(GEOHASH_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath geohashPath = cache.getCompiled(rawgeohashPath);
+
+                    WGS84Point decodedPoint = getDecodedPointFromGeohash(geohashPath, record, format);
+
+                    if(decodedPoint != null) {
+                        updated = updateRecord(LATITUDE_RECORD_PATH, String.valueOf(decodedPoint.getLatitude()), record, paths)
+                                &&  updateRecord(LONGITUDE_RECORD_PATH, String.valueOf(decodedPoint.getLongitude()), record, paths);
+                    }
+                }
+
+                if (updated) {

Review comment:
       As this is written, the destination relationship for ALL records will be either 'success' or 'failure' depending on whether or not the last Record in the FlowFile is successful. We should route the original FlowFile to 'failure' if any Record cannot be enriched, OR we should route each individual Record either to 'success' or 'failure'. Or we should simply enrich the records that we could enrich and skip others. I would lean toward providing a property to allow the user to decide. I can definitely envision a use case where users want to say "I expect all records to be enriched. Otherwise, something is wrong and I want the whole FlowFile to be failed." And I can also imagine other use cases where the user will say "Some of the Records have the latitude/longitude elements and some don't. If the elements are there, enrich them, otherwise skip them." So a property gives the user the freedom to configure it as necessary for their use case.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org