Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/06/01 16:30:32 UTC

[GitHub] [nifi] alopresto commented on a change in pull request #3984: NIFI-6970 add DistributeRecord processor

alopresto commented on a change in pull request #3984:
URL: https://github.com/apache/nifi/pull/3984#discussion_r433348987



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeHashRecord.java
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import com.sangupta.murmur.Murmur2;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.ProcessSession;
+
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Comparator;
+
+
+@SideEffectFree
+@Tags({"route", "distribute", "weighted", "record"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Distributes records across user-specified relationships based on one or more distribution keys." +
+        " Records are distributed in proportion to each relationship's weight. For example, if there are " +
+        "two relationships and the first has a weight of 9 while the second has a weight of 10, the first will receive 9/19" +
+        " of the records and the second will receive 10/19. " +
+        "To select the relationship for a record, the values of the specified keys are extracted from the record as strings," +
+        " joined with a `-` delimiter, and hashed; the hash value is then divided " +
+        "by the total weight of the relationships and the remainder is kept. If a single integer key is specified, no hash is calculated " +
+        "and the processor takes the remainder of dividing the key value itself by the total weight of the relationships." +
+        " The record is sent to the relationship" +
+        " whose half-open interval of remainders [prev_weights, prev_weights + weight) contains this remainder, where" +
+        " 'prev_weights' is the total weight of the relationships that precede it, and 'weight' is the weight of this relationship." +
+        " In the example above," +
+        " a record is sent to the first relationship for remainders in the range [0, 9), and to the second for remainders in the range [9, 19).")
+@DynamicProperty(name = "The name of the relationship to route data to",
+        value = "Weight for this relationship.",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Each user-defined property specifies a relationship and its weight.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"),
+        @WritesAttribute(attribute = "record.count", description = "The number of records written to the outgoing FlowFile")
+})
+public class DistributeHashRecord extends AbstractProcessor {
+
+    public static final String MURMURHASH_32 = "murmurhash_32";
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("reader")
+            .displayName("Record Reader")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("writer")
+            .displayName("Record Writer")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(true)
+            .description("Specifies the Controller Service to use for writing out the records")
+            .build();
+
+    private static final String KEY_DELIMITER = ",";
+
+    public static final PropertyDescriptor KEYS = new PropertyDescriptor.Builder()
+            .name("keys")
+            .displayName("Keys")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .description("Field names in a record, separated by commas." +
+                    " If a single key is specified and its value is an int or long, the hash function " +
+                    "is not evaluated and the processor distributes the record by this numerical value. " +
+                    "If several keys are specified, or a single key whose type is not 'int' or 'long', the processor will " +
+                    "extract the key values from the record, trim them, and join them with a `-` delimiter, as in <firstKey>-<secondKey>-<...>, " +
+                    "then evaluate the hash function, which returns the numerical value used for distribution.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor HASH_FUNCTION = new PropertyDescriptor.Builder()
+            .name("hash function")
+            .displayName("Hash Function")
+            .required(true)
+            .description("Hash algorithm for keys hashing")
+            .allowableValues(MURMURHASH_32)

Review comment:
       I'm curious what this hash function is, and why there is a configurable property for it if there is only one allowable value. 
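On the question of what the hash function is: the `com.sangupta.murmur.Murmur2` import suggests that `murmurhash_32` refers to the 32-bit MurmurHash2 algorithm, a fast non-cryptographic hash. As a rough sketch only, here is a standalone Java version of the standard MurmurHash2 algorithm. It does not use or reproduce the sangupta library's API; the `Murmur2Sketch` class, the `murmur2` method, and the seed parameter are illustrative assumptions (the seed the PR actually uses is not visible in this hunk).

```java
import java.nio.charset.StandardCharsets;

class Murmur2Sketch {

    // Standard 32-bit MurmurHash2. The seed is a caller-chosen assumption,
    // not necessarily the value used by the processor under review.
    static int murmur2(byte[] data, int seed) {
        final int m = 0x5bd1e995;
        final int r = 24;
        int length = data.length;
        int h = seed ^ length;
        int i = 0;
        // Mix 4 bytes at a time into the hash (little-endian reads).
        while (length - i >= 4) {
            int k = (data[i] & 0xff)
                    | ((data[i + 1] & 0xff) << 8)
                    | ((data[i + 2] & 0xff) << 16)
                    | ((data[i + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
            i += 4;
        }
        // Mix in the remaining 1-3 bytes (intentional fall-through).
        switch (length - i) {
            case 3:
                h ^= (data[i + 2] & 0xff) << 16;
            case 2:
                h ^= (data[i + 1] & 0xff) << 8;
            case 1:
                h ^= data[i] & 0xff;
                h *= m;
            default:
                break;
        }
        // Final avalanche so the last bytes affect all output bits.
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    public static void main(String[] args) {
        byte[] key = "eu-gold".getBytes(StandardCharsets.UTF_8);
        System.out.println(murmur2(key, 0));
    }
}
```

With a single supported algorithm, the property mainly reserves room for future options; whether that justifies exposing it now is the open question above.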


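A minimal sketch of how the `Keys` property description could translate into code: split the property on the `,` delimiter, use a single `int`/`long` key value directly, and otherwise trim the key values and join them with `-` before hashing. It assumes a `Map<String, Object>` as a stand-in for a NiFi `Record`; the class and helper names (`DistributionKeySketch`, `parseKeys`, `joinKeyValues`, `distributionValue`) are hypothetical, and `String::hashCode` is only a placeholder for `murmurhash_32`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;

class DistributionKeySketch {

    // Splits the "Keys" property on the comma delimiter and trims each field name.
    static List<String> parseKeys(String keysProperty) {
        return Arrays.stream(keysProperty.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    // Trims each key value and joins them as <firstKey>-<secondKey>-<...>.
    // A missing field becomes the string "null" in this sketch.
    static String joinKeyValues(Map<String, Object> record, List<String> keys) {
        return keys.stream()
                .map(k -> String.valueOf(record.get(k)).trim())
                .collect(Collectors.joining("-"));
    }

    // A single int/long key is used as-is; anything else is joined and hashed.
    static long distributionValue(Map<String, Object> record, List<String> keys, ToIntFunction<String> hash) {
        if (keys.size() == 1) {
            Object value = record.get(keys.get(0));
            if (value instanceof Integer || value instanceof Long) {
                return ((Number) value).longValue();
            }
        }
        return hash.applyAsInt(joinKeyValues(record, keys));
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("id", 42L);
        record.put("region", " eu ");
        record.put("tier", "gold");

        // String::hashCode stands in for the processor's murmurhash_32.
        System.out.println(distributionValue(record, parseKeys("id"), String::hashCode)); // 42
        System.out.println(joinKeyValues(record, parseKeys("region, tier")));             // eu-gold
        System.out.println(distributionValue(record, parseKeys("region, tier"), String::hashCode));
    }
}
```

How the processor really treats missing or null key fields is not shown in this hunk, so the `"null"` fallback here is purely an assumption.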


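The weighted routing rule in the capability description can be sketched as follows, assuming relationships are kept in a `LinkedHashMap` whose iteration order defines which relationships "precede" others (how the PR actually orders them is not shown here). The `WeightedRelationshipSketch` class and its `select` method are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class WeightedRelationshipSketch {

    // Returns the relationship whose half-open interval
    // [prevWeights, prevWeights + weight) contains value mod totalWeight.
    static String select(LinkedHashMap<String, Integer> weights, long value) {
        long total = 0;
        for (int w : weights.values()) {
            total += w;
        }
        if (total <= 0) {
            throw new IllegalArgumentException("total weight must be positive");
        }
        // floorMod keeps the remainder in [0, total) even for negative hash values.
        long remainder = Math.floorMod(value, total);
        long prevWeights = 0;
        for (Map.Entry<String, Integer> e : weights.entrySet()) {
            if (remainder < prevWeights + e.getValue()) {
                return e.getKey();
            }
            prevWeights += e.getValue();
        }
        throw new IllegalStateException("unreachable: remainder is always below the total weight");
    }

    public static void main(String[] args) {
        LinkedHashMap<String, Integer> weights = new LinkedHashMap<>();
        weights.put("first", 9);
        weights.put("second", 10);
        System.out.println(select(weights, 8));  // first  (8 is in [0, 9))
        System.out.println(select(weights, 9));  // second (9 is in [9, 19))
        System.out.println(select(weights, 27)); // first  (27 mod 19 = 8)
        System.out.println(select(weights, -1)); // second (floorMod(-1, 19) = 18)
    }
}
```

`Math.floorMod` is used instead of `%` because a 32-bit hash can be negative, and Java's `%` would then produce a negative remainder that falls outside every interval.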