You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by abhinavrohatgi30 <gi...@git.apache.org> on 2018/03/17 15:57:09 UTC

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

GitHub user abhinavrohatgi30 opened a pull request:

    https://github.com/apache/nifi/pull/2561

    NIFI-4035 Implement record-based Solr processors

    Thank you for submitting a contribution to Apache NiFi.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
         in the commit message?
    
    - [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [ ] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [ ] Have you written or updated unit tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
    - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/abhinavrohatgi30/nifi nifi-4035

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2561.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2561
    
----
commit 4532645294a225b5b3cde6cf59254a3a38ca15f6
Author: abhinavrohatgi30 <ab...@...>
Date:   2018-03-17T15:35:06Z

    Adding PutSolrRecord Processor that reads NiFi records and indexes them into Solr as SolrDocuments

commit 313a95ef59f5fff31c6bd9a032bc4d82de7df2f9
Author: abhinavrohatgi30 <ab...@...>
Date:   2018-03-17T15:36:04Z

    Adding Test Cases for PutSolrRecord Processor

commit 76003a1b1ef5449ee3cbd51b244dc27e946b5ea3
Author: abhinavrohatgi30 <ab...@...>
Date:   2018-03-17T15:36:58Z

    Adding PutSolrRecord Processor in the list of Processors

----


---

[GitHub] nifi issue #2561: NIFI-4035 Implement record-based Solr processors

Posted by abhinavrohatgi30 <gi...@git.apache.org>.
Github user abhinavrohatgi30 commented on the issue:

    https://github.com/apache/nifi/pull/2561
  
    @bbende  @MikeThomsen  Thanks for reviewing the pull request


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by abhinavrohatgi30 <gi...@git.apache.org>.
Github user abhinavrohatgi30 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r175840062
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java ---
    @@ -280,5 +291,115 @@ public SolrInputDocument toSolrInputDocument(SolrDocument d) {
             }
         }
     
    +    /**
    +     * Writes each Record as a SolrInputDocument.
    +     */
    +    public static void writeRecord(final Record record, final RecordSchema writeSchema, final SolrInputDocument inputDocument,final List<String> fieldsToIndex)
    +            throws IOException {
    +        RecordSchema schema = record.getSchema();
    +
    +        for (int i = 0; i < schema.getFieldCount(); i++) {
    +            final RecordField field = schema.getField(i);
    +            final String fieldName = field.getFieldName();
    +            final Object value = record.getValue(field);
    +            if (value == null || (!fieldsToIndex.isEmpty() && !fieldsToIndex.contains(fieldName))) {
    +                continue;
    +            }else {
    +                final DataType dataType = schema.getDataType(fieldName).get();
    +                writeValue(inputDocument, value, fieldName, dataType,fieldsToIndex);
    +            }
    +        }
    +    }
     
    +    private static void writeValue(final SolrInputDocument inputDocument, final Object value, final String fieldName, final DataType dataType,final List<String> fieldsToIndex) throws IOException {
    +        final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType;
    +        final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, fieldName);
    +        if (coercedValue == null) {
    +            return;
    +        }
    +
    +        switch (chosenDataType.getFieldType()) {
    +            case DATE: {
    +                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDate localDate = getLocalDateFromEpochTime(fieldName, coercedValue);
    +                    inputDocument.addField(fieldName,localDate.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                } else {
    +                    inputDocument.addField(fieldName,LocalDate.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                }
    +                break;
    +            }
    +            case TIMESTAMP: {
    +                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDateTime localDateTime = getLocalDateTimeFromEpochTime(fieldName, coercedValue);
    +                    inputDocument.addField(fieldName,localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                } else {
    +                    inputDocument.addField(fieldName,LocalDateTime.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                }
    +                break;
    +            }
    +            case DOUBLE:
    +                inputDocument.addField(fieldName,DataTypeUtils.toDouble(coercedValue, fieldName));
    +                break;
    +            case FLOAT:
    +                inputDocument.addField(fieldName,DataTypeUtils.toFloat(coercedValue, fieldName));
    +                break;
    +            case LONG:
    +                inputDocument.addField(fieldName,DataTypeUtils.toLong(coercedValue, fieldName));
    +                break;
    +            case INT:
    +            case BYTE:
    +            case SHORT:
    +                inputDocument.addField(fieldName,DataTypeUtils.toInteger(coercedValue, fieldName));
    +                break;
    +            case CHAR:
    +            case STRING:
    +                inputDocument.addField(fieldName,coercedValue.toString());
    +                break;
    +            case BIGINT:
    +                if (coercedValue instanceof Long) {
    +                    inputDocument.addField(fieldName,(Long) coercedValue);
    +                } else {
    +                    inputDocument.addField(fieldName,(BigInteger) coercedValue);
    +                }
    +                break;
    +            case BOOLEAN:
    +                final String stringValue = coercedValue.toString();
    +                if ("true".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName,true);
    +                } else if ("false".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName,false);
    +                } else {
    +                    inputDocument.addField(fieldName,stringValue);
    +                }
    +                break;
    +            case RECORD: {
    --- End diff --
    
    Another approach that can be taken is to consider the nested records as child documents. Which would mean that every nested record in the array is a child document for the main record in solr 


---

[GitHub] nifi issue #2561: NIFI-4035 Implement record-based Solr processors

Posted by abhinavrohatgi30 <gi...@git.apache.org>.
Github user abhinavrohatgi30 commented on the issue:

    https://github.com/apache/nifi/pull/2561
  
    @bbende I'll have a look at this and write test cases accordingly.


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r175810746
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java ---
    @@ -280,5 +291,115 @@ public SolrInputDocument toSolrInputDocument(SolrDocument d) {
             }
         }
     
    +    /**
    +     * Writes each Record as a SolrInputDocument.
    +     */
    +    public static void writeRecord(final Record record, final RecordSchema writeSchema, final SolrInputDocument inputDocument,final List<String> fieldsToIndex)
    +            throws IOException {
    +        RecordSchema schema = record.getSchema();
    +
    +        for (int i = 0; i < schema.getFieldCount(); i++) {
    +            final RecordField field = schema.getField(i);
    +            final String fieldName = field.getFieldName();
    +            final Object value = record.getValue(field);
    +            if (value == null || (!fieldsToIndex.isEmpty() && !fieldsToIndex.contains(fieldName))) {
    +                continue;
    +            }else {
    +                final DataType dataType = schema.getDataType(fieldName).get();
    +                writeValue(inputDocument, value, fieldName, dataType,fieldsToIndex);
    +            }
    +        }
    +    }
     
    +    private static void writeValue(final SolrInputDocument inputDocument, final Object value, final String fieldName, final DataType dataType,final List<String> fieldsToIndex) throws IOException {
    +        final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType;
    +        final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, fieldName);
    +        if (coercedValue == null) {
    +            return;
    +        }
    +
    +        switch (chosenDataType.getFieldType()) {
    +            case DATE: {
    +                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDate localDate = getLocalDateFromEpochTime(fieldName, coercedValue);
    +                    inputDocument.addField(fieldName,localDate.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                } else {
    +                    inputDocument.addField(fieldName,LocalDate.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                }
    +                break;
    +            }
    +            case TIMESTAMP: {
    +                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDateTime localDateTime = getLocalDateTimeFromEpochTime(fieldName, coercedValue);
    +                    inputDocument.addField(fieldName,localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                } else {
    +                    inputDocument.addField(fieldName,LocalDateTime.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                }
    +                break;
    +            }
    +            case DOUBLE:
    +                inputDocument.addField(fieldName,DataTypeUtils.toDouble(coercedValue, fieldName));
    +                break;
    +            case FLOAT:
    +                inputDocument.addField(fieldName,DataTypeUtils.toFloat(coercedValue, fieldName));
    +                break;
    +            case LONG:
    +                inputDocument.addField(fieldName,DataTypeUtils.toLong(coercedValue, fieldName));
    +                break;
    +            case INT:
    +            case BYTE:
    +            case SHORT:
    +                inputDocument.addField(fieldName,DataTypeUtils.toInteger(coercedValue, fieldName));
    +                break;
    +            case CHAR:
    +            case STRING:
    +                inputDocument.addField(fieldName,coercedValue.toString());
    +                break;
    +            case BIGINT:
    +                if (coercedValue instanceof Long) {
    +                    inputDocument.addField(fieldName,(Long) coercedValue);
    +                } else {
    +                    inputDocument.addField(fieldName,(BigInteger) coercedValue);
    +                }
    +                break;
    +            case BOOLEAN:
    +                final String stringValue = coercedValue.toString();
    +                if ("true".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName,true);
    +                } else if ("false".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName,false);
    +                } else {
    +                    inputDocument.addField(fieldName,stringValue);
    +                }
    +                break;
    +            case RECORD: {
    --- End diff --
    
    This also brings up another scenario... what do we do if there is an array field, and the type of the elements in the array is a record?
    
    That would be similar to the "exams" array in the above example. With Solr's JSON update handler you would have to say split=/exams and this produces a Solr document for each exam. 
    
    I'm actually not sure what Solr does if you left off the split param because then you would have multiple fields with the same name in the same document, like exams.subject would be there twice.


---

[GitHub] nifi issue #2561: NIFI-4035 Implement record-based Solr processors

Posted by abhinavrohatgi30 <gi...@git.apache.org>.
Github user abhinavrohatgi30 commented on the issue:

    https://github.com/apache/nifi/pull/2561
  
    I'm really sorry, it might take a while, I'm on a vacation and away from my workstation. I'll keep you updated as soon as I am back.



---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r175746168
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/util/MockRecordParser.java ---
    @@ -0,0 +1,105 @@
    +/*
    --- End diff --
    
    See `TestPutHBaseRecord` for an example of how to use `org.apache.nifi.serialization.record.MockRecordParser` which should be able to replace this class.


---

[GitHub] nifi issue #2561: NIFI-4035 Implement record-based Solr processors

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2561
  
    @abhinavrohatgi30 While you were away, I merged another Solr-related commit and that's the reason you now have conflicts.


---

[GitHub] nifi issue #2561: NIFI-4035 Implement record-based Solr processors

Posted by abhinavrohatgi30 <gi...@git.apache.org>.
Github user abhinavrohatgi30 commented on the issue:

    https://github.com/apache/nifi/pull/2561
  
    Hi, I've looked at the comments and I've made the following changes as part of the latest commit that cover all the comments :
    
    1. Fixed the issue with Nested Records (The issue came up because of the change in field names in the previous commit)
    
    2. Fixed the issue with Array of Records (It was generating an Object[] as opposed to a Record[] that I was expecting and as a result was storing the string representation of a Record)
    
    3. Trimming field names individually
    
    4. Adding Test cases for Nested Record, Array of Record and Record Parser failure
    
    5. Using the getLogger() later in the code
    
    6. Wrapping the Jsons in the additionalDetails.html in a <pre> tag
      
    I hope the processor now works as expected, let me know if any further changes are to be made 
    
    Thanks


---

[GitHub] nifi issue #2561: NIFI-4035 Implement record-based Solr processors

Posted by abhinavrohatgi30 <gi...@git.apache.org>.
Github user abhinavrohatgi30 commented on the issue:

    https://github.com/apache/nifi/pull/2561
  
    I'm done with the changes that @bbende  and @MikeThomsen have suggested


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r175760215
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java ---
    @@ -0,0 +1,351 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.nifi.processors.solr;
    +
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.util.StopWatch;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.solr.client.solrj.SolrServerException;
    +import org.apache.solr.client.solrj.request.UpdateRequest;
    +import org.apache.solr.client.solrj.response.UpdateResponse;
    +import org.apache.solr.common.SolrException;
    +import org.apache.solr.common.SolrInputDocument;
    +import org.apache.solr.common.params.ModifiableSolrParams;
    +import org.apache.solr.common.params.MultiMapSolrParams;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.SortedMap;
    +import java.util.TreeMap;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
    +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
    +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.writeRecord;
    +
    +
    +@Tags({"Apache", "Solr", "Put", "Send","Record"})
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Indexes the Records from a FlowFile into Solr")
    +@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value",
    +        description="These parameters will be passed to Solr on the request")
    +public class PutSolrRecord extends SolrProcessor {
    +
    +    public static final PropertyDescriptor UPDATE_PATH = new PropertyDescriptor
    +            .Builder().name("Solr Update Path")
    +            .description("The path in Solr to post the Flowfile Records")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("/update")
    +            .build();
    +
    +    public static final PropertyDescriptor FIELDS_TO_INDEX  = new PropertyDescriptor
    +            .Builder().name("Fields To Index")
    +            .displayName("Fields To Index")
    +            .description("Comma-separated list of field names to write")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor COMMIT_WITHIN = new PropertyDescriptor
    +            .Builder().name("Commit Within")
    +            .description("The number of milliseconds before the given update is committed")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("5000")
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("The original FlowFile")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("FlowFiles that failed for any reason other than Solr being unreachable")
    +            .build();
    +
    +    public static final Relationship REL_CONNECTION_FAILURE = new Relationship.Builder()
    --- End diff --
    
    The other Solr processors have a connection failure relationship and it makes it easier to route connection failure back to self, and then send other failure relationship somewhere else because the other failures are likely permanent failures that will never work


---

[GitHub] nifi issue #2561: NIFI-4035 Implement record-based Solr processors

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on the issue:

    https://github.com/apache/nifi/pull/2561
  
    I was able to resolve the conflicts and everything looks good now, going to merge, thanks!


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r176411279
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java ---
    @@ -0,0 +1,351 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.nifi.processors.solr;
    +
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.util.StopWatch;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.solr.client.solrj.SolrServerException;
    +import org.apache.solr.client.solrj.request.UpdateRequest;
    +import org.apache.solr.client.solrj.response.UpdateResponse;
    +import org.apache.solr.common.SolrException;
    +import org.apache.solr.common.SolrInputDocument;
    +import org.apache.solr.common.params.ModifiableSolrParams;
    +import org.apache.solr.common.params.MultiMapSolrParams;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.SortedMap;
    +import java.util.TreeMap;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
    +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
    +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.writeRecord;
    +
    +
    +@Tags({"Apache", "Solr", "Put", "Send","Record"})
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Indexes the Records from a FlowFile into Solr")
    +@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value",
    +        description="These parameters will be passed to Solr on the request")
    +public class PutSolrRecord extends SolrProcessor {
    +
    +    public static final PropertyDescriptor UPDATE_PATH = new PropertyDescriptor
    +            .Builder().name("Solr Update Path")
    +            .description("The path in Solr to post the Flowfile Records")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("/update")
    +            .build();
    +
    +    public static final PropertyDescriptor FIELDS_TO_INDEX  = new PropertyDescriptor
    +            .Builder().name("Fields To Index")
    +            .displayName("Fields To Index")
    +            .description("Comma-separated list of field names to write")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor COMMIT_WITHIN = new PropertyDescriptor
    +            .Builder().name("Commit Within")
    +            .description("The number of milliseconds before the given update is committed")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("5000")
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("The original FlowFile")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("FlowFiles that failed for any reason other than Solr being unreachable")
    +            .build();
    +
    +    public static final Relationship REL_CONNECTION_FAILURE = new Relationship.Builder()
    +            .name("connection_failure")
    +            .description("FlowFiles that failed because Solr is unreachable")
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +            .name("put-solr-record-record-reader")
    +            .displayName("Record Reader")
    +            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +    public static final String COLLECTION_PARAM_NAME = "collection";
    +    public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin";
    +    public static final String REPEATING_PARAM_PATTERN = "\\w+\\.\\d+";
    +    public  final ComponentLog logger = getLogger();
    +
    +    private Set<Relationship> relationships;
    +    private List<PropertyDescriptor> descriptors;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        super.init(context);
    +
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(SOLR_TYPE);
    +        descriptors.add(SOLR_LOCATION);
    +        descriptors.add(COLLECTION);
    +        descriptors.add(UPDATE_PATH);
    +        descriptors.add(RECORD_READER);
    +        descriptors.add(FIELDS_TO_INDEX);
    +        descriptors.add(COMMIT_WITHIN);
    +        descriptors.add(JAAS_CLIENT_APP_NAME);
    +        descriptors.add(BASIC_USERNAME);
    +        descriptors.add(BASIC_PASSWORD);
    +        descriptors.add(SSL_CONTEXT_SERVICE);
    +        descriptors.add(SOLR_SOCKET_TIMEOUT);
    +        descriptors.add(SOLR_CONNECTION_TIMEOUT);
    +        descriptors.add(SOLR_MAX_CONNECTIONS);
    +        descriptors.add(SOLR_MAX_CONNECTIONS_PER_HOST);
    +        descriptors.add(ZK_CLIENT_TIMEOUT);
    +        descriptors.add(ZK_CONNECTION_TIMEOUT);
    +        this.descriptors = Collections.unmodifiableList(descriptors);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        relationships.add(REL_CONNECTION_FAILURE);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return this.relationships;
    +    }
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return this.descriptors;
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .description("Specifies the value to send for the '" + propertyDescriptorName + "' request parameter")
    +                .name(propertyDescriptorName)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .dynamic(true)
    +                .expressionLanguageSupported(true)
    +                .build();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if ( flowFile == null ) {
    +            return;
    +        }
    +
    +        final AtomicReference<Exception> error = new AtomicReference<>(null);
    +        final AtomicReference<Exception> connectionError = new AtomicReference<>(null);
    +
    +        final boolean isSolrCloud = SOLR_TYPE_CLOUD.getValue().equals(context.getProperty(SOLR_TYPE).getValue());
    +        final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFile).getValue();
    +        final Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong();
    +        final String contentStreamPath = context.getProperty(UPDATE_PATH).evaluateAttributeExpressions(flowFile).getValue();
    +        final MultiMapSolrParams requestParams = new MultiMapSolrParams(getRequestParams(context, flowFile));
    +        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +        final String fieldsToIndex = context.getProperty(FIELDS_TO_INDEX).getValue();
    +
    +        final List<String> fieldList = new ArrayList<>();
    +        if (!StringUtils.isBlank(fieldsToIndex)) {
    +            fieldList.addAll(Arrays.asList(fieldsToIndex.trim().split("[,]")));
    +        }
    +        StopWatch timer = new StopWatch(true);
    +        try (final InputStream in = session.read(flowFile);
    +             final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
    +
    +            Record record;
    +            List<SolrInputDocument> inputDocumentList = new LinkedList<>();
    --- End diff --
    
    We could do more complicated things like try to keep inserting records and write out all the failed records to a new flow file, but in my experience this usually becomes complex and error prone.
    
    In this case there should be very scenarios where specific records are failing because in order to be read by the record reader they already have to conform to the schema. So every SolrDocument being created will already have the same field names with the same types of values. If there is chance of invalid data that doesn't conform to the schema, then this can be handled before this processor using ValidateRecord.
    
    So long story short, I think just keep it simple, and if anything fails for a reason other than connection exception, than just route the whole flow file to failure.


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by abhinavrohatgi30 <gi...@git.apache.org>.
Github user abhinavrohatgi30 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r175799302
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java ---
    @@ -0,0 +1,351 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.nifi.processors.solr;
    +
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.util.StopWatch;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.solr.client.solrj.SolrServerException;
    +import org.apache.solr.client.solrj.request.UpdateRequest;
    +import org.apache.solr.client.solrj.response.UpdateResponse;
    +import org.apache.solr.common.SolrException;
    +import org.apache.solr.common.SolrInputDocument;
    +import org.apache.solr.common.params.ModifiableSolrParams;
    +import org.apache.solr.common.params.MultiMapSolrParams;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.SortedMap;
    +import java.util.TreeMap;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
    +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
    +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.writeRecord;
    +
    +
    +@Tags({"Apache", "Solr", "Put", "Send","Record"})
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Indexes the Records from a FlowFile into Solr")
    +@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value",
    +        description="These parameters will be passed to Solr on the request")
    +public class PutSolrRecord extends SolrProcessor {
    +
    +    public static final PropertyDescriptor UPDATE_PATH = new PropertyDescriptor
    +            .Builder().name("Solr Update Path")
    +            .description("The path in Solr to post the Flowfile Records")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("/update")
    +            .build();
    +
    +    public static final PropertyDescriptor FIELDS_TO_INDEX  = new PropertyDescriptor
    +            .Builder().name("Fields To Index")
    +            .displayName("Fields To Index")
    +            .description("Comma-separated list of field names to write")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor COMMIT_WITHIN = new PropertyDescriptor
    +            .Builder().name("Commit Within")
    +            .description("The number of milliseconds before the given update is committed")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("5000")
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("The original FlowFile")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("FlowFiles that failed for any reason other than Solr being unreachable")
    +            .build();
    +
    +    public static final Relationship REL_CONNECTION_FAILURE = new Relationship.Builder()
    +            .name("connection_failure")
    +            .description("FlowFiles that failed because Solr is unreachable")
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +            .name("put-solr-record-record-reader")
    +            .displayName("Record Reader")
    +            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +    public static final String COLLECTION_PARAM_NAME = "collection";
    +    public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin";
    +    public static final String REPEATING_PARAM_PATTERN = "\\w+\\.\\d+";
    +    public  final ComponentLog logger = getLogger();
    +
    +    private Set<Relationship> relationships;
    +    private List<PropertyDescriptor> descriptors;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        super.init(context);
    +
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(SOLR_TYPE);
    +        descriptors.add(SOLR_LOCATION);
    +        descriptors.add(COLLECTION);
    +        descriptors.add(UPDATE_PATH);
    +        descriptors.add(RECORD_READER);
    +        descriptors.add(FIELDS_TO_INDEX);
    +        descriptors.add(COMMIT_WITHIN);
    +        descriptors.add(JAAS_CLIENT_APP_NAME);
    +        descriptors.add(BASIC_USERNAME);
    +        descriptors.add(BASIC_PASSWORD);
    +        descriptors.add(SSL_CONTEXT_SERVICE);
    +        descriptors.add(SOLR_SOCKET_TIMEOUT);
    +        descriptors.add(SOLR_CONNECTION_TIMEOUT);
    +        descriptors.add(SOLR_MAX_CONNECTIONS);
    +        descriptors.add(SOLR_MAX_CONNECTIONS_PER_HOST);
    +        descriptors.add(ZK_CLIENT_TIMEOUT);
    +        descriptors.add(ZK_CONNECTION_TIMEOUT);
    +        this.descriptors = Collections.unmodifiableList(descriptors);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        relationships.add(REL_CONNECTION_FAILURE);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return this.relationships;
    +    }
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return this.descriptors;
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .description("Specifies the value to send for the '" + propertyDescriptorName + "' request parameter")
    +                .name(propertyDescriptorName)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .dynamic(true)
    +                .expressionLanguageSupported(true)
    +                .build();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if ( flowFile == null ) {
    +            return;
    +        }
    +
    +        final AtomicReference<Exception> error = new AtomicReference<>(null);
    +        final AtomicReference<Exception> connectionError = new AtomicReference<>(null);
    +
    +        final boolean isSolrCloud = SOLR_TYPE_CLOUD.getValue().equals(context.getProperty(SOLR_TYPE).getValue());
    +        final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFile).getValue();
    +        final Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong();
    +        final String contentStreamPath = context.getProperty(UPDATE_PATH).evaluateAttributeExpressions(flowFile).getValue();
    +        final MultiMapSolrParams requestParams = new MultiMapSolrParams(getRequestParams(context, flowFile));
    +        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +        final String fieldsToIndex = context.getProperty(FIELDS_TO_INDEX).getValue();
    +
    +        final List<String> fieldList = new ArrayList<>();
    +        if (!StringUtils.isBlank(fieldsToIndex)) {
    +            fieldList.addAll(Arrays.asList(fieldsToIndex.trim().split("[,]")));
    +        }
    +        StopWatch timer = new StopWatch(true);
    +        try (final InputStream in = session.read(flowFile);
    +             final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
    +
    +            Record record;
    +            List<SolrInputDocument> inputDocumentList = new LinkedList<>();
    --- End diff --
    
    Sure, I'll add a parameter to accept a batch size and modify the processor to index solr documents in batches.


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r175863187
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java ---
    @@ -0,0 +1,351 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.nifi.processors.solr;
    +
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.util.StopWatch;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.solr.client.solrj.SolrServerException;
    +import org.apache.solr.client.solrj.request.UpdateRequest;
    +import org.apache.solr.client.solrj.response.UpdateResponse;
    +import org.apache.solr.common.SolrException;
    +import org.apache.solr.common.SolrInputDocument;
    +import org.apache.solr.common.params.ModifiableSolrParams;
    +import org.apache.solr.common.params.MultiMapSolrParams;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.SortedMap;
    +import java.util.TreeMap;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
    +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
    +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.writeRecord;
    +
    +
    +@Tags({"Apache", "Solr", "Put", "Send","Record"})
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Indexes the Records from a FlowFile into Solr")
    +@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value",
    +        description="These parameters will be passed to Solr on the request")
    +public class PutSolrRecord extends SolrProcessor {
    +
    +    public static final PropertyDescriptor UPDATE_PATH = new PropertyDescriptor
    +            .Builder().name("Solr Update Path")
    +            .description("The path in Solr to post the Flowfile Records")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("/update")
    +            .build();
    +
    +    public static final PropertyDescriptor FIELDS_TO_INDEX  = new PropertyDescriptor
    +            .Builder().name("Fields To Index")
    +            .displayName("Fields To Index")
    +            .description("Comma-separated list of field names to write")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor COMMIT_WITHIN = new PropertyDescriptor
    +            .Builder().name("Commit Within")
    +            .description("The number of milliseconds before the given update is committed")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("5000")
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("The original FlowFile")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("FlowFiles that failed for any reason other than Solr being unreachable")
    +            .build();
    +
    +    public static final Relationship REL_CONNECTION_FAILURE = new Relationship.Builder()
    +            .name("connection_failure")
    +            .description("FlowFiles that failed because Solr is unreachable")
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +            .name("put-solr-record-record-reader")
    +            .displayName("Record Reader")
    +            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +    public static final String COLLECTION_PARAM_NAME = "collection";
    +    public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin";
    +    public static final String REPEATING_PARAM_PATTERN = "\\w+\\.\\d+";
    +    public  final ComponentLog logger = getLogger();
    +
    +    private Set<Relationship> relationships;
    +    private List<PropertyDescriptor> descriptors;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        super.init(context);
    +
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(SOLR_TYPE);
    +        descriptors.add(SOLR_LOCATION);
    +        descriptors.add(COLLECTION);
    +        descriptors.add(UPDATE_PATH);
    +        descriptors.add(RECORD_READER);
    +        descriptors.add(FIELDS_TO_INDEX);
    +        descriptors.add(COMMIT_WITHIN);
    +        descriptors.add(JAAS_CLIENT_APP_NAME);
    +        descriptors.add(BASIC_USERNAME);
    +        descriptors.add(BASIC_PASSWORD);
    +        descriptors.add(SSL_CONTEXT_SERVICE);
    +        descriptors.add(SOLR_SOCKET_TIMEOUT);
    +        descriptors.add(SOLR_CONNECTION_TIMEOUT);
    +        descriptors.add(SOLR_MAX_CONNECTIONS);
    +        descriptors.add(SOLR_MAX_CONNECTIONS_PER_HOST);
    +        descriptors.add(ZK_CLIENT_TIMEOUT);
    +        descriptors.add(ZK_CONNECTION_TIMEOUT);
    +        this.descriptors = Collections.unmodifiableList(descriptors);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        relationships.add(REL_CONNECTION_FAILURE);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return this.relationships;
    +    }
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return this.descriptors;
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .description("Specifies the value to send for the '" + propertyDescriptorName + "' request parameter")
    +                .name(propertyDescriptorName)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .dynamic(true)
    +                .expressionLanguageSupported(true)
    +                .build();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if ( flowFile == null ) {
    +            return;
    +        }
    +
    +        final AtomicReference<Exception> error = new AtomicReference<>(null);
    +        final AtomicReference<Exception> connectionError = new AtomicReference<>(null);
    +
    +        final boolean isSolrCloud = SOLR_TYPE_CLOUD.getValue().equals(context.getProperty(SOLR_TYPE).getValue());
    +        final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFile).getValue();
    +        final Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong();
    +        final String contentStreamPath = context.getProperty(UPDATE_PATH).evaluateAttributeExpressions(flowFile).getValue();
    +        final MultiMapSolrParams requestParams = new MultiMapSolrParams(getRequestParams(context, flowFile));
    +        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +        final String fieldsToIndex = context.getProperty(FIELDS_TO_INDEX).getValue();
    --- End diff --
    
    Based on above comment this would need .evaluateAttributeExpressions(flowFile)


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by abhinavrohatgi30 <gi...@git.apache.org>.
Github user abhinavrohatgi30 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r176208228
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java ---
    @@ -280,5 +291,115 @@ public SolrInputDocument toSolrInputDocument(SolrDocument d) {
             }
         }
     
    +    /**
    +     * Writes each Record as a SolrInputDocument.
    +     */
    +    public static void writeRecord(final Record record, final RecordSchema writeSchema, final SolrInputDocument inputDocument,final List<String> fieldsToIndex)
    +            throws IOException {
    +        RecordSchema schema = record.getSchema();
    +
    +        for (int i = 0; i < schema.getFieldCount(); i++) {
    +            final RecordField field = schema.getField(i);
    +            final String fieldName = field.getFieldName();
    +            final Object value = record.getValue(field);
    +            if (value == null || (!fieldsToIndex.isEmpty() && !fieldsToIndex.contains(fieldName))) {
    +                continue;
    +            }else {
    +                final DataType dataType = schema.getDataType(fieldName).get();
    +                writeValue(inputDocument, value, fieldName, dataType,fieldsToIndex);
    +            }
    +        }
    +    }
     
    +    private static void writeValue(final SolrInputDocument inputDocument, final Object value, final String fieldName, final DataType dataType,final List<String> fieldsToIndex) throws IOException {
    +        final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType;
    +        final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, fieldName);
    +        if (coercedValue == null) {
    +            return;
    +        }
    +
    +        switch (chosenDataType.getFieldType()) {
    +            case DATE: {
    +                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDate localDate = getLocalDateFromEpochTime(fieldName, coercedValue);
    +                    inputDocument.addField(fieldName,localDate.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                } else {
    +                    inputDocument.addField(fieldName,LocalDate.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                }
    +                break;
    +            }
    +            case TIMESTAMP: {
    +                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDateTime localDateTime = getLocalDateTimeFromEpochTime(fieldName, coercedValue);
    +                    inputDocument.addField(fieldName,localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                } else {
    +                    inputDocument.addField(fieldName,LocalDateTime.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                }
    +                break;
    +            }
    +            case DOUBLE:
    +                inputDocument.addField(fieldName,DataTypeUtils.toDouble(coercedValue, fieldName));
    +                break;
    +            case FLOAT:
    +                inputDocument.addField(fieldName,DataTypeUtils.toFloat(coercedValue, fieldName));
    +                break;
    +            case LONG:
    +                inputDocument.addField(fieldName,DataTypeUtils.toLong(coercedValue, fieldName));
    +                break;
    +            case INT:
    +            case BYTE:
    +            case SHORT:
    +                inputDocument.addField(fieldName,DataTypeUtils.toInteger(coercedValue, fieldName));
    +                break;
    +            case CHAR:
    +            case STRING:
    +                inputDocument.addField(fieldName,coercedValue.toString());
    +                break;
    +            case BIGINT:
    +                if (coercedValue instanceof Long) {
    +                    inputDocument.addField(fieldName,(Long) coercedValue);
    +                } else {
    +                    inputDocument.addField(fieldName,(BigInteger) coercedValue);
    +                }
    +                break;
    +            case BOOLEAN:
    +                final String stringValue = coercedValue.toString();
    +                if ("true".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName,true);
    +                } else if ("false".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName,false);
    +                } else {
    +                    inputDocument.addField(fieldName,stringValue);
    +                }
    +                break;
    +            case RECORD: {
    --- End diff --
    
    I've added examples around the behavior of nested records in the additionalDetails.html and I've modified the field names to include the parent field name


---

[GitHub] nifi issue #2561: NIFI-4035 Implement record-based Solr processors

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on the issue:

    https://github.com/apache/nifi/pull/2561
  
    Thanks, will try to take a look in a few days, unless someone gets to it first.


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r178643917
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java ---
    @@ -0,0 +1,373 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.nifi.processors.solr;
    +
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.util.StopWatch;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.solr.client.solrj.SolrServerException;
    +import org.apache.solr.client.solrj.request.UpdateRequest;
    +import org.apache.solr.client.solrj.response.UpdateResponse;
    +import org.apache.solr.common.SolrException;
    +import org.apache.solr.common.SolrInputDocument;
    +import org.apache.solr.common.params.ModifiableSolrParams;
    +import org.apache.solr.common.params.MultiMapSolrParams;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.SortedMap;
    +import java.util.TreeMap;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
    +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
    +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.writeRecord;
    +
    +
    +@Tags({"Apache", "Solr", "Put", "Send","Record"})
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Indexes the Records from a FlowFile into Solr")
    +@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value",
    +        description="These parameters will be passed to Solr on the request")
    +public class PutSolrRecord extends SolrProcessor {
    +
    +    public static final PropertyDescriptor UPDATE_PATH = new PropertyDescriptor
    +            .Builder().name("Solr Update Path").displayName("Solr Update Path")
    +            .description("The path in Solr to post the Flowfile Records")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("/update")
    +            .build();
    +
    +    public static final PropertyDescriptor FIELDS_TO_INDEX  = new PropertyDescriptor
    +            .Builder().name("Fields To Index").displayName("Fields To Index")
    +            .displayName("Fields To Index")
    +            .description("Comma-separated list of field names to write")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor COMMIT_WITHIN = new PropertyDescriptor
    +            .Builder().name("Commit Within").displayName("Commit Within")
    +            .description("The number of milliseconds before the given update is committed")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("5000")
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor
    +            .Builder().name("Batch Size").displayName("Batch Size")
    +            .description("The number of solr documents to index per batch")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("500")
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("The original FlowFile")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("FlowFiles that failed for any reason other than Solr being unreachable")
    +            .build();
    +
    +    public static final Relationship REL_CONNECTION_FAILURE = new Relationship.Builder()
    +            .name("connection_failure")
    +            .description("FlowFiles that failed because Solr is unreachable")
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +            .name("put-solr-record-record-reader").displayName("put-solr-record-record-reader")
    +            .displayName("Record Reader")
    +            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +    public static final String COLLECTION_PARAM_NAME = "collection";
    +    public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin";
    +    public static final String REPEATING_PARAM_PATTERN = "\\w+\\.\\d+";
    +    public  final ComponentLog logger = getLogger();
    +
    +    private Set<Relationship> relationships;
    +    private List<PropertyDescriptor> descriptors;
    +    private static final String EMPTY_STRING = "";
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        super.init(context);
    +
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(SOLR_TYPE);
    +        descriptors.add(SOLR_LOCATION);
    +        descriptors.add(COLLECTION);
    +        descriptors.add(UPDATE_PATH);
    +        descriptors.add(RECORD_READER);
    +        descriptors.add(FIELDS_TO_INDEX);
    +        descriptors.add(COMMIT_WITHIN);
    +        descriptors.add(JAAS_CLIENT_APP_NAME);
    +        descriptors.add(BASIC_USERNAME);
    +        descriptors.add(BASIC_PASSWORD);
    +        descriptors.add(SSL_CONTEXT_SERVICE);
    +        descriptors.add(SOLR_SOCKET_TIMEOUT);
    +        descriptors.add(SOLR_CONNECTION_TIMEOUT);
    +        descriptors.add(SOLR_MAX_CONNECTIONS);
    +        descriptors.add(SOLR_MAX_CONNECTIONS_PER_HOST);
    +        descriptors.add(ZK_CLIENT_TIMEOUT);
    +        descriptors.add(ZK_CONNECTION_TIMEOUT);
    +        descriptors.add(BATCH_SIZE);
    +        this.descriptors = Collections.unmodifiableList(descriptors);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        relationships.add(REL_CONNECTION_FAILURE);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return this.relationships;
    +    }
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return this.descriptors;
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .description("Specifies the value to send for the '" + propertyDescriptorName + "' request parameter")
    +                .name(propertyDescriptorName)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .dynamic(true)
    +                .expressionLanguageSupported(true)
    +                .build();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if ( flowFile == null ) {
    +            return;
    +        }
    +
    +        final AtomicReference<Exception> error = new AtomicReference<>(null);
    +        final AtomicReference<Exception> connectionError = new AtomicReference<>(null);
    +
    +        final boolean isSolrCloud = SOLR_TYPE_CLOUD.getValue().equals(context.getProperty(SOLR_TYPE).getValue());
    +        final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFile).getValue();
    +        final Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong();
    +        final String contentStreamPath = context.getProperty(UPDATE_PATH).evaluateAttributeExpressions(flowFile).getValue();
    +        final MultiMapSolrParams requestParams = new MultiMapSolrParams(getRequestParams(context, flowFile));
    +        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +        final String fieldsToIndex = context.getProperty(FIELDS_TO_INDEX).evaluateAttributeExpressions(flowFile).getValue();
    +        final Long batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(flowFile).asLong();
    +
    +        final List<String> fieldList = new ArrayList<>();
    +        if (!StringUtils.isBlank(fieldsToIndex)) {
    +            fieldList.addAll(Arrays.asList(fieldsToIndex.trim().split("[,]")));
    --- End diff --
    
    This is only trimming leading/trailing spaces on the entire value, but we should trim each field too in case someone enters "field1, field2" instead of "field1,field2".


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/2561


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r175809550
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java ---
    @@ -280,5 +291,115 @@ public SolrInputDocument toSolrInputDocument(SolrDocument d) {
             }
         }
     
    +    /**
    +     * Writes each Record as a SolrInputDocument.
    +     */
    +    public static void writeRecord(final Record record, final RecordSchema writeSchema, final SolrInputDocument inputDocument,final List<String> fieldsToIndex)
    +            throws IOException {
    +        RecordSchema schema = record.getSchema();
    +
    +        for (int i = 0; i < schema.getFieldCount(); i++) {
    +            final RecordField field = schema.getField(i);
    +            final String fieldName = field.getFieldName();
    +            final Object value = record.getValue(field);
    +            if (value == null || (!fieldsToIndex.isEmpty() && !fieldsToIndex.contains(fieldName))) {
    +                continue;
    +            }else {
    +                final DataType dataType = schema.getDataType(fieldName).get();
    +                writeValue(inputDocument, value, fieldName, dataType,fieldsToIndex);
    +            }
    +        }
    +    }
     
    +    private static void writeValue(final SolrInputDocument inputDocument, final Object value, final String fieldName, final DataType dataType,final List<String> fieldsToIndex) throws IOException {
    +        final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType;
    +        final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, fieldName);
    +        if (coercedValue == null) {
    +            return;
    +        }
    +
    +        switch (chosenDataType.getFieldType()) {
    +            case DATE: {
    +                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDate localDate = getLocalDateFromEpochTime(fieldName, coercedValue);
    +                    inputDocument.addField(fieldName,localDate.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                } else {
    +                    inputDocument.addField(fieldName,LocalDate.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                }
    +                break;
    +            }
    +            case TIMESTAMP: {
    +                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDateTime localDateTime = getLocalDateTimeFromEpochTime(fieldName, coercedValue);
    +                    inputDocument.addField(fieldName,localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                } else {
    +                    inputDocument.addField(fieldName,LocalDateTime.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                }
    +                break;
    +            }
    +            case DOUBLE:
    +                inputDocument.addField(fieldName,DataTypeUtils.toDouble(coercedValue, fieldName));
    +                break;
    +            case FLOAT:
    +                inputDocument.addField(fieldName,DataTypeUtils.toFloat(coercedValue, fieldName));
    +                break;
    +            case LONG:
    +                inputDocument.addField(fieldName,DataTypeUtils.toLong(coercedValue, fieldName));
    +                break;
    +            case INT:
    +            case BYTE:
    +            case SHORT:
    +                inputDocument.addField(fieldName,DataTypeUtils.toInteger(coercedValue, fieldName));
    +                break;
    +            case CHAR:
    +            case STRING:
    +                inputDocument.addField(fieldName,coercedValue.toString());
    +                break;
    +            case BIGINT:
    +                if (coercedValue instanceof Long) {
    +                    inputDocument.addField(fieldName,(Long) coercedValue);
    +                } else {
    +                    inputDocument.addField(fieldName,(BigInteger) coercedValue);
    +                }
    +                break;
    +            case BOOLEAN:
    +                final String stringValue = coercedValue.toString();
    +                if ("true".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName,true);
    +                } else if ("false".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName,false);
    +                } else {
    +                    inputDocument.addField(fieldName,stringValue);
    +                }
    +                break;
    +            case RECORD: {
    --- End diff --
    
    I think we have to handle it since someone can specify a field name in "fields to index" that could be of type record.
    
    I think it makes sense to have a property like "Nested Field Names" with choices for "Fully Qualified" and "Child Only" (or something like that).
    
    This lines up with how Solr's JSON update works:
    
    https://lucene.apache.org/solr/guide/6_6/transforming-and-indexing-custom-json.html#transforming-and-indexing-custom-json
    
    The part that shows....
    
    The default behavior is to use the fully qualified name (FQN) of the node. So, if we don’t define any field mappings, like this:
    
    curl 'http://localhost:8983/solr/my_collection/update/json/docs?split=/exams'\
        -H 'Content-type:application/json' -d '
    {
      "first": "John",
      "last": "Doe",
      "grade": 8,
      "exams": [
        {
          "subject": "Maths",
          "test"   : "term1",
          "marks"  : 90},
        {
          "subject": "Biology",
          "test"   : "term1",
          "marks"  : 86}
      ]
    }'
    
    The indexed documents would be added to the index with fields that look like this:
    
    {
      "first":"John",
      "last":"Doe",
      "grade":8,
      "exams.subject":"Maths",
      "exams.test":"term1",
      "exams.marks":90},
    {
      "first":"John",
      "last":"Doe",
      "grade":8,
      "exams.subject":"Biology",
      "exams.test":"term1",
      "exams.marks":86}
    



---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by abhinavrohatgi30 <gi...@git.apache.org>.
Github user abhinavrohatgi30 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r175839474
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java ---
    @@ -280,5 +291,115 @@ public SolrInputDocument toSolrInputDocument(SolrDocument d) {
             }
         }
     
    +    /**
    +     * Writes each Record as a SolrInputDocument.
    +     */
    +    public static void writeRecord(final Record record, final RecordSchema writeSchema, final SolrInputDocument inputDocument,final List<String> fieldsToIndex)
    +            throws IOException {
    +        RecordSchema schema = record.getSchema();
    +
    +        for (int i = 0; i < schema.getFieldCount(); i++) {
    +            final RecordField field = schema.getField(i);
    +            final String fieldName = field.getFieldName();
    +            final Object value = record.getValue(field);
    +            if (value == null || (!fieldsToIndex.isEmpty() && !fieldsToIndex.contains(fieldName))) {
    +                continue;
    +            }else {
    +                final DataType dataType = schema.getDataType(fieldName).get();
    +                writeValue(inputDocument, value, fieldName, dataType,fieldsToIndex);
    +            }
    +        }
    +    }
     
    +    private static void writeValue(final SolrInputDocument inputDocument, final Object value, final String fieldName, final DataType dataType,final List<String> fieldsToIndex) throws IOException {
    +        final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType;
    +        final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, fieldName);
    +        if (coercedValue == null) {
    +            return;
    +        }
    +
    +        switch (chosenDataType.getFieldType()) {
    +            case DATE: {
    +                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDate localDate = getLocalDateFromEpochTime(fieldName, coercedValue);
    +                    inputDocument.addField(fieldName,localDate.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                } else {
    +                    inputDocument.addField(fieldName,LocalDate.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                }
    +                break;
    +            }
    +            case TIMESTAMP: {
    +                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDateTime localDateTime = getLocalDateTimeFromEpochTime(fieldName, coercedValue);
    +                    inputDocument.addField(fieldName,localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                } else {
    +                    inputDocument.addField(fieldName,LocalDateTime.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                }
    +                break;
    +            }
    +            case DOUBLE:
    +                inputDocument.addField(fieldName,DataTypeUtils.toDouble(coercedValue, fieldName));
    +                break;
    +            case FLOAT:
    +                inputDocument.addField(fieldName,DataTypeUtils.toFloat(coercedValue, fieldName));
    +                break;
    +            case LONG:
    +                inputDocument.addField(fieldName,DataTypeUtils.toLong(coercedValue, fieldName));
    +                break;
    +            case INT:
    +            case BYTE:
    +            case SHORT:
    +                inputDocument.addField(fieldName,DataTypeUtils.toInteger(coercedValue, fieldName));
    +                break;
    +            case CHAR:
    +            case STRING:
    +                inputDocument.addField(fieldName,coercedValue.toString());
    +                break;
    +            case BIGINT:
    +                if (coercedValue instanceof Long) {
    +                    inputDocument.addField(fieldName,(Long) coercedValue);
    +                } else {
    +                    inputDocument.addField(fieldName,(BigInteger) coercedValue);
    +                }
    +                break;
    +            case BOOLEAN:
    +                final String stringValue = coercedValue.toString();
    +                if ("true".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName,true);
    +                } else if ("false".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName,false);
    +                } else {
    +                    inputDocument.addField(fieldName,stringValue);
    +                }
    +                break;
    +            case RECORD: {
    --- End diff --
    
    With the current code, it writes a single solr document for a record and flattens all the nested records in that single solr document.
    So if there is an array of nested records it would create multiple fields with the same key in the solr document which would eventually mean that the field would be indexed as multivalued in solr with the assumption that the schema has defined the field to be multivalued else it would fail to index.


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r175744146
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java ---
    @@ -0,0 +1,351 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.nifi.processors.solr;
    +
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.util.StopWatch;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.solr.client.solrj.SolrServerException;
    +import org.apache.solr.client.solrj.request.UpdateRequest;
    +import org.apache.solr.client.solrj.response.UpdateResponse;
    +import org.apache.solr.common.SolrException;
    +import org.apache.solr.common.SolrInputDocument;
    +import org.apache.solr.common.params.ModifiableSolrParams;
    +import org.apache.solr.common.params.MultiMapSolrParams;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.SortedMap;
    +import java.util.TreeMap;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
    +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
    +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.writeRecord;
    +
    +
    +@Tags({"Apache", "Solr", "Put", "Send","Record"})
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Indexes the Records from a FlowFile into Solr")
    +@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value",
    +        description="These parameters will be passed to Solr on the request")
    +public class PutSolrRecord extends SolrProcessor {
    +
    +    public static final PropertyDescriptor UPDATE_PATH = new PropertyDescriptor
    +            .Builder().name("Solr Update Path")
    +            .description("The path in Solr to post the Flowfile Records")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("/update")
    +            .build();
    +
    +    public static final PropertyDescriptor FIELDS_TO_INDEX  = new PropertyDescriptor
    +            .Builder().name("Fields To Index")
    +            .displayName("Fields To Index")
    +            .description("Comma-separated list of field names to write")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor COMMIT_WITHIN = new PropertyDescriptor
    +            .Builder().name("Commit Within")
    +            .description("The number of milliseconds before the given update is committed")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("5000")
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("The original FlowFile")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("FlowFiles that failed for any reason other than Solr being unreachable")
    +            .build();
    +
    +    public static final Relationship REL_CONNECTION_FAILURE = new Relationship.Builder()
    --- End diff --
    
    Generally this is covered under FAILURE by other processors. You can always put a failure reason attribute on there and users can use RouteOnAttribute to cover this.


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by abhinavrohatgi30 <gi...@git.apache.org>.
Github user abhinavrohatgi30 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r175796878
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java ---
    @@ -0,0 +1,351 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.nifi.processors.solr;
    +
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.util.StopWatch;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.solr.client.solrj.SolrServerException;
    +import org.apache.solr.client.solrj.request.UpdateRequest;
    +import org.apache.solr.client.solrj.response.UpdateResponse;
    +import org.apache.solr.common.SolrException;
    +import org.apache.solr.common.SolrInputDocument;
    +import org.apache.solr.common.params.ModifiableSolrParams;
    +import org.apache.solr.common.params.MultiMapSolrParams;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.SortedMap;
    +import java.util.TreeMap;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
    +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
    +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.writeRecord;
    +
    +
    +@Tags({"Apache", "Solr", "Put", "Send","Record"})
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Indexes the Records from a FlowFile into Solr")
    +@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value",
    +        description="These parameters will be passed to Solr on the request")
    +public class PutSolrRecord extends SolrProcessor {
    +
    +    public static final PropertyDescriptor UPDATE_PATH = new PropertyDescriptor
    +            .Builder().name("Solr Update Path")
    +            .description("The path in Solr to post the Flowfile Records")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("/update")
    +            .build();
    +
    +    public static final PropertyDescriptor FIELDS_TO_INDEX  = new PropertyDescriptor
    --- End diff --
    
    Solr can do child documents which if I understand correctly is what you meant by embedded records, but that is something that I haven't implemented in this processor as I was expecting this processor to be indexing a single solr document for a single record, in which case all the fields would be at the top level. I can look into implementing child documents but then i'm unsure how would the child docs be represented in a record.


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r175764796
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java ---
    @@ -280,5 +291,115 @@ public SolrInputDocument toSolrInputDocument(SolrDocument d) {
             }
         }
     
    +    /**
    +     * Writes each Record as a SolrInputDocument.
    +     */
    +    public static void writeRecord(final Record record, final RecordSchema writeSchema, final SolrInputDocument inputDocument,final List<String> fieldsToIndex)
    +            throws IOException {
    +        RecordSchema schema = record.getSchema();
    +
    +        for (int i = 0; i < schema.getFieldCount(); i++) {
    +            final RecordField field = schema.getField(i);
    +            final String fieldName = field.getFieldName();
    +            final Object value = record.getValue(field);
    +            if (value == null || (!fieldsToIndex.isEmpty() && !fieldsToIndex.contains(fieldName))) {
    +                continue;
    +            }else {
    +                final DataType dataType = schema.getDataType(fieldName).get();
    +                writeValue(inputDocument, value, fieldName, dataType,fieldsToIndex);
    +            }
    +        }
    +    }
     
    +    private static void writeValue(final SolrInputDocument inputDocument, final Object value, final String fieldName, final DataType dataType,final List<String> fieldsToIndex) throws IOException {
    +        final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType;
    +        final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, fieldName);
    +        if (coercedValue == null) {
    +            return;
    +        }
    +
    +        switch (chosenDataType.getFieldType()) {
    +            case DATE: {
    +                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDate localDate = getLocalDateFromEpochTime(fieldName, coercedValue);
    +                    inputDocument.addField(fieldName,localDate.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                } else {
    +                    inputDocument.addField(fieldName,LocalDate.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                }
    +                break;
    +            }
    +            case TIMESTAMP: {
    +                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDateTime localDateTime = getLocalDateTimeFromEpochTime(fieldName, coercedValue);
    +                    inputDocument.addField(fieldName,localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                } else {
    +                    inputDocument.addField(fieldName,LocalDateTime.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                }
    +                break;
    +            }
    +            case DOUBLE:
    +                inputDocument.addField(fieldName,DataTypeUtils.toDouble(coercedValue, fieldName));
    +                break;
    +            case FLOAT:
    +                inputDocument.addField(fieldName,DataTypeUtils.toFloat(coercedValue, fieldName));
    +                break;
    +            case LONG:
    +                inputDocument.addField(fieldName,DataTypeUtils.toLong(coercedValue, fieldName));
    +                break;
    +            case INT:
    +            case BYTE:
    +            case SHORT:
    +                inputDocument.addField(fieldName,DataTypeUtils.toInteger(coercedValue, fieldName));
    +                break;
    +            case CHAR:
    +            case STRING:
    +                inputDocument.addField(fieldName,coercedValue.toString());
    +                break;
    +            case BIGINT:
    +                if (coercedValue instanceof Long) {
    +                    inputDocument.addField(fieldName,(Long) coercedValue);
    +                } else {
    +                    inputDocument.addField(fieldName,(BigInteger) coercedValue);
    +                }
    +                break;
    +            case BOOLEAN:
    +                final String stringValue = coercedValue.toString();
    +                if ("true".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName,true);
    +                } else if ("false".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName,false);
    +                } else {
    +                    inputDocument.addField(fieldName,stringValue);
    +                }
    +                break;
    +            case RECORD: {
    --- End diff --
    
    How do nested records end up being represented in the Solr document? Not saying anything is wrong here, just asking to understand how it works.
    
    Lets say we have a person schema with top-level fields for "firstName" and "lastName", and "address", and the address field is of type record and then has it's own fields "street", "city", "zip"...  
    
    Does the resulting Solr document contain "firstName", "lastName", "street", "city", "zip"? 
    
    Would it make sense to have an option to include the parent field in the field names, so it ends up being "address_street", "address_city", and "address_zip" so that you know where those fields came from?


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r175746503
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/util/MockRecordParser.java ---
    @@ -0,0 +1,105 @@
    +/*
    --- End diff --
    
    `PutMongoRecordIT.testInsertNestedRecords` is another good example.


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by abhinavrohatgi30 <gi...@git.apache.org>.
Github user abhinavrohatgi30 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r175797163
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/util/MockRecordParser.java ---
    @@ -0,0 +1,105 @@
    +/*
    --- End diff --
    
    Sure, I'll look into these tests.


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by abhinavrohatgi30 <gi...@git.apache.org>.
Github user abhinavrohatgi30 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r175882819
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java ---
    @@ -0,0 +1,351 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.nifi.processors.solr;
    +
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.util.StopWatch;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.solr.client.solrj.SolrServerException;
    +import org.apache.solr.client.solrj.request.UpdateRequest;
    +import org.apache.solr.client.solrj.response.UpdateResponse;
    +import org.apache.solr.common.SolrException;
    +import org.apache.solr.common.SolrInputDocument;
    +import org.apache.solr.common.params.ModifiableSolrParams;
    +import org.apache.solr.common.params.MultiMapSolrParams;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.SortedMap;
    +import java.util.TreeMap;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
    +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
    +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.writeRecord;
    +
    +
    +@Tags({"Apache", "Solr", "Put", "Send","Record"})
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Indexes the Records from a FlowFile into Solr")
    +@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value",
    +        description="These parameters will be passed to Solr on the request")
    +public class PutSolrRecord extends SolrProcessor {
    +
    +    public static final PropertyDescriptor UPDATE_PATH = new PropertyDescriptor
    +            .Builder().name("Solr Update Path")
    +            .description("The path in Solr to post the Flowfile Records")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("/update")
    +            .build();
    +
    +    public static final PropertyDescriptor FIELDS_TO_INDEX  = new PropertyDescriptor
    +            .Builder().name("Fields To Index")
    +            .displayName("Fields To Index")
    +            .description("Comma-separated list of field names to write")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor COMMIT_WITHIN = new PropertyDescriptor
    +            .Builder().name("Commit Within")
    +            .description("The number of milliseconds before the given update is committed")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("5000")
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("The original FlowFile")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("FlowFiles that failed for any reason other than Solr being unreachable")
    +            .build();
    +
    +    public static final Relationship REL_CONNECTION_FAILURE = new Relationship.Builder()
    +            .name("connection_failure")
    +            .description("FlowFiles that failed because Solr is unreachable")
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +            .name("put-solr-record-record-reader")
    +            .displayName("Record Reader")
    +            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +    public static final String COLLECTION_PARAM_NAME = "collection";
    +    public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin";
    +    public static final String REPEATING_PARAM_PATTERN = "\\w+\\.\\d+";
    +    public  final ComponentLog logger = getLogger();
    +
    +    private Set<Relationship> relationships;
    +    private List<PropertyDescriptor> descriptors;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        super.init(context);
    +
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(SOLR_TYPE);
    +        descriptors.add(SOLR_LOCATION);
    +        descriptors.add(COLLECTION);
    +        descriptors.add(UPDATE_PATH);
    +        descriptors.add(RECORD_READER);
    +        descriptors.add(FIELDS_TO_INDEX);
    +        descriptors.add(COMMIT_WITHIN);
    +        descriptors.add(JAAS_CLIENT_APP_NAME);
    +        descriptors.add(BASIC_USERNAME);
    +        descriptors.add(BASIC_PASSWORD);
    +        descriptors.add(SSL_CONTEXT_SERVICE);
    +        descriptors.add(SOLR_SOCKET_TIMEOUT);
    +        descriptors.add(SOLR_CONNECTION_TIMEOUT);
    +        descriptors.add(SOLR_MAX_CONNECTIONS);
    +        descriptors.add(SOLR_MAX_CONNECTIONS_PER_HOST);
    +        descriptors.add(ZK_CLIENT_TIMEOUT);
    +        descriptors.add(ZK_CONNECTION_TIMEOUT);
    +        this.descriptors = Collections.unmodifiableList(descriptors);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        relationships.add(REL_CONNECTION_FAILURE);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return this.relationships;
    +    }
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return this.descriptors;
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .description("Specifies the value to send for the '" + propertyDescriptorName + "' request parameter")
    +                .name(propertyDescriptorName)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .dynamic(true)
    +                .expressionLanguageSupported(true)
    +                .build();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if ( flowFile == null ) {
    +            return;
    +        }
    +
    +        final AtomicReference<Exception> error = new AtomicReference<>(null);
    +        final AtomicReference<Exception> connectionError = new AtomicReference<>(null);
    +
    +        final boolean isSolrCloud = SOLR_TYPE_CLOUD.getValue().equals(context.getProperty(SOLR_TYPE).getValue());
    +        final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFile).getValue();
    +        final Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong();
    +        final String contentStreamPath = context.getProperty(UPDATE_PATH).evaluateAttributeExpressions(flowFile).getValue();
    +        final MultiMapSolrParams requestParams = new MultiMapSolrParams(getRequestParams(context, flowFile));
    +        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +        final String fieldsToIndex = context.getProperty(FIELDS_TO_INDEX).getValue();
    --- End diff --
    
    Sure, I'll make that change


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by abhinavrohatgi30 <gi...@git.apache.org>.
Github user abhinavrohatgi30 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r176208789
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java ---
    @@ -0,0 +1,351 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.nifi.processors.solr;
    +
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.util.StopWatch;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.solr.client.solrj.SolrServerException;
    +import org.apache.solr.client.solrj.request.UpdateRequest;
    +import org.apache.solr.client.solrj.response.UpdateResponse;
    +import org.apache.solr.common.SolrException;
    +import org.apache.solr.common.SolrInputDocument;
    +import org.apache.solr.common.params.ModifiableSolrParams;
    +import org.apache.solr.common.params.MultiMapSolrParams;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.SortedMap;
    +import java.util.TreeMap;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
    +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
    +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.writeRecord;
    +
    +
    +@Tags({"Apache", "Solr", "Put", "Send","Record"})
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Indexes the Records from a FlowFile into Solr")
    +@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value",
    +        description="These parameters will be passed to Solr on the request")
    +public class PutSolrRecord extends SolrProcessor {
    +
    +    public static final PropertyDescriptor UPDATE_PATH = new PropertyDescriptor
    +            .Builder().name("Solr Update Path")
    +            .description("The path in Solr to post the Flowfile Records")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("/update")
    +            .build();
    +
    +    public static final PropertyDescriptor FIELDS_TO_INDEX  = new PropertyDescriptor
    +            .Builder().name("Fields To Index")
    +            .displayName("Fields To Index")
    +            .description("Comma-separated list of field names to write")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor COMMIT_WITHIN = new PropertyDescriptor
    +            .Builder().name("Commit Within")
    +            .description("The number of milliseconds before the given update is committed")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("5000")
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("The original FlowFile")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("FlowFiles that failed for any reason other than Solr being unreachable")
    +            .build();
    +
    +    public static final Relationship REL_CONNECTION_FAILURE = new Relationship.Builder()
    +            .name("connection_failure")
    +            .description("FlowFiles that failed because Solr is unreachable")
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +            .name("put-solr-record-record-reader")
    +            .displayName("Record Reader")
    +            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +    public static final String COLLECTION_PARAM_NAME = "collection";
    +    public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin";
    +    public static final String REPEATING_PARAM_PATTERN = "\\w+\\.\\d+";
    +    public  final ComponentLog logger = getLogger();
    +
    +    private Set<Relationship> relationships;
    +    private List<PropertyDescriptor> descriptors;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        super.init(context);
    +
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(SOLR_TYPE);
    +        descriptors.add(SOLR_LOCATION);
    +        descriptors.add(COLLECTION);
    +        descriptors.add(UPDATE_PATH);
    +        descriptors.add(RECORD_READER);
    +        descriptors.add(FIELDS_TO_INDEX);
    +        descriptors.add(COMMIT_WITHIN);
    +        descriptors.add(JAAS_CLIENT_APP_NAME);
    +        descriptors.add(BASIC_USERNAME);
    +        descriptors.add(BASIC_PASSWORD);
    +        descriptors.add(SSL_CONTEXT_SERVICE);
    +        descriptors.add(SOLR_SOCKET_TIMEOUT);
    +        descriptors.add(SOLR_CONNECTION_TIMEOUT);
    +        descriptors.add(SOLR_MAX_CONNECTIONS);
    +        descriptors.add(SOLR_MAX_CONNECTIONS_PER_HOST);
    +        descriptors.add(ZK_CLIENT_TIMEOUT);
    +        descriptors.add(ZK_CONNECTION_TIMEOUT);
    +        this.descriptors = Collections.unmodifiableList(descriptors);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        relationships.add(REL_CONNECTION_FAILURE);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return this.relationships;
    +    }
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return this.descriptors;
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .description("Specifies the value to send for the '" + propertyDescriptorName + "' request parameter")
    +                .name(propertyDescriptorName)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .dynamic(true)
    +                .expressionLanguageSupported(true)
    +                .build();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if ( flowFile == null ) {
    +            return;
    +        }
    +
    +        final AtomicReference<Exception> error = new AtomicReference<>(null);
    +        final AtomicReference<Exception> connectionError = new AtomicReference<>(null);
    +
    +        final boolean isSolrCloud = SOLR_TYPE_CLOUD.getValue().equals(context.getProperty(SOLR_TYPE).getValue());
    +        final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFile).getValue();
    +        final Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong();
    +        final String contentStreamPath = context.getProperty(UPDATE_PATH).evaluateAttributeExpressions(flowFile).getValue();
    +        final MultiMapSolrParams requestParams = new MultiMapSolrParams(getRequestParams(context, flowFile));
    +        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +        final String fieldsToIndex = context.getProperty(FIELDS_TO_INDEX).getValue();
    --- End diff --
    
    I've made the change to support nifi expressions for the field


---

[GitHub] nifi issue #2561: NIFI-4035 Implement record-based Solr processors

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on the issue:

    https://github.com/apache/nifi/pull/2561
  
    I would try doing a rebase against master to see what happens. In the worst case situation you would have to create another branch off latest master, and then individually cherry-pick your commits from this branch over to the new branch, to get rid of those other commits that are in between yours, but only do that if you can't get this branch straightened out. 


---

[GitHub] nifi issue #2561: NIFI-4035 Implement record-based Solr processors

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2561
  
    @abhinavrohatgi30 You have a merge conflict in this branch. If you resolve it, I'll help @bbende finish the review.


---

[GitHub] nifi issue #2561: NIFI-4035 Implement record-based Solr processors

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on the issue:

    https://github.com/apache/nifi/pull/2561
  
    Yea when you update your branch you should be doing something like the following...
    ```
    git fetch upstream
    git rebase upstream/master
    ```
    This assumes "upstream" points to either Apache NiFi git repo or Apache NiFi Github.
    
    Using rebase will apply all the incoming commits from upstream/master to your branch and then put your commits back on top of that so it looks like yours are always the latest.
    
    You then need to force push to your remote branch `git push origin your-branch --force`


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r175743694
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java ---
    @@ -0,0 +1,351 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.nifi.processors.solr;
    +
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.util.StopWatch;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.solr.client.solrj.SolrServerException;
    +import org.apache.solr.client.solrj.request.UpdateRequest;
    +import org.apache.solr.client.solrj.response.UpdateResponse;
    +import org.apache.solr.common.SolrException;
    +import org.apache.solr.common.SolrInputDocument;
    +import org.apache.solr.common.params.ModifiableSolrParams;
    +import org.apache.solr.common.params.MultiMapSolrParams;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.SortedMap;
    +import java.util.TreeMap;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
    +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
    +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.writeRecord;
    +
    +
    +@Tags({"Apache", "Solr", "Put", "Send","Record"})
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Indexes the Records from a FlowFile into Solr")
    +@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value",
    +        description="These parameters will be passed to Solr on the request")
    +public class PutSolrRecord extends SolrProcessor {
    +
    +    public static final PropertyDescriptor UPDATE_PATH = new PropertyDescriptor
    +            .Builder().name("Solr Update Path")
    +            .description("The path in Solr to post the Flowfile Records")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("/update")
    +            .build();
    +
    +    public static final PropertyDescriptor FIELDS_TO_INDEX  = new PropertyDescriptor
    --- End diff --
    
    This could get dicey if a user does embedded records. I know ElasticSearch supports that, and it's my understanding that Solr can do them too. Might want to think about this because a user might say they want to do these 3 top level and these 2 embedded fields and nothing else.


---

[GitHub] nifi issue #2561: NIFI-4035 Implement record-based Solr processors

Posted by abhinavrohatgi30 <gi...@git.apache.org>.
Github user abhinavrohatgi30 commented on the issue:

    https://github.com/apache/nifi/pull/2561
  
    Hi @bbende , I've brought it down to a single commit, can you have a look at it now?


---

[GitHub] nifi issue #2561: NIFI-4035 Implement record-based Solr processors

Posted by abhinavrohatgi30 <gi...@git.apache.org>.
Github user abhinavrohatgi30 commented on the issue:

    https://github.com/apache/nifi/pull/2561
  
    Sorry, instead of doing the force push i resolved conflicts and did a push, can i now do the rebase again on the current commit or will i have to add a new commit inorder to rebase from the master branch?


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by abhinavrohatgi30 <gi...@git.apache.org>.
Github user abhinavrohatgi30 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r175801577
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java ---
    @@ -280,5 +291,115 @@ public SolrInputDocument toSolrInputDocument(SolrDocument d) {
             }
         }
     
    +    /**
    +     * Writes each Record as a SolrInputDocument.
    +     */
    +    public static void writeRecord(final Record record, final RecordSchema writeSchema, final SolrInputDocument inputDocument,final List<String> fieldsToIndex)
    +            throws IOException {
    +        RecordSchema schema = record.getSchema();
    +
    +        for (int i = 0; i < schema.getFieldCount(); i++) {
    +            final RecordField field = schema.getField(i);
    +            final String fieldName = field.getFieldName();
    +            final Object value = record.getValue(field);
    +            if (value == null || (!fieldsToIndex.isEmpty() && !fieldsToIndex.contains(fieldName))) {
    +                continue;
    +            }else {
    +                final DataType dataType = schema.getDataType(fieldName).get();
    +                writeValue(inputDocument, value, fieldName, dataType,fieldsToIndex);
    +            }
    +        }
    +    }
     
    +    private static void writeValue(final SolrInputDocument inputDocument, final Object value, final String fieldName, final DataType dataType,final List<String> fieldsToIndex) throws IOException {
    +        final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType;
    +        final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, fieldName);
    +        if (coercedValue == null) {
    +            return;
    +        }
    +
    +        switch (chosenDataType.getFieldType()) {
    +            case DATE: {
    +                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDate localDate = getLocalDateFromEpochTime(fieldName, coercedValue);
    +                    inputDocument.addField(fieldName,localDate.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                } else {
    +                    inputDocument.addField(fieldName,LocalDate.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                }
    +                break;
    +            }
    +            case TIMESTAMP: {
    +                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDateTime localDateTime = getLocalDateTimeFromEpochTime(fieldName, coercedValue);
    +                    inputDocument.addField(fieldName,localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                } else {
    +                    inputDocument.addField(fieldName,LocalDateTime.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                }
    +                break;
    +            }
    +            case DOUBLE:
    +                inputDocument.addField(fieldName,DataTypeUtils.toDouble(coercedValue, fieldName));
    +                break;
    +            case FLOAT:
    +                inputDocument.addField(fieldName,DataTypeUtils.toFloat(coercedValue, fieldName));
    +                break;
    +            case LONG:
    +                inputDocument.addField(fieldName,DataTypeUtils.toLong(coercedValue, fieldName));
    +                break;
    +            case INT:
    +            case BYTE:
    +            case SHORT:
    +                inputDocument.addField(fieldName,DataTypeUtils.toInteger(coercedValue, fieldName));
    +                break;
    +            case CHAR:
    +            case STRING:
    +                inputDocument.addField(fieldName,coercedValue.toString());
    +                break;
    +            case BIGINT:
    +                if (coercedValue instanceof Long) {
    +                    inputDocument.addField(fieldName,(Long) coercedValue);
    +                } else {
    +                    inputDocument.addField(fieldName,(BigInteger) coercedValue);
    +                }
    +                break;
    +            case BOOLEAN:
    +                final String stringValue = coercedValue.toString();
    +                if ("true".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName,true);
    +                } else if ("false".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName,false);
    +                } else {
    +                    inputDocument.addField(fieldName,stringValue);
    +                }
    +                break;
    +            case RECORD: {
    --- End diff --
    
    Yes, I've implemented it to flatten the nested record into a single solr document, although i was sort of unsure if there would be a need to consider nested records. If there is one, I'll make changes to the field name as you suggested so that one knows where the field came from.


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r175858048
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java ---
    @@ -280,5 +291,115 @@ public SolrInputDocument toSolrInputDocument(SolrDocument d) {
             }
         }
     
    +    /**
    +     * Writes each Record as a SolrInputDocument.
    +     */
    +    public static void writeRecord(final Record record, final RecordSchema writeSchema, final SolrInputDocument inputDocument,final List<String> fieldsToIndex)
    +            throws IOException {
    +        RecordSchema schema = record.getSchema();
    +
    +        for (int i = 0; i < schema.getFieldCount(); i++) {
    +            final RecordField field = schema.getField(i);
    +            final String fieldName = field.getFieldName();
    +            final Object value = record.getValue(field);
    +            if (value == null || (!fieldsToIndex.isEmpty() && !fieldsToIndex.contains(fieldName))) {
    +                continue;
    +            }else {
    +                final DataType dataType = schema.getDataType(fieldName).get();
    +                writeValue(inputDocument, value, fieldName, dataType,fieldsToIndex);
    +            }
    +        }
    +    }
     
    +    private static void writeValue(final SolrInputDocument inputDocument, final Object value, final String fieldName, final DataType dataType,final List<String> fieldsToIndex) throws IOException {
    +        final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType;
    +        final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, fieldName);
    +        if (coercedValue == null) {
    +            return;
    +        }
    +
    +        switch (chosenDataType.getFieldType()) {
    +            case DATE: {
    +                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDate localDate = getLocalDateFromEpochTime(fieldName, coercedValue);
    +                    inputDocument.addField(fieldName,localDate.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                } else {
    +                    inputDocument.addField(fieldName,LocalDate.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                }
    +                break;
    +            }
    +            case TIMESTAMP: {
    +                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDateTime localDateTime = getLocalDateTimeFromEpochTime(fieldName, coercedValue);
    +                    inputDocument.addField(fieldName,localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                } else {
    +                    inputDocument.addField(fieldName,LocalDateTime.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                }
    +                break;
    +            }
    +            case DOUBLE:
    +                inputDocument.addField(fieldName,DataTypeUtils.toDouble(coercedValue, fieldName));
    +                break;
    +            case FLOAT:
    +                inputDocument.addField(fieldName,DataTypeUtils.toFloat(coercedValue, fieldName));
    +                break;
    +            case LONG:
    +                inputDocument.addField(fieldName,DataTypeUtils.toLong(coercedValue, fieldName));
    +                break;
    +            case INT:
    +            case BYTE:
    +            case SHORT:
    +                inputDocument.addField(fieldName,DataTypeUtils.toInteger(coercedValue, fieldName));
    +                break;
    +            case CHAR:
    +            case STRING:
    +                inputDocument.addField(fieldName,coercedValue.toString());
    +                break;
    +            case BIGINT:
    +                if (coercedValue instanceof Long) {
    +                    inputDocument.addField(fieldName,(Long) coercedValue);
    +                } else {
    +                    inputDocument.addField(fieldName,(BigInteger) coercedValue);
    +                }
    +                break;
    +            case BOOLEAN:
    +                final String stringValue = coercedValue.toString();
    +                if ("true".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName,true);
    +                } else if ("false".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName,false);
    +                } else {
    +                    inputDocument.addField(fieldName,stringValue);
    +                }
    +                break;
    +            case RECORD: {
    --- End diff --
    
    I think the current approach is probably fine for now, maybe we can make a note in the @CapabilityDescription that mentions how nested records are handled, or in the additionalDetails.html you could provide example input and show it would be handled.
    
    Someone can always use ConvertRecord to convert whatever format to JSON, and then use the existing PutSolrContentStream with Solr's splitting if they want to do something like that.


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by abhinavrohatgi30 <gi...@git.apache.org>.
Github user abhinavrohatgi30 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r176207854
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java ---
    @@ -0,0 +1,351 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.nifi.processors.solr;
    +
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.util.StopWatch;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.solr.client.solrj.SolrServerException;
    +import org.apache.solr.client.solrj.request.UpdateRequest;
    +import org.apache.solr.client.solrj.response.UpdateResponse;
    +import org.apache.solr.common.SolrException;
    +import org.apache.solr.common.SolrInputDocument;
    +import org.apache.solr.common.params.ModifiableSolrParams;
    +import org.apache.solr.common.params.MultiMapSolrParams;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.SortedMap;
    +import java.util.TreeMap;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
    +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
    +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.writeRecord;
    +
    +
    +@Tags({"Apache", "Solr", "Put", "Send","Record"})
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Indexes the Records from a FlowFile into Solr")
    +@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value",
    +        description="These parameters will be passed to Solr on the request")
    +public class PutSolrRecord extends SolrProcessor {
    +
    +    public static final PropertyDescriptor UPDATE_PATH = new PropertyDescriptor
    +            .Builder().name("Solr Update Path")
    +            .description("The path in Solr to post the Flowfile Records")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("/update")
    +            .build();
    +
    +    public static final PropertyDescriptor FIELDS_TO_INDEX  = new PropertyDescriptor
    +            .Builder().name("Fields To Index")
    +            .displayName("Fields To Index")
    +            .description("Comma-separated list of field names to write")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor COMMIT_WITHIN = new PropertyDescriptor
    +            .Builder().name("Commit Within")
    +            .description("The number of milliseconds before the given update is committed")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("5000")
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("The original FlowFile")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("FlowFiles that failed for any reason other than Solr being unreachable")
    +            .build();
    +
    +    public static final Relationship REL_CONNECTION_FAILURE = new Relationship.Builder()
    +            .name("connection_failure")
    +            .description("FlowFiles that failed because Solr is unreachable")
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +            .name("put-solr-record-record-reader")
    +            .displayName("Record Reader")
    +            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +    public static final String COLLECTION_PARAM_NAME = "collection";
    +    public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin";
    +    public static final String REPEATING_PARAM_PATTERN = "\\w+\\.\\d+";
    +    public  final ComponentLog logger = getLogger();
    +
    +    private Set<Relationship> relationships;
    +    private List<PropertyDescriptor> descriptors;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        super.init(context);
    +
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(SOLR_TYPE);
    +        descriptors.add(SOLR_LOCATION);
    +        descriptors.add(COLLECTION);
    +        descriptors.add(UPDATE_PATH);
    +        descriptors.add(RECORD_READER);
    +        descriptors.add(FIELDS_TO_INDEX);
    +        descriptors.add(COMMIT_WITHIN);
    +        descriptors.add(JAAS_CLIENT_APP_NAME);
    +        descriptors.add(BASIC_USERNAME);
    +        descriptors.add(BASIC_PASSWORD);
    +        descriptors.add(SSL_CONTEXT_SERVICE);
    +        descriptors.add(SOLR_SOCKET_TIMEOUT);
    +        descriptors.add(SOLR_CONNECTION_TIMEOUT);
    +        descriptors.add(SOLR_MAX_CONNECTIONS);
    +        descriptors.add(SOLR_MAX_CONNECTIONS_PER_HOST);
    +        descriptors.add(ZK_CLIENT_TIMEOUT);
    +        descriptors.add(ZK_CONNECTION_TIMEOUT);
    +        this.descriptors = Collections.unmodifiableList(descriptors);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        relationships.add(REL_CONNECTION_FAILURE);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return this.relationships;
    +    }
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return this.descriptors;
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .description("Specifies the value to send for the '" + propertyDescriptorName + "' request parameter")
    +                .name(propertyDescriptorName)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .dynamic(true)
    +                .expressionLanguageSupported(true)
    +                .build();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if ( flowFile == null ) {
    +            return;
    +        }
    +
    +        final AtomicReference<Exception> error = new AtomicReference<>(null);
    +        final AtomicReference<Exception> connectionError = new AtomicReference<>(null);
    +
    +        final boolean isSolrCloud = SOLR_TYPE_CLOUD.getValue().equals(context.getProperty(SOLR_TYPE).getValue());
    +        final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFile).getValue();
    +        final Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong();
    +        final String contentStreamPath = context.getProperty(UPDATE_PATH).evaluateAttributeExpressions(flowFile).getValue();
    +        final MultiMapSolrParams requestParams = new MultiMapSolrParams(getRequestParams(context, flowFile));
    +        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +        final String fieldsToIndex = context.getProperty(FIELDS_TO_INDEX).getValue();
    +
    +        final List<String> fieldList = new ArrayList<>();
    +        if (!StringUtils.isBlank(fieldsToIndex)) {
    +            fieldList.addAll(Arrays.asList(fieldsToIndex.trim().split("[,]")));
    +        }
    +        StopWatch timer = new StopWatch(true);
    +        try (final InputStream in = session.read(flowFile);
    +             final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
    +
    +            Record record;
    +            List<SolrInputDocument> inputDocumentList = new LinkedList<>();
    --- End diff --
    
    I've added a parameter to index documents in batches, although now the whole flowfile would fail even if one of the records of the flowfile threw an exception. I understand it is ideal in case of a Solr connection failure but in case of some other exception specific to a record would it be better if we process the records that do not throw an exception? If yes, would we in that case route the flowfile to failure as some records still threw an exception? 


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by abhinavrohatgi30 <gi...@git.apache.org>.
Github user abhinavrohatgi30 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r175816508
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java ---
    @@ -280,5 +291,115 @@ public SolrInputDocument toSolrInputDocument(SolrDocument d) {
             }
         }
     
    +    /**
    +     * Writes each Record as a SolrInputDocument.
    +     */
    +    public static void writeRecord(final Record record, final RecordSchema writeSchema, final SolrInputDocument inputDocument,final List<String> fieldsToIndex)
    +            throws IOException {
    +        RecordSchema schema = record.getSchema();
    +
    +        for (int i = 0; i < schema.getFieldCount(); i++) {
    +            final RecordField field = schema.getField(i);
    +            final String fieldName = field.getFieldName();
    +            final Object value = record.getValue(field);
    +            if (value == null || (!fieldsToIndex.isEmpty() && !fieldsToIndex.contains(fieldName))) {
    +                continue;
    +            }else {
    +                final DataType dataType = schema.getDataType(fieldName).get();
    +                writeValue(inputDocument, value, fieldName, dataType,fieldsToIndex);
    +            }
    +        }
    +    }
     
    +    private static void writeValue(final SolrInputDocument inputDocument, final Object value, final String fieldName, final DataType dataType,final List<String> fieldsToIndex) throws IOException {
    +        final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType;
    +        final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, fieldName);
    +        if (coercedValue == null) {
    +            return;
    +        }
    +
    +        switch (chosenDataType.getFieldType()) {
    +            case DATE: {
    +                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDate localDate = getLocalDateFromEpochTime(fieldName, coercedValue);
    +                    inputDocument.addField(fieldName,localDate.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                } else {
    +                    inputDocument.addField(fieldName,LocalDate.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                }
    +                break;
    +            }
    +            case TIMESTAMP: {
    +                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDateTime localDateTime = getLocalDateTimeFromEpochTime(fieldName, coercedValue);
    +                    inputDocument.addField(fieldName,localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                } else {
    +                    inputDocument.addField(fieldName,LocalDateTime.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                }
    +                break;
    +            }
    +            case DOUBLE:
    +                inputDocument.addField(fieldName,DataTypeUtils.toDouble(coercedValue, fieldName));
    +                break;
    +            case FLOAT:
    +                inputDocument.addField(fieldName,DataTypeUtils.toFloat(coercedValue, fieldName));
    +                break;
    +            case LONG:
    +                inputDocument.addField(fieldName,DataTypeUtils.toLong(coercedValue, fieldName));
    +                break;
    +            case INT:
    +            case BYTE:
    +            case SHORT:
    +                inputDocument.addField(fieldName,DataTypeUtils.toInteger(coercedValue, fieldName));
    +                break;
    +            case CHAR:
    +            case STRING:
    +                inputDocument.addField(fieldName,coercedValue.toString());
    +                break;
    +            case BIGINT:
    +                if (coercedValue instanceof Long) {
    +                    inputDocument.addField(fieldName,(Long) coercedValue);
    +                } else {
    +                    inputDocument.addField(fieldName,(BigInteger) coercedValue);
    +                }
    +                break;
    +            case BOOLEAN:
    +                final String stringValue = coercedValue.toString();
    +                if ("true".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName,true);
    +                } else if ("false".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName,false);
    +                } else {
    +                    inputDocument.addField(fieldName,stringValue);
    +                }
    +                break;
    +            case RECORD: {
    --- End diff --
    
    Do you want me to replicate the top level field in N nested record of the array and convert into into N solr documents, but this might get complicated if there are more than one levels of nested records.


---

[GitHub] nifi issue #2561: NIFI-4035 Implement record-based Solr processors

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on the issue:

    https://github.com/apache/nifi/pull/2561
  
    I tried to re-base this against master so I could squash it down to a single commit, but the re-base is encountering a lot of conflicts, which really shouldn't be happening because its conflicting with itself. Can you work through getting it down to a single commit?
    
    Normally it should just be:
    `git rebase -i upstream/master`
    
    Then in the list of commits you choose "s" for all the commits except the top-one, which squashes them all into the top one. Then force push.


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r175818379
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java ---
    @@ -280,5 +291,115 @@ public SolrInputDocument toSolrInputDocument(SolrDocument d) {
             }
         }
     
    +    /**
    +     * Writes each Record as a SolrInputDocument.
    +     */
    +    public static void writeRecord(final Record record, final RecordSchema writeSchema, final SolrInputDocument inputDocument,final List<String> fieldsToIndex)
    +            throws IOException {
    +        RecordSchema schema = record.getSchema();
    +
    +        for (int i = 0; i < schema.getFieldCount(); i++) {
    +            final RecordField field = schema.getField(i);
    +            final String fieldName = field.getFieldName();
    +            final Object value = record.getValue(field);
    +            if (value == null || (!fieldsToIndex.isEmpty() && !fieldsToIndex.contains(fieldName))) {
    +                continue;
    +            }else {
    +                final DataType dataType = schema.getDataType(fieldName).get();
    +                writeValue(inputDocument, value, fieldName, dataType,fieldsToIndex);
    +            }
    +        }
    +    }
     
    +    private static void writeValue(final SolrInputDocument inputDocument, final Object value, final String fieldName, final DataType dataType,final List<String> fieldsToIndex) throws IOException {
    +        final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType;
    +        final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, fieldName);
    +        if (coercedValue == null) {
    +            return;
    +        }
    +
    +        switch (chosenDataType.getFieldType()) {
    +            case DATE: {
    +                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDate localDate = getLocalDateFromEpochTime(fieldName, coercedValue);
    +                    inputDocument.addField(fieldName,localDate.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                } else {
    +                    inputDocument.addField(fieldName,LocalDate.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                }
    +                break;
    +            }
    +            case TIMESTAMP: {
    +                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDateTime localDateTime = getLocalDateTimeFromEpochTime(fieldName, coercedValue);
    +                    inputDocument.addField(fieldName,localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                } else {
    +                    inputDocument.addField(fieldName,LocalDateTime.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                }
    +                break;
    +            }
    +            case DOUBLE:
    +                inputDocument.addField(fieldName,DataTypeUtils.toDouble(coercedValue, fieldName));
    +                break;
    +            case FLOAT:
    +                inputDocument.addField(fieldName,DataTypeUtils.toFloat(coercedValue, fieldName));
    +                break;
    +            case LONG:
    +                inputDocument.addField(fieldName,DataTypeUtils.toLong(coercedValue, fieldName));
    +                break;
    +            case INT:
    +            case BYTE:
    +            case SHORT:
    +                inputDocument.addField(fieldName,DataTypeUtils.toInteger(coercedValue, fieldName));
    +                break;
    +            case CHAR:
    +            case STRING:
    +                inputDocument.addField(fieldName,coercedValue.toString());
    +                break;
    +            case BIGINT:
    +                if (coercedValue instanceof Long) {
    +                    inputDocument.addField(fieldName,(Long) coercedValue);
    +                } else {
    +                    inputDocument.addField(fieldName,(BigInteger) coercedValue);
    +                }
    +                break;
    +            case BOOLEAN:
    +                final String stringValue = coercedValue.toString();
    +                if ("true".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName,true);
    +                } else if ("false".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName,false);
    +                } else {
    +                    inputDocument.addField(fieldName,stringValue);
    +                }
    +                break;
    +            case RECORD: {
    --- End diff --
    
    Well with the code you already have, what does the Solr document look like when there is an array of records?


---

[GitHub] nifi issue #2561: NIFI-4035 Implement record-based Solr processors

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on the issue:

    https://github.com/apache/nifi/pull/2561
  
    Nested records and arrays of nested records are not working correctly...
    
    **Scenario 1 - Nested Record**
    
    Schema:
    ```
    {
        "type": "record",
        "name": "exams",
        "fields" : [
          { "name": "first", "type": "string" },
          { "name": "last", "type": "string" },
          { "name": "grade", "type": "int" },
          {
            "name": "exam",
            "type": {
              "name" : "exam",
              "type" : "record",
              "fields" : [
                { "name": "subject", "type": "string" },
                { "name": "test", "type": "string" },
                { "name": "marks", "type": "int" }
              ]
            }
          }
        ]
    }
    ```
    
    Input:
    ```
    {
      "first": "Abhi",
      "last": "R",
      "grade": 8,
      "exam": {
        "subject": "Maths",
        "test" : "term1",
        "marks" : 90
      }
    }
    ```
    
    Result:
    ```
    java.util.NoSuchElementException: No value present
    	at java.util.Optional.get(Optional.java:135)
    	at org.apache.nifi.processors.solr.SolrUtils.writeRecord(SolrUtils.java:313)
    	at org.apache.nifi.processors.solr.SolrUtils.writeValue(SolrUtils.java:384)
    	at org.apache.nifi.processors.solr.SolrUtils.writeRecord(SolrUtils.java:314)
    	at org.apache.nifi.processors.solr.PutSolrRecord.onTrigger(PutSolrRecord.java:247)
    	at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    ```
    
    **Scenario 2 - Array of Records**
    
    Schema:
    ```
    {
        "type": "record",
        "name": "exams",
        "fields" : [
          { "name": "first", "type": "string" },
          { "name": "last", "type": "string" },
          { "name": "grade", "type": "int" },
          {
            "name": "exams",
            "type": {
              "type" : "array",
              "items" : {
                "name" : "exam",
                "type" : "record",
                "fields" : [
                    { "name": "subject", "type": "string" },
                    { "name": "test", "type": "string" },
                    { "name": "marks", "type": "int" }
                ]
              }
            }
          }
        ]
    }
    ```
    
    Input:
    ```
    {
    "first": "Abhi",
    "last": "R",
    "grade": 8,
    "exams": [
        {
            "subject": "Maths",
            "test" : "term1",
            "marks" : 90
        },
        {
            "subject": "Physics",
            "test" : "term1",
            "marks" : 95
        }
    ]
    }
    ```
    
    Result:
    
    Solr Document with multi-valued field exams where the values are the toString of a MapRecord:
    ```
     "exams":["org.apache.nifi.serialization.record.MapRecord:MapRecord[{marks=90, test=term1, subject=Maths}]",
              "org.apache.nifi.serialization.record.MapRecord:MapRecord[{marks=95, test=term1, subject=Physics}]"],
    ```
    Should have created fields like exams_marks, exams_test, exams_subject.
    
    Here is a full template for the two scenarios:
    
    https://gist.githubusercontent.com/bbende/edc2e7d61db83b29533ac3fc520de30f/raw/8764d50ed5e14d876c53a0b84b3af5741d910b3b/PutSolrRecordTesting.xml
    
    There needs to be unit tests that cover both these cases.


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r175863036
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java ---
    @@ -0,0 +1,351 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.nifi.processors.solr;
    +
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.util.StopWatch;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.solr.client.solrj.SolrServerException;
    +import org.apache.solr.client.solrj.request.UpdateRequest;
    +import org.apache.solr.client.solrj.response.UpdateResponse;
    +import org.apache.solr.common.SolrException;
    +import org.apache.solr.common.SolrInputDocument;
    +import org.apache.solr.common.params.ModifiableSolrParams;
    +import org.apache.solr.common.params.MultiMapSolrParams;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.SortedMap;
    +import java.util.TreeMap;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
    +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
    +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.writeRecord;
    +
    +
    +@Tags({"Apache", "Solr", "Put", "Send","Record"})
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Indexes the Records from a FlowFile into Solr")
    +@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value",
    +        description="These parameters will be passed to Solr on the request")
    +public class PutSolrRecord extends SolrProcessor {
    +
    +    public static final PropertyDescriptor UPDATE_PATH = new PropertyDescriptor
    +            .Builder().name("Solr Update Path")
    +            .description("The path in Solr to post the Flowfile Records")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("/update")
    +            .build();
    +
    +    public static final PropertyDescriptor FIELDS_TO_INDEX  = new PropertyDescriptor
    +            .Builder().name("Fields To Index")
    +            .displayName("Fields To Index")
    +            .description("Comma-separated list of field names to write")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    --- End diff --
    
    Since a record reader can dynamically obtain a schema based on the incoming flow file using the schema.name attribute, I think "Fields to Index" should support expression language so each flow file could potentially supply a different set of fields.


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by abhinavrohatgi30 <gi...@git.apache.org>.
Github user abhinavrohatgi30 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r176206589
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java ---
    @@ -0,0 +1,351 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.nifi.processors.solr;
    +
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.util.StopWatch;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.solr.client.solrj.SolrServerException;
    +import org.apache.solr.client.solrj.request.UpdateRequest;
    +import org.apache.solr.client.solrj.response.UpdateResponse;
    +import org.apache.solr.common.SolrException;
    +import org.apache.solr.common.SolrInputDocument;
    +import org.apache.solr.common.params.ModifiableSolrParams;
    +import org.apache.solr.common.params.MultiMapSolrParams;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.SortedMap;
    +import java.util.TreeMap;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
    +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
    +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.writeRecord;
    +
    +
    +@Tags({"Apache", "Solr", "Put", "Send","Record"})
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Indexes the Records from a FlowFile into Solr")
    +@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value",
    +        description="These parameters will be passed to Solr on the request")
    +public class PutSolrRecord extends SolrProcessor {
    +
    +    public static final PropertyDescriptor UPDATE_PATH = new PropertyDescriptor
    +            .Builder().name("Solr Update Path")
    +            .description("The path in Solr to post the Flowfile Records")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("/update")
    +            .build();
    +
    +    public static final PropertyDescriptor FIELDS_TO_INDEX  = new PropertyDescriptor
    --- End diff --
    
    I have added an example in the additionalDetails.html on how to specify fields for nested records.


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r178643485
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java ---
    @@ -0,0 +1,373 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.nifi.processors.solr;
    +
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.util.StopWatch;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.solr.client.solrj.SolrServerException;
    +import org.apache.solr.client.solrj.request.UpdateRequest;
    +import org.apache.solr.client.solrj.response.UpdateResponse;
    +import org.apache.solr.common.SolrException;
    +import org.apache.solr.common.SolrInputDocument;
    +import org.apache.solr.common.params.ModifiableSolrParams;
    +import org.apache.solr.common.params.MultiMapSolrParams;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.SortedMap;
    +import java.util.TreeMap;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
    +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
    +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.writeRecord;
    +
    +
    +@Tags({"Apache", "Solr", "Put", "Send","Record"})
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Indexes the Records from a FlowFile into Solr")
    +@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value",
    +        description="These parameters will be passed to Solr on the request")
    +public class PutSolrRecord extends SolrProcessor {
    +
    +    public static final PropertyDescriptor UPDATE_PATH = new PropertyDescriptor
    +            .Builder().name("Solr Update Path").displayName("Solr Update Path")
    +            .description("The path in Solr to post the Flowfile Records")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("/update")
    +            .build();
    +
    +    public static final PropertyDescriptor FIELDS_TO_INDEX  = new PropertyDescriptor
    +            .Builder().name("Fields To Index").displayName("Fields To Index")
    +            .displayName("Fields To Index")
    +            .description("Comma-separated list of field names to write")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor COMMIT_WITHIN = new PropertyDescriptor
    +            .Builder().name("Commit Within").displayName("Commit Within")
    +            .description("The number of milliseconds before the given update is committed")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("5000")
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor
    +            .Builder().name("Batch Size").displayName("Batch Size")
    +            .description("The number of solr documents to index per batch")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("500")
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("The original FlowFile")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("FlowFiles that failed for any reason other than Solr being unreachable")
    +            .build();
    +
    +    public static final Relationship REL_CONNECTION_FAILURE = new Relationship.Builder()
    +            .name("connection_failure")
    +            .description("FlowFiles that failed because Solr is unreachable")
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +            .name("put-solr-record-record-reader").displayName("put-solr-record-record-reader")
    +            .displayName("Record Reader")
    +            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +    public static final String COLLECTION_PARAM_NAME = "collection";
    +    public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin";
    +    public static final String REPEATING_PARAM_PATTERN = "\\w+\\.\\d+";
    +    public  final ComponentLog logger = getLogger();
    --- End diff --
    
    Need to remove this and use getLogger() later in the code, the logger could still be null here


---

[GitHub] nifi issue #2561: NIFI-4035 Implement record-based Solr processors

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2561
  
    @abhinavrohatgi30 Looks like your latest push grabbed a bunch of other folks' commits. Unless @bbende disagrees, I think you're going to need to rebase and repush.


---

[GitHub] nifi issue #2561: NIFI-4035 Implement record-based Solr processors

Posted by abhinavrohatgi30 <gi...@git.apache.org>.
Github user abhinavrohatgi30 commented on the issue:

    https://github.com/apache/nifi/pull/2561
  
    NIFI-4035 Adding a PutSolrRecord Processor that reads NiFi Records and indexes them into Solr as SolrDocuments.


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r178644667
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrRecord.java ---
    @@ -0,0 +1,643 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.nifi.processors.solr;
    +
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.serialization.record.MockRecordParser;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.ssl.SSLContextService;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.apache.solr.client.solrj.SolrClient;
    +import org.apache.solr.client.solrj.SolrQuery;
    +import org.apache.solr.client.solrj.SolrRequest;
    +import org.apache.solr.client.solrj.SolrServerException;
    +import org.apache.solr.client.solrj.impl.HttpSolrClient;
    +import org.apache.solr.client.solrj.impl.Krb5HttpClientConfigurer;
    +import org.apache.solr.client.solrj.response.QueryResponse;
    +import org.apache.solr.common.SolrDocument;
    +import org.apache.solr.common.SolrException;
    +import org.apache.solr.common.StringUtils;
    +import org.apache.solr.common.util.NamedList;
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.mockito.Mockito;
    +
    +import javax.net.ssl.SSLContext;
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Matchers.eq;
    +import static org.mockito.Mockito.times;
    +import static org.mockito.Mockito.verify;
    +import static org.mockito.Mockito.when;
    +
    +/**
    + * Test for PutSolrRecord Processor
    + */
    +public class TestPutSolrRecord {
    --- End diff --
    
    I think there should be 3 new tests added...
    
    1) Test what happens when the recordParser throws an exception, just set failAfter on the record parser:
    `recordParser.failAfter(0);`
    And ensure that it routes to failure.
    
    2) Test a nested record
    
    3) Test an array of nested records


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r178643673
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java ---
    @@ -0,0 +1,373 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.nifi.processors.solr;
    +
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.util.StopWatch;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.solr.client.solrj.SolrServerException;
    +import org.apache.solr.client.solrj.request.UpdateRequest;
    +import org.apache.solr.client.solrj.response.UpdateResponse;
    +import org.apache.solr.common.SolrException;
    +import org.apache.solr.common.SolrInputDocument;
    +import org.apache.solr.common.params.ModifiableSolrParams;
    +import org.apache.solr.common.params.MultiMapSolrParams;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.SortedMap;
    +import java.util.TreeMap;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
    +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
    +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.writeRecord;
    +
    +
    +@Tags({"Apache", "Solr", "Put", "Send","Record"})
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Indexes the Records from a FlowFile into Solr")
    +@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value",
    +        description="These parameters will be passed to Solr on the request")
    +public class PutSolrRecord extends SolrProcessor {
    +
    +    public static final PropertyDescriptor UPDATE_PATH = new PropertyDescriptor
    +            .Builder().name("Solr Update Path").displayName("Solr Update Path")
    +            .description("The path in Solr to post the Flowfile Records")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("/update")
    +            .build();
    +
    +    public static final PropertyDescriptor FIELDS_TO_INDEX  = new PropertyDescriptor
    +            .Builder().name("Fields To Index").displayName("Fields To Index")
    +            .displayName("Fields To Index")
    +            .description("Comma-separated list of field names to write")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor COMMIT_WITHIN = new PropertyDescriptor
    +            .Builder().name("Commit Within").displayName("Commit Within")
    +            .description("The number of milliseconds before the given update is committed")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("5000")
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor
    +            .Builder().name("Batch Size").displayName("Batch Size")
    +            .description("The number of solr documents to index per batch")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("500")
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("The original FlowFile")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("FlowFiles that failed for any reason other than Solr being unreachable")
    +            .build();
    +
    +    public static final Relationship REL_CONNECTION_FAILURE = new Relationship.Builder()
    +            .name("connection_failure")
    +            .description("FlowFiles that failed because Solr is unreachable")
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +            .name("put-solr-record-record-reader").displayName("put-solr-record-record-reader")
    +            .displayName("Record Reader")
    +            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +    public static final String COLLECTION_PARAM_NAME = "collection";
    +    public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin";
    +    public static final String REPEATING_PARAM_PATTERN = "\\w+\\.\\d+";
    +    public  final ComponentLog logger = getLogger();
    +
    +    private Set<Relationship> relationships;
    +    private List<PropertyDescriptor> descriptors;
    +    private static final String EMPTY_STRING = "";
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        super.init(context);
    +
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(SOLR_TYPE);
    +        descriptors.add(SOLR_LOCATION);
    +        descriptors.add(COLLECTION);
    +        descriptors.add(UPDATE_PATH);
    +        descriptors.add(RECORD_READER);
    +        descriptors.add(FIELDS_TO_INDEX);
    +        descriptors.add(COMMIT_WITHIN);
    +        descriptors.add(JAAS_CLIENT_APP_NAME);
    +        descriptors.add(BASIC_USERNAME);
    +        descriptors.add(BASIC_PASSWORD);
    +        descriptors.add(SSL_CONTEXT_SERVICE);
    +        descriptors.add(SOLR_SOCKET_TIMEOUT);
    +        descriptors.add(SOLR_CONNECTION_TIMEOUT);
    +        descriptors.add(SOLR_MAX_CONNECTIONS);
    +        descriptors.add(SOLR_MAX_CONNECTIONS_PER_HOST);
    +        descriptors.add(ZK_CLIENT_TIMEOUT);
    +        descriptors.add(ZK_CONNECTION_TIMEOUT);
    +        descriptors.add(BATCH_SIZE);
    +        this.descriptors = Collections.unmodifiableList(descriptors);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        relationships.add(REL_CONNECTION_FAILURE);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return this.relationships;
    +    }
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return this.descriptors;
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .description("Specifies the value to send for the '" + propertyDescriptorName + "' request parameter")
    +                .name(propertyDescriptorName)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .dynamic(true)
    +                .expressionLanguageSupported(true)
    +                .build();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if ( flowFile == null ) {
    +            return;
    +        }
    +
    +        final AtomicReference<Exception> error = new AtomicReference<>(null);
    +        final AtomicReference<Exception> connectionError = new AtomicReference<>(null);
    +
    +        final boolean isSolrCloud = SOLR_TYPE_CLOUD.getValue().equals(context.getProperty(SOLR_TYPE).getValue());
    +        final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFile).getValue();
    +        final Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong();
    +        final String contentStreamPath = context.getProperty(UPDATE_PATH).evaluateAttributeExpressions(flowFile).getValue();
    +        final MultiMapSolrParams requestParams = new MultiMapSolrParams(getRequestParams(context, flowFile));
    +        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +        final String fieldsToIndex = context.getProperty(FIELDS_TO_INDEX).evaluateAttributeExpressions(flowFile).getValue();
    +        final Long batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(flowFile).asLong();
    +
    +        final List<String> fieldList = new ArrayList<>();
    +        if (!StringUtils.isBlank(fieldsToIndex)) {
    +            fieldList.addAll(Arrays.asList(fieldsToIndex.trim().split("[,]")));
    +        }
    +        StopWatch timer = new StopWatch(true);
    +        try (final InputStream in = session.read(flowFile);
    +             final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
    +
    +            Record record;
    +            List<SolrInputDocument> inputDocumentList = new LinkedList<>();
    +            try {
    +                while ((record = reader.nextRecord()) != null) {
    +                SolrInputDocument inputDoc = new SolrInputDocument();
    +                writeRecord(record, inputDoc,fieldList,EMPTY_STRING);
    +                inputDocumentList.add(inputDoc);
    +                if(inputDocumentList.size()==batchSize) {
    +                    index(isSolrCloud, collection, commitWithin, contentStreamPath, requestParams, inputDocumentList);
    +                    inputDocumentList = new ArrayList<>();
    +                }
    +               index(isSolrCloud, collection, commitWithin, contentStreamPath, requestParams, inputDocumentList);
    +            }
    +            } catch (SolrException e) {
    +                error.set(e);
    +            } catch (SolrServerException e) {
    +                if (causedByIOException(e)) {
    +                    //Exit in case of a solr connection error
    +                    connectionError.set(e);
    +                } else {
    +                    error.set(e);
    +                }
    +            } catch (IOException e) {
    +                //Exit in case of a solr connection error
    +                connectionError.set(e);
    +            }
    +        } catch (final IOException | SchemaNotFoundException | MalformedRecordException e) {
    +            logger.error("Could not parse incoming data", e);
    --- End diff --
    
    Change to getLogger().error


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by abhinavrohatgi30 <gi...@git.apache.org>.
Github user abhinavrohatgi30 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r175883171
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java ---
    @@ -280,5 +291,115 @@ public SolrInputDocument toSolrInputDocument(SolrDocument d) {
             }
         }
     
    +    /**
    +     * Writes each Record as a SolrInputDocument.
    +     */
    +    public static void writeRecord(final Record record, final RecordSchema writeSchema, final SolrInputDocument inputDocument,final List<String> fieldsToIndex)
    +            throws IOException {
    +        RecordSchema schema = record.getSchema();
    +
    +        for (int i = 0; i < schema.getFieldCount(); i++) {
    +            final RecordField field = schema.getField(i);
    +            final String fieldName = field.getFieldName();
    +            final Object value = record.getValue(field);
    +            if (value == null || (!fieldsToIndex.isEmpty() && !fieldsToIndex.contains(fieldName))) {
    +                continue;
    +            }else {
    +                final DataType dataType = schema.getDataType(fieldName).get();
    +                writeValue(inputDocument, value, fieldName, dataType,fieldsToIndex);
    +            }
    +        }
    +    }
     
    +    private static void writeValue(final SolrInputDocument inputDocument, final Object value, final String fieldName, final DataType dataType,final List<String> fieldsToIndex) throws IOException {
    +        final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType;
    +        final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, fieldName);
    +        if (coercedValue == null) {
    +            return;
    +        }
    +
    +        switch (chosenDataType.getFieldType()) {
    +            case DATE: {
    +                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDate localDate = getLocalDateFromEpochTime(fieldName, coercedValue);
    +                    inputDocument.addField(fieldName,localDate.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                } else {
    +                    inputDocument.addField(fieldName,LocalDate.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                }
    +                break;
    +            }
    +            case TIMESTAMP: {
    +                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDateTime localDateTime = getLocalDateTimeFromEpochTime(fieldName, coercedValue);
    +                    inputDocument.addField(fieldName,localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                } else {
    +                    inputDocument.addField(fieldName,LocalDateTime.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                }
    +                break;
    +            }
    +            case DOUBLE:
    +                inputDocument.addField(fieldName,DataTypeUtils.toDouble(coercedValue, fieldName));
    +                break;
    +            case FLOAT:
    +                inputDocument.addField(fieldName,DataTypeUtils.toFloat(coercedValue, fieldName));
    +                break;
    +            case LONG:
    +                inputDocument.addField(fieldName,DataTypeUtils.toLong(coercedValue, fieldName));
    +                break;
    +            case INT:
    +            case BYTE:
    +            case SHORT:
    +                inputDocument.addField(fieldName,DataTypeUtils.toInteger(coercedValue, fieldName));
    +                break;
    +            case CHAR:
    +            case STRING:
    +                inputDocument.addField(fieldName,coercedValue.toString());
    +                break;
    +            case BIGINT:
    +                if (coercedValue instanceof Long) {
    +                    inputDocument.addField(fieldName,(Long) coercedValue);
    +                } else {
    +                    inputDocument.addField(fieldName,(BigInteger) coercedValue);
    +                }
    +                break;
    +            case BOOLEAN:
    +                final String stringValue = coercedValue.toString();
    +                if ("true".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName,true);
    +                } else if ("false".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName,false);
    +                } else {
    +                    inputDocument.addField(fieldName,stringValue);
    +                }
    +                break;
    +            case RECORD: {
    --- End diff --
    
    Ok, I'll then just change the field names as discussed before and I'll leave the implementation as it is and give some examples as part of the additionalDetails.html


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r175761850
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java ---
    @@ -0,0 +1,351 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.nifi.processors.solr;
    +
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.util.StopWatch;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.solr.client.solrj.SolrServerException;
    +import org.apache.solr.client.solrj.request.UpdateRequest;
    +import org.apache.solr.client.solrj.response.UpdateResponse;
    +import org.apache.solr.common.SolrException;
    +import org.apache.solr.common.SolrInputDocument;
    +import org.apache.solr.common.params.ModifiableSolrParams;
    +import org.apache.solr.common.params.MultiMapSolrParams;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.SortedMap;
    +import java.util.TreeMap;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
    +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
    +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
    +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
    +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
    +import static org.apache.nifi.processors.solr.SolrUtils.writeRecord;
    +
    +
    +@Tags({"Apache", "Solr", "Put", "Send","Record"})
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Indexes the Records from a FlowFile into Solr")
    +@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value",
    +        description="These parameters will be passed to Solr on the request")
    +public class PutSolrRecord extends SolrProcessor {
    +
    +    public static final PropertyDescriptor UPDATE_PATH = new PropertyDescriptor
    +            .Builder().name("Solr Update Path")
    +            .description("The path in Solr to post the Flowfile Records")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("/update")
    +            .build();
    +
    +    public static final PropertyDescriptor FIELDS_TO_INDEX  = new PropertyDescriptor
    +            .Builder().name("Fields To Index")
    +            .displayName("Fields To Index")
    +            .description("Comma-separated list of field names to write")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor COMMIT_WITHIN = new PropertyDescriptor
    +            .Builder().name("Commit Within")
    +            .description("The number of milliseconds before the given update is committed")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("5000")
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("The original FlowFile")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("FlowFiles that failed for any reason other than Solr being unreachable")
    +            .build();
    +
    +    public static final Relationship REL_CONNECTION_FAILURE = new Relationship.Builder()
    +            .name("connection_failure")
    +            .description("FlowFiles that failed because Solr is unreachable")
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +            .name("put-solr-record-record-reader")
    +            .displayName("Record Reader")
    +            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +    public static final String COLLECTION_PARAM_NAME = "collection";
    +    public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin";
    +    public static final String REPEATING_PARAM_PATTERN = "\\w+\\.\\d+";
    +    public  final ComponentLog logger = getLogger();
    +
    +    private Set<Relationship> relationships;
    +    private List<PropertyDescriptor> descriptors;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        super.init(context);
    +
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(SOLR_TYPE);
    +        descriptors.add(SOLR_LOCATION);
    +        descriptors.add(COLLECTION);
    +        descriptors.add(UPDATE_PATH);
    +        descriptors.add(RECORD_READER);
    +        descriptors.add(FIELDS_TO_INDEX);
    +        descriptors.add(COMMIT_WITHIN);
    +        descriptors.add(JAAS_CLIENT_APP_NAME);
    +        descriptors.add(BASIC_USERNAME);
    +        descriptors.add(BASIC_PASSWORD);
    +        descriptors.add(SSL_CONTEXT_SERVICE);
    +        descriptors.add(SOLR_SOCKET_TIMEOUT);
    +        descriptors.add(SOLR_CONNECTION_TIMEOUT);
    +        descriptors.add(SOLR_MAX_CONNECTIONS);
    +        descriptors.add(SOLR_MAX_CONNECTIONS_PER_HOST);
    +        descriptors.add(ZK_CLIENT_TIMEOUT);
    +        descriptors.add(ZK_CONNECTION_TIMEOUT);
    +        this.descriptors = Collections.unmodifiableList(descriptors);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        relationships.add(REL_CONNECTION_FAILURE);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return this.relationships;
    +    }
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return this.descriptors;
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .description("Specifies the value to send for the '" + propertyDescriptorName + "' request parameter")
    +                .name(propertyDescriptorName)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .dynamic(true)
    +                .expressionLanguageSupported(true)
    +                .build();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if ( flowFile == null ) {
    +            return;
    +        }
    +
    +        final AtomicReference<Exception> error = new AtomicReference<>(null);
    +        final AtomicReference<Exception> connectionError = new AtomicReference<>(null);
    +
    +        final boolean isSolrCloud = SOLR_TYPE_CLOUD.getValue().equals(context.getProperty(SOLR_TYPE).getValue());
    +        final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFile).getValue();
    +        final Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong();
    +        final String contentStreamPath = context.getProperty(UPDATE_PATH).evaluateAttributeExpressions(flowFile).getValue();
    +        final MultiMapSolrParams requestParams = new MultiMapSolrParams(getRequestParams(context, flowFile));
    +        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +        final String fieldsToIndex = context.getProperty(FIELDS_TO_INDEX).getValue();
    +
    +        final List<String> fieldList = new ArrayList<>();
    +        if (!StringUtils.isBlank(fieldsToIndex)) {
    +            fieldList.addAll(Arrays.asList(fieldsToIndex.trim().split("[,]")));
    +        }
    +        StopWatch timer = new StopWatch(true);
    +        try (final InputStream in = session.read(flowFile);
    +             final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
    +
    +            Record record;
    +            List<SolrInputDocument> inputDocumentList = new LinkedList<>();
    --- End diff --
    
    A big benefit of record-processing in NiFi is to pass around flow files with many records in them (thousands or millions) and thus avoid having many small flow file... given that, we probably want to avoid reading all of the records into a list here, as that would mean the entire content of the flow file would essentially be read into memory. 
    
    I would suggest some kind of batch size property like 500 or 1,000, and every time the batch size is reached then send a batch to Solr and start a new batch.


---

[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r178644182
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org.apache.nifi.processors.solr.PutSolrRecord/additionalDetails.html ---
    @@ -0,0 +1,108 @@
    +<!DOCTYPE html>
    +<html lang="en">
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements.  See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License.  You may obtain a copy of the License at
    +      http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<head>
    +    <meta charset="utf-8" />
    +    <title>PutSolrRecord</title>
    +    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
    +</head>
    +
    +<body>
    +<h2>Usage Example</h2>
    +<p>
    +    This processor reads the NiFi record and indexes it into Solr as a SolrDocument.
    +    Any properties added to this processor by the user are
    +    passed to Solr on the update request. It is required that the input record reader
    +    should be specified for this processor. Additionally, if only selected fields of a record are to be indexed
    +    you can specify the field name as a comma-separated list under the fields property.
    +</p>
    +<p>
    +    Example: To specify specific fields of the record  to be indexed:
    +</p>
    +<ul>
    +    <li><strong>Fields To Index</strong>: field1,field2,field3</li>
    +</ul>
    +<p>
    +    <strong>NOTE:</strong> In case of nested the field names should be prefixed with the parent field name.
    +</p>
    +<ul>
    +    <li><strong>Fields To Index</strong>: parentField1,parentField2,<strong>parentField3_childField1</strong>,<strong>parentField3_childField2</strong></li>
    +</ul>
    +<p>
    +   In case of nested records, this processor would flatten all the nested records into a single solr document, the field name of the field in a child document would follow the format of <strong>{Parent Field Name}_{Child Field Name}</strong>.
    +</p>
    +<p>
    +    Example:
    +    <strong>For a record created from the following json:</strong>
    --- End diff --
    
    Can we wrap all the JSON examples in pre elements so that they display like code-blocks when viewing the documentation?


---