You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/11/24 14:59:59 UTC

[GitHub] [nifi] YolandaMDavis commented on a change in pull request #5476: NIFI-9259: Adding GeohashRecord Processor

YolandaMDavis commented on a change in pull request #5476:
URL: https://github.com/apache/nifi/pull/5476#discussion_r756159196



##########
File path: nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+    public static final String ENCODE_MODE = "Encode";
+    public static final String DECODE_MODE = "Decode";
+
+    public static final String BINARY_STRING = "BinaryString";
+    public static final String BASE32 = "Base32";
+    public static final String LONG_INT = "LongInt";
+
+    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ENCODE_MODE, DECODE_MODE)
+            .defaultValue(ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the record reader service to use for reading incoming data")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the record writer service to use for writing data")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    /**
+     * Record path of the latitude field. Per the description below it is the source of the
+     * latitude value in encode mode and the destination in decode mode. Expression language
+     * is evaluated against FlowFile attributes.
+     */
+    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("latitude-record-path")
+            .displayName("Latitude Record Path")
+            // Trailing space after the semicolon keeps the two concatenated clauses from running together.
+            .description("In the encode mode, this property specifies the record path to retrieve the latitude value; "
+                    + "in the decode mode, this property specifies the record path to put the latitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    /**
+     * Record path of the longitude field. Per the description below it is the source of the
+     * longitude value in encode mode and the destination in decode mode. Expression language
+     * is evaluated against FlowFile attributes.
+     */
+    public static final PropertyDescriptor LONGITUDE_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("longitude-record-path")
+            .displayName("Longitude Record Path")
+            // Trailing space after the semicolon keeps the two concatenated clauses from running together.
+            .description("In the encode mode, this property specifies the record path to retrieve the longitude value; "
+                    + "in the decode mode, this property specifies the record path to put the longitude value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    /**
+     * Geohash representation to produce (encode mode) or to expect (decode mode).
+     * Restricted to {@link #BASE32}, {@link #BINARY_STRING} and {@link #LONG_INT}.
+     */
+    public static final PropertyDescriptor GEOHASH_FORMAT = new PropertyDescriptor.Builder()
+            .name("geohash-format")
+            .displayName("Geohash Format")
+            // Trailing space after the semicolon keeps the two concatenated clauses from running together.
+            .description("In the encode mode, this property specifies the desired format for encoding geohash; "
+                    + "in the decode mode, this property specifies the format of geohash provided")
+            .required(true)
+            .allowableValues(BASE32, BINARY_STRING, LONG_INT)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /**
+     * Precision level used when encoding a geohash; only shown when {@link #MODE} is
+     * {@link #ENCODE_MODE}.
+     * <p>
+     * The value is parsed with {@code Integer.parseInt} while records are processed, so it is
+     * validated here as an integer in the documented 1-12 range. A bad value is then rejected
+     * at configuration time rather than failing on the first record.
+     */
+    public static final PropertyDescriptor GEOHASH_LEVEL = new PropertyDescriptor.Builder()
+            .name("geohash-level")
+            .displayName("Geohash Level")
+            .description("The integer precision level (1-12) desired for encoding geohash")
+            .required(true)
+            // Inclusive bounds: both 1 and 12 are valid levels.
+            .addValidator(StandardValidators.createLongValidator(1, 12, true))
+            .dependsOn(MODE, ENCODE_MODE)
+            .build();
+
+    /**
+     * Record path of the geohash field. Per the description below it is the destination of the
+     * geohash value in encode mode and the source in decode mode. Expression language is
+     * evaluated against FlowFile attributes.
+     */
+    public static final PropertyDescriptor GEOHASH_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("geohash-record-path")
+            .displayName("Geohash Record Path")
+            // Trailing space after the semicolon keeps the two concatenated clauses from running together.
+            .description("In the encode mode, this property specifies the record path to put the geohash value; "
+                    + "in the decode mode, this property specifies the record path to retrieve the geohash value")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Records that are successfully encoded or decoded will be routed to success")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Records that cannot be encoded or decoded will be routed to failure")
+            .build();
+
+    public static final List<PropertyDescriptor> RECORD_PATH_PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            LATITUDE_RECORD_PATH,LONGITUDE_RECORD_PATH,GEOHASH_RECORD_PATH
+    ));
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        descriptors = new ArrayList<>();
+        descriptors.add(MODE);
+        descriptors.add(RECORD_READER);
+        descriptors.add(RECORD_WRITER);
+        descriptors.add(LATITUDE_RECORD_PATH);
+        descriptors.add(LONGITUDE_RECORD_PATH);
+        descriptors.add(GEOHASH_FORMAT);
+        descriptors.add(GEOHASH_LEVEL);
+        descriptors.add(GEOHASH_RECORD_PATH);
+        descriptors = Collections.unmodifiableList(descriptors);
+
+        relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    /**
+     * @return the unmodifiable relationship set built in {@code init}
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /**
+     * @return the unmodifiable property-descriptor list built in {@code init}
+     */
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return this.descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile input = session.get();
+        if ( input == null ) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory =  context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
+        final String format = context.getProperty(GEOHASH_FORMAT).getValue();
+
+        FlowFile output = session.create(input);
+        try (InputStream is = session.read(input);
+             OutputStream os = session.write(output)) {
+            RecordPathCache cache = new RecordPathCache(RECORD_PATH_PROPERTIES.size() + 1);
+            Map<PropertyDescriptor, RecordPath> paths = new HashMap<>();
+
+            for (PropertyDescriptor descriptor : RECORD_PATH_PROPERTIES) {
+                String rawRecordPath = context.getProperty(descriptor).evaluateAttributeExpressions(input).getValue();
+                RecordPath compiled = cache.getCompiled(rawRecordPath);
+                paths.put(descriptor, compiled);
+            }
+
+            RecordReader reader = readerFactory.createRecordReader(input, is, getLogger());
+            RecordSchema schema = writerFactory.getSchema(input.getAttributes(), reader.getSchema());
+            RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, os, output);
+
+            Record record;
+            Relationship targetRelationship = REL_FAILURE;
+            writer.beginRecordSet();
+
+            int recordsCount = 0;
+            boolean updated = false;
+            while ((record = reader.nextRecord()) != null) {
+                if(encode) {
+                    final int level = Integer.parseInt(context.getProperty(GEOHASH_LEVEL).getValue());
+                    final String rawLatitudePath = context.getProperty(LATITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath latitudePath = cache.getCompiled(rawLatitudePath);
+                    final String rawLongitudePath = context.getProperty(LONGITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath longitudePath = cache.getCompiled(rawLongitudePath);
+
+                    Object encodedGeohash = getEncodedGeohash(latitudePath, longitudePath, record, format, level);
+
+                    updated = updateRecord(GEOHASH_RECORD_PATH, encodedGeohash, record, paths);
+                }else {
+                    final String rawgeohashPath = context.getProperty(GEOHASH_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+                    RecordPath geohashPath = cache.getCompiled(rawgeohashPath);
+
+                    WGS84Point decodedPoint = getDecodedPointFromGeohash(geohashPath, record, format);
+
+                    if(decodedPoint != null) {
+                        updated = updateRecord(LATITUDE_RECORD_PATH, String.valueOf(decodedPoint.getLatitude()), record, paths)
+                                &&  updateRecord(LONGITUDE_RECORD_PATH, String.valueOf(decodedPoint.getLongitude()), record, paths);
+                    }
+                }
+
+                if (updated) {

Review comment:
        @mikayla-yang I'd like to recommend an alternative that I think would simplify the approach here a bit while still aligning with what @markap14 described. It could be similar to what was done in GeoEnrichIPRecord: add a property that indicates whether processing should be "all or nothing" or should allow individual record processing, and, instead of the "success" and "failure" relationships, have "found", "not found", and "original" relationships.
   
   In the "all or nothing" case, if even one record did not produce a geohash, the entire flow file would be routed to the "not found" relationship; otherwise it would be routed to "found". In the individual record processing case, the records would be split between "found" and "not found" as needed. Either way, the "original" relationship would always receive the original payload, no matter which option is selected.
   
   What do you think of this? @markap14 does this align with the scenario you described? @mikayla-yang would this be something you could migrate to if @markap14 agrees it's a fit?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org