You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/04/24 14:51:17 UTC

nifi git commit: NIFI-4035 - Adding PutSolrRecord Processor that reads NiFi records and indexes them into Solr as SolrDocuments

Repository: nifi
Updated Branches:
  refs/heads/master 0390c0f19 -> e3f472079


NIFI-4035 - Adding PutSolrRecord Processor that reads NiFi records and indexes them into Solr as SolrDocuments

Adding Test Cases for PutSolrRecord Processor

Adding PutSolrRecord Processor in the list of Processors

Resolving checkstyle errors

Resolving checkstyle errors in test classes

Adding License information and additional information about the processor

1. Implementing Batch Indexing 2. Changes for nested records 3. Removing MockRecordParser

Fixing bugs with nested records

Updating version of dependencies

Setting Expression Language Scope

This closes #2561.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e3f47207
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e3f47207
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e3f47207

Branch: refs/heads/master
Commit: e3f4720797a9777264d1732b2e1475b8f344a8f0
Parents: 0390c0f
Author: abhinavrohatgi30 <ab...@gmail.com>
Authored: Sat Mar 17 16:35:06 2018 +0100
Committer: Bryan Bende <bb...@apache.org>
Committed: Tue Apr 24 10:50:40 2018 -0400

----------------------------------------------------------------------
 .../nifi-solr-processors/pom.xml                |   6 +
 .../nifi/processors/solr/PutSolrRecord.java     | 337 ++++++++
 .../apache/nifi/processors/solr/SolrUtils.java  | 140 +++-
 .../org.apache.nifi.processor.Processor         |   1 +
 .../additionalDetails.html                      | 114 +++
 .../nifi/processors/solr/TestPutSolrRecord.java | 832 +++++++++++++++++++
 .../solr/testCollection/conf/schema.xml         |  66 +-
 7 files changed, 1465 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e3f47207/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml
index 5684f37..ea1499a 100755
--- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml
@@ -94,6 +94,12 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <version>1.7.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-simple</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3f47207/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java
new file mode 100644
index 0000000..0b3e107
--- /dev/null
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.solr;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.MultiMapSolrParams;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
+import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
+import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
+import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
+import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
+import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT;
+import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
+import static org.apache.nifi.processors.solr.SolrUtils.writeRecord;
+
+
+@Tags({"Apache", "Solr", "Put", "Send","Record"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Indexes the Records from a FlowFile into Solr")
+@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value",
+        description="These parameters will be passed to Solr on the request")
+public class PutSolrRecord extends SolrProcessor {
+
+    /** Path of the Solr request handler that the record batches are posted to. */
+    public static final PropertyDescriptor UPDATE_PATH = new PropertyDescriptor
+            .Builder().name("Solr Update Path").displayName("Solr Update Path")
+            .description("The path in Solr to post the Flowfile Records")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("/update")
+            .build();
+
+    /** Optional comma-separated whitelist of field names; when unset every field is written. */
+    public static final PropertyDescriptor FIELDS_TO_INDEX  = new PropertyDescriptor
+            .Builder().name("Fields To Index").displayName("Fields To Index")
+            .description("Comma-separated list of field names to write")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    /** Value sent to Solr as the {@code commitWithin} request parameter (milliseconds). */
+    public static final PropertyDescriptor COMMIT_WITHIN = new PropertyDescriptor
+            .Builder().name("Commit Within").displayName("Commit Within")
+            .description("The number of milliseconds before the given update is committed")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("5000")
+            .build();
+
+    /** Number of documents accumulated before an update request is sent to Solr. */
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor
+            .Builder().name("Batch Size").displayName("Batch Size")
+            .description("The number of solr documents to index per batch")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("500")
+            .build();
+
+    /** FlowFiles whose records were all sent to Solr without error. */
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("The original FlowFile")
+            .build();
+
+    /** FlowFiles that could not be parsed or were rejected for a reason other than connectivity. */
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles that failed for any reason other than Solr being unreachable")
+            .build();
+
+    /** FlowFiles that failed because of an I/O problem while talking to Solr. */
+    public static final Relationship REL_CONNECTION_FAILURE = new Relationship.Builder()
+            .name("connection_failure")
+            .description("FlowFiles that failed because Solr is unreachable")
+            .build();
+
+    /** Controller service used to parse the FlowFile content into records. */
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("put-solr-record-record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    // Names of the Solr request parameters set explicitly by this processor.
+    public static final String COLLECTION_PARAM_NAME = "collection";
+    public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin";
+    // Regular expression for names of the form <word>.<digits>; not referenced elsewhere in this class.
+    public static final String REPEATING_PARAM_PATTERN = "\\w+\\.\\d+";
+
+    // Both collections are populated once in init() and exposed read-only afterwards.
+    private Set<Relationship> relationships;
+    private List<PropertyDescriptor> descriptors;
+    // Parent-field prefix passed to writeRecord for top-level records (i.e. no prefix).
+    private static final String EMPTY_STRING = "";
+
+    /**
+     * Builds the read-only property descriptor list and relationship set that this
+     * processor reports through its accessor methods.
+     */
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        super.init(context);
+
+        // The list keeps the declaration order below.
+        this.descriptors = Collections.unmodifiableList(Arrays.asList(
+                SOLR_TYPE,
+                SOLR_LOCATION,
+                COLLECTION,
+                UPDATE_PATH,
+                RECORD_READER,
+                FIELDS_TO_INDEX,
+                COMMIT_WITHIN,
+                JAAS_CLIENT_APP_NAME,
+                BASIC_USERNAME,
+                BASIC_PASSWORD,
+                SSL_CONTEXT_SERVICE,
+                SOLR_SOCKET_TIMEOUT,
+                SOLR_CONNECTION_TIMEOUT,
+                SOLR_MAX_CONNECTIONS,
+                SOLR_MAX_CONNECTIONS_PER_HOST,
+                ZK_CLIENT_TIMEOUT,
+                ZK_CONNECTION_TIMEOUT,
+                BATCH_SIZE));
+
+        final Set<Relationship> supportedRelationships =
+                new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_CONNECTION_FAILURE));
+        this.relationships = Collections.unmodifiableSet(supportedRelationships);
+    }
+
+    /**
+     * @return the unmodifiable set of relationships assembled in {@code init}
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /**
+     * @return the unmodifiable list of property descriptors assembled in {@code init}
+     */
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .description("Specifies the value to send for the '" + propertyDescriptorName + "' request parameter")
+                .name(propertyDescriptorName)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                .dynamic(true)
+                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .build();
+    }
+
+    /**
+     * Reads the records of one incoming FlowFile, converts each record to a
+     * {@link SolrInputDocument} and sends the documents to Solr in batches of
+     * {@code Batch Size} documents; a final, smaller batch carries the remainder.
+     *
+     * <p>Routing:</p>
+     * <ul>
+     *   <li>content that cannot be read or parsed: penalized and sent to {@code failure};</li>
+     *   <li>{@link SolrException}, or {@link SolrServerException} not caused by an
+     *       {@link IOException}: {@code failure};</li>
+     *   <li>{@link IOException} (directly or as a cause) while converting or indexing:
+     *       penalized and sent to {@code connection_failure};</li>
+     *   <li>otherwise a provenance SEND event is reported and the FlowFile goes to {@code success}.</li>
+     * </ul>
+     *
+     * <p>Batches that were sent before an error occurred are not rolled back here.</p>
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if ( flowFile == null ) {
+            return;
+        }
+
+        final AtomicReference<Exception> error = new AtomicReference<>(null);
+        final AtomicReference<Exception> connectionError = new AtomicReference<>(null);
+
+        final boolean isSolrCloud = SOLR_TYPE_CLOUD.getValue().equals(context.getProperty(SOLR_TYPE).getValue());
+        final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFile).getValue();
+        final Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong();
+        final String contentStreamPath = context.getProperty(UPDATE_PATH).evaluateAttributeExpressions(flowFile).getValue();
+        final MultiMapSolrParams requestParams = new MultiMapSolrParams(SolrUtils.getRequestParams(context, flowFile));
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final String fieldsToIndex = context.getProperty(FIELDS_TO_INDEX).evaluateAttributeExpressions(flowFile).getValue();
+        final Long batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(flowFile).asLong();
+
+        // An empty list means "index every field".
+        final List<String> fieldList = new ArrayList<>();
+        if (!StringUtils.isBlank(fieldsToIndex)) {
+            Arrays.stream(fieldsToIndex.split(",")).forEach( f -> fieldList.add(f.trim()));
+        }
+        StopWatch timer = new StopWatch(true);
+        try (final InputStream in = session.read(flowFile);
+             final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
+
+            Record record;
+            List<SolrInputDocument> inputDocumentList = new ArrayList<>();
+            try {
+                while ((record = reader.nextRecord()) != null) {
+                    SolrInputDocument inputDoc = new SolrInputDocument();
+                    writeRecord(record, inputDoc,fieldList,EMPTY_STRING);
+                    inputDocumentList.add(inputDoc);
+                    // Flush only once a full batch has been collected.
+                    if (inputDocumentList.size() >= batchSize) {
+                        index(isSolrCloud, collection, commitWithin, contentStreamPath, requestParams, inputDocumentList);
+                        inputDocumentList = new ArrayList<>();
+                    }
+                }
+                // Send whatever is left over after the last full batch.
+                if (!inputDocumentList.isEmpty()) {
+                    index(isSolrCloud, collection, commitWithin, contentStreamPath, requestParams, inputDocumentList);
+                }
+            } catch (SolrException e) {
+                error.set(e);
+            } catch (SolrServerException e) {
+                if (causedByIOException(e)) {
+                    //Exit in case of a solr connection error
+                    connectionError.set(e);
+                } else {
+                    error.set(e);
+                }
+            } catch (IOException e) {
+                //Exit in case of a solr connection error
+                connectionError.set(e);
+            }
+        } catch (final IOException | SchemaNotFoundException | MalformedRecordException e) {
+            getLogger().error("Could not parse incoming data", e);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+        timer.stop();
+
+        if (error.get() != null) {
+            getLogger().error("Failed to send all the records of the {} to Solr due to {}; routing to failure",
+                    new Object[]{flowFile, error.get()});
+            session.transfer(flowFile, REL_FAILURE);
+        } else if (connectionError.get() != null) {
+            getLogger().error("Failed to send {} to Solr due to {}; routing to connection_failure",
+                    new Object[]{flowFile, connectionError.get()});
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_CONNECTION_FAILURE);
+        } else {
+            StringBuilder transitUri = new StringBuilder("solr://");
+            transitUri.append(getSolrLocation());
+            if (isSolrCloud) {
+                transitUri.append(":").append(collection);
+            }
+
+            final long duration = timer.getDuration(TimeUnit.MILLISECONDS);
+            session.getProvenanceReporter().send(flowFile, transitUri.toString(), duration, true);
+            getLogger().info("Successfully sent {} to Solr in {} millis", new Object[]{flowFile, duration});
+            session.transfer(flowFile, REL_SUCCESS);
+        }
+    }
+
+    /**
+     * Sends one batch of documents to Solr in a single {@link UpdateRequest} and clears the
+     * batch afterwards. A {@code null} or empty batch is ignored so that no document-less
+     * update request is issued.
+     *
+     * @param isSolrCloud       whether the {@code collection} request parameter must be set
+     * @param collection        collection name, used only when {@code isSolrCloud} is true
+     * @param commitWithin      value for the {@code commitWithin} parameter; ignored when null or not positive
+     * @param contentStreamPath request handler path the update is posted to
+     * @param requestParams     additional request parameters; repeated names are all forwarded
+     * @param inputDocumentList documents to send; emptied on successful return
+     * @throws IOException         if the request to Solr fails with an I/O error
+     * @throws SolrServerException if the Solr client reports a server-side failure
+     * @throws SolrException       if Solr rejects the request
+     */
+    private void index(boolean isSolrCloud, String collection, Long commitWithin, String contentStreamPath, MultiMapSolrParams requestParams, List<SolrInputDocument> inputDocumentList)
+            throws IOException, SolrServerException,SolrException {
+        // Nothing to index: skip the round trip entirely.
+        if (inputDocumentList == null || inputDocumentList.isEmpty()) {
+            return;
+        }
+
+        UpdateRequest request = new UpdateRequest(contentStreamPath);
+        request.setParams(new ModifiableSolrParams());
+
+        // add the extra params, don't use 'set' in case of repeating params
+        Iterator<String> paramNames = requestParams.getParameterNamesIterator();
+        while (paramNames.hasNext()) {
+            String paramName = paramNames.next();
+            for (String paramValue : requestParams.getParams(paramName)) {
+                request.getParams().add(paramName, paramValue);
+            }
+        }
+
+        // specify the collection for SolrCloud
+        if (isSolrCloud) {
+            request.setParam(COLLECTION_PARAM_NAME, collection);
+        }
+
+        if (commitWithin != null && commitWithin > 0) {
+            request.setParam(COMMIT_WITHIN_PARAM_NAME, commitWithin.toString());
+        }
+
+        // if a username and password were provided then pass them for basic auth
+        if (isBasicAuthEnabled()) {
+            request.setBasicAuthCredentials(getUsername(), getPassword());
+        }
+        request.add(inputDocumentList);
+        UpdateResponse response = request.process(getSolrClient());
+        getLogger().debug("Got {} response from Solr", new Object[]{response.getStatus()});
+        // The documents have been handed to Solr; release them so the list can be reused.
+        inputDocumentList.clear();
+    }
+
+    /**
+     * Walks the cause chain of the given exception looking for an {@link IOException}.
+     * The exception itself is not inspected, only its causes.
+     *
+     * @param e the Solr client exception to examine
+     * @return {@code true} if any cause in the chain is an {@link IOException}
+     */
+    private boolean causedByIOException(SolrServerException e) {
+        for (Throwable current = e.getCause(); current != null; current = current.getCause()) {
+            if (current instanceof IOException) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3f47207/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
index ae83b1c..2b324ec 100755
--- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
@@ -33,6 +33,7 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.ListRecordSet;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
@@ -40,6 +41,8 @@ import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -56,7 +59,13 @@ import org.apache.solr.common.params.MultiMapSolrParams;
 import javax.net.ssl.SSLContext;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -223,7 +232,7 @@ public class SolrUtils {
             httpClient.getConnectionManager().getSchemeRegistry().register(httpsScheme);
         }
 
-        if (SOLR_TYPE_STANDARD.equals(context.getProperty(SOLR_TYPE).getValue())) {
+        if (SOLR_TYPE_STANDARD.getValue().equals(context.getProperty(SOLR_TYPE).getValue())) {
             return new HttpSolrClient(solrLocation, httpClient);
         } else {
             final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions().getValue();
@@ -325,4 +334,133 @@ public class SolrUtils {
 
         return paramsMap;
     }
+
+    /**
+     * Copies the non-null fields of a record into the given Solr document.
+     * Nested records end up in the same document; their field names are
+     * prefixed with the parent field name and an underscore.
+     *
+     * @param record          record whose fields are written
+     * @param inputDocument   document that receives the fields
+     * @param fieldsToIndex   names of the fields to write; an empty list writes all fields
+     * @param parentFieldName name of the enclosing field, or blank for a top-level record
+     * @throws IOException declared for callers; propagated from value conversion
+     */
+    public static void writeRecord(final Record record, final SolrInputDocument inputDocument,final List<String> fieldsToIndex,String parentFieldName)
+            throws IOException {
+        final RecordSchema schema = record.getSchema();
+        final boolean hasParent = !StringUtils.isBlank(parentFieldName);
+        final int fieldCount = schema.getFieldCount();
+
+        for (int index = 0; index < fieldCount; index++) {
+            final RecordField field = schema.getField(index);
+            final Object value = record.getValue(field);
+            if (value == null) {
+                // Absent values are simply not written to the document.
+                continue;
+            }
+
+            // Prefixing parent field name
+            final String fieldName = hasParent
+                    ? parentFieldName + "_" + field.getFieldName()
+                    : field.getFieldName();
+            final DataType dataType = schema.getDataType(field.getFieldName()).get();
+            writeValue(inputDocument, value, fieldName, dataType, fieldsToIndex);
+        }
+    }
+
+    private static void writeValue(final SolrInputDocument inputDocument, final Object value, final String fieldName, final DataType dataType,final List<String> fieldsToIndex) throws IOException {
+        final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType;
+        final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, fieldName);
+        if (coercedValue == null) {
+            return;
+        }
+
+        switch (chosenDataType.getFieldType()) {
+            case DATE: {
+                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()));
+                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
+                    LocalDate localDate = getLocalDateFromEpochTime(fieldName, coercedValue);
+                    addFieldToSolrDocument(inputDocument,fieldName,localDate.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z',fieldsToIndex);
+                } else {
+                    addFieldToSolrDocument(inputDocument,fieldName,LocalDate.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z',fieldsToIndex);
+                }
+                break;
+            }
+            case TIMESTAMP: {
+                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()));
+                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
+                    LocalDateTime localDateTime = getLocalDateTimeFromEpochTime(fieldName, coercedValue);
+                    addFieldToSolrDocument(inputDocument,fieldName,localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z',fieldsToIndex);
+                } else {
+                    addFieldToSolrDocument(inputDocument,fieldName,LocalDateTime.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z',fieldsToIndex);
+                }
+                break;
+            }
+            case DOUBLE:
+                addFieldToSolrDocument(inputDocument,fieldName,DataTypeUtils.toDouble(coercedValue, fieldName),fieldsToIndex);
+                break;
+            case FLOAT:
+                addFieldToSolrDocument(inputDocument,fieldName,DataTypeUtils.toFloat(coercedValue, fieldName),fieldsToIndex);
+                break;
+            case LONG:
+                addFieldToSolrDocument(inputDocument,fieldName,DataTypeUtils.toLong(coercedValue, fieldName),fieldsToIndex);
+                break;
+            case INT:
+            case BYTE:
+            case SHORT:
+                addFieldToSolrDocument(inputDocument,fieldName,DataTypeUtils.toInteger(coercedValue, fieldName),fieldsToIndex);
+                break;
+            case CHAR:
+            case STRING:
+                addFieldToSolrDocument(inputDocument,fieldName,coercedValue.toString(),fieldsToIndex);
+                break;
+            case BIGINT:
+                if (coercedValue instanceof Long) {
+                    addFieldToSolrDocument(inputDocument,fieldName,(Long) coercedValue,fieldsToIndex);
+                } else {
+                    addFieldToSolrDocument(inputDocument,fieldName,(BigInteger)coercedValue,fieldsToIndex);
+                }
+                break;
+            case BOOLEAN:
+                final String stringValue = coercedValue.toString();
+                if ("true".equalsIgnoreCase(stringValue)) {
+                    addFieldToSolrDocument(inputDocument,fieldName,true,fieldsToIndex);
+                } else if ("false".equalsIgnoreCase(stringValue)) {
+                    addFieldToSolrDocument(inputDocument,fieldName,false,fieldsToIndex);
+                } else {
+                    addFieldToSolrDocument(inputDocument,fieldName,stringValue,fieldsToIndex);
+                }
+                break;
+            case RECORD: {
+                final Record record = (Record) coercedValue;
+                writeRecord(record, inputDocument,fieldsToIndex,fieldName);
+                break;
+            }
+            case ARRAY:
+            default:
+                if (coercedValue instanceof Object[]) {
+                    final Object[] values = (Object[]) coercedValue;
+                    for(Object element : values){
+                        if(element instanceof  Record){
+                            writeRecord((Record)element,inputDocument,fieldsToIndex,fieldName);
+                        }else{
+                            addFieldToSolrDocument(inputDocument,fieldName,coercedValue.toString(),fieldsToIndex);
+                        }
+                    }
+                } else {
+                    addFieldToSolrDocument(inputDocument,fieldName,coercedValue.toString(),fieldsToIndex);
+                }
+                break;
+        }
+    }
+
+    private static void addFieldToSolrDocument(SolrInputDocument inputDocument,String fieldName,Object fieldValue,List<String> fieldsToIndex){
+        if ((!fieldsToIndex.isEmpty() && fieldsToIndex.contains(fieldName)) || fieldsToIndex.isEmpty()){
+            inputDocument.addField(fieldName, fieldValue);
+        }
+    }
+
+    private static LocalDate getLocalDateFromEpochTime(String fieldName, Object coercedValue) {
+        Long date = DataTypeUtils.toLong(coercedValue, fieldName);
+        return Instant.ofEpochMilli(date).atZone(ZoneId.systemDefault()).toLocalDate();
+    }
+
+    private static LocalDateTime getLocalDateTimeFromEpochTime(String fieldName, Object coercedValue) {
+        Long date = DataTypeUtils.toLong(coercedValue, fieldName);
+        return Instant.ofEpochMilli(date).atZone(ZoneId.systemDefault()).toLocalDateTime();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3f47207/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index cc05423..e9cff2b 100644
--- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -13,5 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 org.apache.nifi.processors.solr.PutSolrContentStream
+org.apache.nifi.processors.solr.PutSolrRecord
 org.apache.nifi.processors.solr.GetSolr
 org.apache.nifi.processors.solr.QuerySolr

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3f47207/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org.apache.nifi.processors.solr.PutSolrRecord/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org.apache.nifi.processors.solr.PutSolrRecord/additionalDetails.html b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org.apache.nifi.processors.solr.PutSolrRecord/additionalDetails.html
new file mode 100644
index 0000000..12315b9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org.apache.nifi.processors.solr.PutSolrRecord/additionalDetails.html
@@ -0,0 +1,114 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>PutSolrRecord</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+<h2>Usage Example</h2>
+<p>
+    This processor reads NiFi records and indexes each of them into Solr as a SolrDocument.
+    Any properties added to this processor by the user are
+    passed to Solr on the update request. A record reader must be
+    specified for this processor. Additionally, if only selected fields of a record are to be indexed,
+    you can specify the field names as a comma-separated list under the Fields To Index property.
+</p>
+<p>
+    Example: To index only specific fields of the record:
+</p>
+<ul>
+    <li><strong>Fields To Index</strong>: field1,field2,field3</li>
+</ul>
+<p>
+    <strong>NOTE:</strong> In case of nested records, the field names should be prefixed with the parent field name.
+</p>
+<ul>
+    <li><strong>Fields To Index</strong>: parentField1,parentField2,<strong>parentField3_childField1</strong>,<strong>parentField3_childField2</strong></li>
+</ul>
+<p>
+   In case of nested records, this processor flattens all the nested records into a single Solr document. The name of each field coming from a child record follows the format <strong>{Parent Field Name}_{Child Field Name}</strong>.
+</p>
+<p>
+    Example:
+    <strong>For a record created from the following json:</strong><br/>
+</p>
+<pre>
+    {
+        "first": "Abhi",
+        "last": "R",
+        "grade": 8,
+        "exams": {
+            "subject": "Maths",
+            "test" : "term1",
+            "marks" : 90
+        }
+    }
+</pre>
+<p>
+    <strong>The corresponding solr document would be represented  as below:</strong><br/>
+</p>
+<pre>
+    {
+        "first": "Abhi",
+        "last": "R",
+        "grade": 8,
+        "exams_subject": "Maths",
+        "exams_test" : "term1",
+        "exams_marks" : 90
+    }
+</pre>
+<p>
+    Similarly, in case of an array of nested records, this processor flattens all the nested records into a single Solr document. The name of each field coming from a child record follows the format <strong>{Parent Field Name}_{Child Field Name}</strong>, and the field is a multivalued field in the Solr document.
+    Example:
+    <strong>For a record created from the following json:</strong><br/>
+</p>
+<pre>
+    {
+    "first": "Abhi",
+    "last": "R",
+    "grade": 8,
+    "exams": [
+        {
+            "subject": "Maths",
+            "test" : "term1",
+            "marks" : 90
+        },
+        {
+            "subject": "Physics",
+            "test" : "term1",
+            "marks" : 95
+        }
+    ]
+    }
+</pre>
+<p>
+    <strong>The corresponding solr document would be represented  as below:</strong><br/>
+</p>
+<pre>
+    {
+        "first": "Abhi",
+        "last": "R",
+        "grade": 8,
+        "exams_subject": ["Maths","Physics"],
+        "exams_test" : ["term1","term1"],
+        "exams_marks" : [90,95]
+    }
+</pre>
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3f47207/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrRecord.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrRecord.java
new file mode 100644
index 0000000..17801aa
--- /dev/null
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrRecord.java
@@ -0,0 +1,832 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.solr;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.impl.Krb5HttpClientConfigurer;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.StringUtils;
+import org.apache.solr.common.util.NamedList;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for PutSolrRecord Processor
+ */
+public class TestPutSolrRecord {
+
+    private static final String DEFAULT_SOLR_CORE = "testCollection";
+
+    /**
+     * Runs the processor once over a single flat record and checks, through
+     * {@code verifySolrDocuments}, that the expected document is found and that the
+     * FlowFile is transferred to the success relationship only.
+     */
+    @Test
+    public void testPutSolrOnTriggerIndex() throws IOException, InitializationException, SolrServerException {
+        final SolrClient solrClient = createEmbeddedSolrClient(DEFAULT_SOLR_CORE);
+        final TestableProcessor proc = new TestableProcessor(solrClient);
+
+        final TestRunner runner = createDefaultTestRunner(proc);
+        final MockRecordParser recordParser = new MockRecordParser();
+        runner.addControllerService("parser", recordParser);
+
+        runner.enableControllerService(recordParser);
+        runner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+
+        runner.setProperty(PutSolrRecord.UPDATE_PATH, "/update");
+
+        recordParser.addSchemaField("id", RecordFieldType.INT);
+        recordParser.addSchemaField("first", RecordFieldType.STRING);
+        recordParser.addSchemaField("last", RecordFieldType.STRING);
+        recordParser.addSchemaField("grade", RecordFieldType.INT);
+        recordParser.addSchemaField("subject", RecordFieldType.STRING);
+        recordParser.addSchemaField("test", RecordFieldType.STRING);
+        recordParser.addSchemaField("marks", RecordFieldType.INT);
+
+        // Document the test expects to be handed to verifySolrDocuments after indexing.
+        final SolrDocument solrDocument = new SolrDocument();
+        solrDocument.put("id",1);
+        solrDocument.put("first","Abhinav");
+        solrDocument.put("last","R");
+        solrDocument.put("grade",8);
+        solrDocument.put("subject","Chemistry");
+        solrDocument.put("test","term1");
+        solrDocument.put("marks",98);
+
+        recordParser.addRecord(1, "Abhinav","R",8,"Chemistry","term1", 98);
+
+        try {
+            // singletonMap avoids the anonymous HashMap subclass that double-brace initialization creates
+            runner.enqueue(new byte[0], Collections.singletonMap("id", "1"));
+            runner.run(1, false);
+            verifySolrDocuments(proc.getSolrClient(), Collections.singletonList(solrDocument));
+            runner.assertTransferCount(PutSolrRecord.REL_FAILURE, 0);
+            runner.assertTransferCount(PutSolrRecord.REL_CONNECTION_FAILURE, 0);
+            runner.assertTransferCount(PutSolrRecord.REL_SUCCESS, 1);
+        } finally {
+            try {
+                proc.getSolrClient().close();
+            } catch (Exception e) {
+                // best-effort cleanup: a close failure is only reported, not rethrown
+                e.printStackTrace();
+            }
+        }
+    }
+
+    @Test
+    public void testPutSolrOnTriggerIndexForANestedRecord() throws IOException, InitializationException, SolrServerException {
+        final SolrClient solrClient = createEmbeddedSolrClient(DEFAULT_SOLR_CORE);
+        TestableProcessor proc = new TestableProcessor(solrClient);
+
+        TestRunner runner= createDefaultTestRunner(proc);
+        MockRecordParser recordParser = new MockRecordParser();
+        runner.addControllerService("parser", recordParser);
+
+        runner.enableControllerService(recordParser);
+        runner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+
+        runner.setProperty(PutSolrRecord.UPDATE_PATH, "/update");
+
+        recordParser.addSchemaField("id", RecordFieldType.INT);
+        recordParser.addSchemaField("first", RecordFieldType.STRING);
+        recordParser.addSchemaField("last", RecordFieldType.STRING);
+        recordParser.addSchemaField("grade", RecordFieldType.INT);
+        recordParser.addSchemaField("exam", RecordFieldType.RECORD);
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("subject", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("test", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("marks", RecordFieldType.INT.getDataType()));
+        RecordSchema schema = new SimpleRecordSchema(fields);
+
+        Map<String,Object> values = new HashMap<>();
+        values.put("subject","Chemistry");
+        values.put("test","term1");
+        values.put("marks",98);
+        final Record record = new MapRecord(schema,values);
+
+        recordParser.addRecord(1, "Abhinav","R",8,record);
+
+
+        SolrDocument solrDocument = new SolrDocument();
+        solrDocument.put("id",1);
+        solrDocument.put("first","Abhinav");
+        solrDocument.put("last","R");
+        solrDocument.put("grade",8);
+        solrDocument.put("exam_subject","Chemistry");
+        solrDocument.put("exam_test","term1");
+        solrDocument.put("exam_marks",98);
+
+        try {
+            runner.enqueue(new byte[0], new HashMap<String, String>() {{
+                put("id", "1");
+            }});
+            runner.run(1, false);
+            runner.assertTransferCount(PutSolrRecord.REL_FAILURE, 0);
+            runner.assertTransferCount(PutSolrRecord.REL_CONNECTION_FAILURE, 0);
+            runner.assertTransferCount(PutSolrRecord.REL_SUCCESS, 1);
+            verifySolrDocuments(proc.getSolrClient(), Collections.singletonList(solrDocument));
+        } finally {
+            try {
+                proc.getSolrClient().close();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+
+    /**
+     * Configures the mock record parser with {@code failAfter(0)} and expects the FlowFile
+     * to be transferred to the failure relationship, with an empty expected-document list
+     * passed to {@code verifySolrDocuments}.
+     */
+    @Test
+    public void testRecordParserExceptionShouldRoutToFailure() throws IOException, InitializationException, SolrServerException {
+        final SolrClient solrClient = createEmbeddedSolrClient(DEFAULT_SOLR_CORE);
+        final TestableProcessor proc = new TestableProcessor(solrClient);
+
+        final TestRunner runner = createDefaultTestRunner(proc);
+        final MockRecordParser recordParser = new MockRecordParser();
+        runner.addControllerService("parser", recordParser);
+
+        runner.enableControllerService(recordParser);
+        runner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+
+        runner.setProperty(PutSolrRecord.UPDATE_PATH, "/update");
+
+        recordParser.addSchemaField("id", RecordFieldType.INT);
+        recordParser.addSchemaField("first", RecordFieldType.STRING);
+        recordParser.addSchemaField("last", RecordFieldType.STRING);
+        recordParser.addSchemaField("grade", RecordFieldType.INT);
+
+        recordParser.failAfter(0);
+
+        try {
+            // singletonMap avoids the anonymous HashMap subclass that double-brace initialization creates
+            runner.enqueue(new byte[0], Collections.singletonMap("id", "1"));
+            runner.run(1, false);
+            runner.assertTransferCount(PutSolrRecord.REL_FAILURE, 1);
+            runner.assertTransferCount(PutSolrRecord.REL_CONNECTION_FAILURE, 0);
+            runner.assertTransferCount(PutSolrRecord.REL_SUCCESS, 0);
+            // emptyList() is the type-safe replacement for the raw Collections.EMPTY_LIST constant
+            verifySolrDocuments(proc.getSolrClient(), Collections.emptyList());
+        } finally {
+            try {
+                proc.getSolrClient().close();
+            } catch (Exception e) {
+                // best-effort cleanup: a close failure is only reported, not rethrown
+                e.printStackTrace();
+            }
+        }
+    }
+
+    @Test
+    public void testPutSolrOnTriggerIndexForAnArrayOfNestedRecord() throws IOException, InitializationException, SolrServerException {
+        final SolrClient solrClient = createEmbeddedSolrClient(DEFAULT_SOLR_CORE);
+        TestableProcessor proc = new TestableProcessor(solrClient);
+
+        TestRunner runner= createDefaultTestRunner(proc);
+        MockRecordParser recordParser = new MockRecordParser();
+        runner.addControllerService("parser", recordParser);
+
+        runner.enableControllerService(recordParser);
+        runner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+
+        runner.setProperty(PutSolrRecord.UPDATE_PATH, "/update");
+
+        recordParser.addSchemaField("id", RecordFieldType.INT);
+        recordParser.addSchemaField("first", RecordFieldType.STRING);
+        recordParser.addSchemaField("last", RecordFieldType.STRING);
+        recordParser.addSchemaField("grade", RecordFieldType.INT);
+        recordParser.addSchemaField("exams", RecordFieldType.ARRAY);
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("subject", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("test", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("marks", RecordFieldType.INT.getDataType()));
+        RecordSchema schema = new SimpleRecordSchema(fields);
+
+        Map<String,Object> values1 = new HashMap<>();
+        values1.put("subject","Chemistry");
+        values1.put("test","term1");
+        values1.put("marks",98);
+        final Record record1 = new MapRecord(schema,values1);
+
+        Map<String,Object> values2 = new HashMap<>();
+        values2.put("subject","Maths");
+        values2.put("test","term1");
+        values2.put("marks",98);
+        final Record record2 = new MapRecord(schema,values2);
+
+        recordParser.addRecord(1, "Abhinav","R",8,new Record[]{record1,record2});
+
+        SolrDocument solrDocument = new SolrDocument();
+        solrDocument.put("id",1);
+        solrDocument.put("first","Abhinav");
+        solrDocument.put("last","R");
+        solrDocument.put("grade",8);
+        solrDocument.put("exams_subject", Stream.of("Chemistry","Maths").collect(Collectors.toList()));
+        solrDocument.put("exams_test",Stream.of("term1","term1").collect(Collectors.toList()));
+        solrDocument.put("exams_marks",Stream.of(98,98).collect(Collectors.toList()));
+
+        try {
+            runner.enqueue(new byte[0], new HashMap<String, String>() {{
+                put("id", "1");
+            }});
+            runner.run(1, false);
+            verifySolrDocuments(solrClient,Collections.singletonList(solrDocument));
+            runner.assertTransferCount(PutSolrRecord.REL_FAILURE, 0);
+            runner.assertTransferCount(PutSolrRecord.REL_CONNECTION_FAILURE, 0);
+            runner.assertTransferCount(PutSolrRecord.REL_SUCCESS, 1);
+
+        }catch (Exception e){
+            e.printStackTrace();
+        }finally {
+            try {
+                proc.getSolrClient().close();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * Sets the collection property to the expression {@code ${solr.collection}}, supplies
+     * that attribute on the enqueued FlowFile, and expects the FlowFile to be routed to
+     * success when run against a {@code CollectionVerifyingProcessor} built for "collection1".
+     */
+    @Test
+    public void testCollectionExpressionLanguage() throws IOException, SolrServerException, InitializationException {
+        final String collection = "collection1";
+        final CollectionVerifyingProcessor proc = new CollectionVerifyingProcessor(collection);
+
+        final TestRunner testRunner = TestRunners.newTestRunner(proc);
+
+        final MockRecordParser parser = new MockRecordParser();
+        testRunner.addControllerService("parser", parser);
+
+        testRunner.enableControllerService(parser);
+        testRunner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+
+        parser.addSchemaField("id", RecordFieldType.INT);
+        parser.addSchemaField("first", RecordFieldType.STRING);
+        parser.addSchemaField("last", RecordFieldType.STRING);
+        parser.addSchemaField("grade", RecordFieldType.INT);
+        parser.addSchemaField("subject", RecordFieldType.STRING);
+        parser.addSchemaField("test", RecordFieldType.STRING);
+        parser.addSchemaField("marks", RecordFieldType.INT);
+
+        parser.addRecord(1, "Abhinav","R",8,"Chemistry","term1", 98);
+
+        testRunner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue());
+        testRunner.setProperty(SolrUtils.SOLR_LOCATION, "localhost:9983");
+        testRunner.setProperty(SolrUtils.COLLECTION, "${solr.collection}");
+
+        // FlowFile attributes; "solr.collection" feeds the expression configured above.
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put("solr.collection", collection);
+        flowFileAttributes.put("id","1");
+        try {
+            testRunner.enqueue(new byte[0], flowFileAttributes);
+            testRunner.run(1, false);
+            testRunner.assertTransferCount(PutSolrRecord.REL_FAILURE, 0);
+            testRunner.assertTransferCount(PutSolrRecord.REL_CONNECTION_FAILURE, 0);
+            testRunner.assertTransferCount(PutSolrRecord.REL_SUCCESS, 1);
+        } finally {
+            try {
+                proc.getSolrClient().close();
+            } catch (Exception closeFailure) {
+                closeFailure.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * Uses an {@code ExceptionThrowingProcessor} built with a plain {@code SolrServerException}
+     * and expects the single FlowFile on the failure relationship after exactly one
+     * {@code request} call on the Solr client.
+     */
+    @Test
+    public void testSolrServerExceptionShouldRouteToFailure() throws IOException, SolrServerException, InitializationException {
+        final Throwable throwable = new SolrServerException("Invalid Document");
+        final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable);
+
+        final TestRunner runner = createDefaultTestRunner(proc);
+        runner.setProperty(PutSolrRecord.UPDATE_PATH, "/update");
+
+        final MockRecordParser recordParser = new MockRecordParser();
+
+        recordParser.addRecord(1, "Abhinav","R",8,"Chemistry","term1", 98);
+        runner.addControllerService("parser", recordParser);
+
+        runner.enableControllerService(recordParser);
+        runner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+
+        try {
+            // singletonMap avoids the anonymous HashMap subclass that double-brace initialization creates
+            runner.enqueue(new byte[0], Collections.singletonMap("id", "1"));
+            runner.run(1,false);
+
+            runner.assertAllFlowFilesTransferred(PutSolrRecord.REL_FAILURE, 1);
+            verify(proc.getSolrClient(), times(1)).request(any(SolrRequest.class), eq((String)null));
+        } finally {
+            try {
+                proc.getSolrClient().close();
+            } catch (Exception e) {
+                // best-effort cleanup: a close failure is only reported, not rethrown
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * Uses an {@code ExceptionThrowingProcessor} built with a {@code SolrServerException}
+     * whose cause is an {@code IOException} and expects the single FlowFile on the
+     * connection-failure relationship after exactly one {@code request} call.
+     */
+    @Test
+    public void testSolrServerExceptionCausedByIOExceptionShouldRouteToConnectionFailure() throws IOException, SolrServerException, InitializationException {
+        final Throwable throwable = new SolrServerException(new IOException("Error communicating with Solr"));
+        final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable);
+
+        final TestRunner runner = createDefaultTestRunner(proc);
+
+        runner.setProperty(PutSolrRecord.UPDATE_PATH, "/update");
+
+        final MockRecordParser recordParser = new MockRecordParser();
+        recordParser.addRecord(1, "Abhinav","R",8,"Chemistry","term1", 98);
+        runner.addControllerService("parser", recordParser);
+
+        runner.enableControllerService(recordParser);
+        runner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+
+        try {
+            // singletonMap avoids the anonymous HashMap subclass that double-brace initialization creates
+            runner.enqueue(new byte[0], Collections.singletonMap("id", "1"));
+            runner.run();
+
+            runner.assertAllFlowFilesTransferred(PutSolrRecord.REL_CONNECTION_FAILURE, 1);
+            verify(proc.getSolrClient(), times(1)).request(any(SolrRequest.class), eq((String)null));
+        } finally {
+            try {
+                proc.getSolrClient().close();
+            } catch (Exception e) {
+                // best-effort cleanup: a close failure is only reported, not rethrown
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * Uses an {@code ExceptionThrowingProcessor} built with a {@code SolrException}
+     * (error code BAD_REQUEST) and expects the single FlowFile on the failure relationship
+     * after exactly one {@code request} call on the Solr client.
+     */
+    @Test
+    public void testSolrExceptionShouldRouteToFailure() throws IOException, SolrServerException, InitializationException {
+        final Throwable throwable = new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Error");
+        final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable);
+
+        final TestRunner runner = createDefaultTestRunner(proc);
+        runner.setProperty(PutSolrRecord.UPDATE_PATH, "/update");
+
+        final MockRecordParser recordParser = new MockRecordParser();
+        recordParser.addRecord(1, "Abhinav","R",8,"Chemistry","term1", 98);
+        runner.addControllerService("parser", recordParser);
+
+        runner.enableControllerService(recordParser);
+        runner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+
+        try {
+            // singletonMap avoids the anonymous HashMap subclass that double-brace initialization creates
+            runner.enqueue(new byte[0], Collections.singletonMap("id", "1"));
+            runner.run();
+
+            runner.assertAllFlowFilesTransferred(PutSolrRecord.REL_FAILURE, 1);
+            verify(proc.getSolrClient(), times(1)).request(any(SolrRequest.class), eq((String)null));
+        } finally {
+            try {
+                proc.getSolrClient().close();
+            } catch (Exception e) {
+                // best-effort cleanup: a close failure is only reported, not rethrown
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * Uses an {@code ExceptionThrowingProcessor} built with an
+     * {@code HttpSolrClient.RemoteSolrException} (code 401) and expects the single FlowFile
+     * on the failure relationship after exactly one {@code request} call.
+     */
+    @Test
+    public void testRemoteSolrExceptionShouldRouteToFailure() throws IOException, SolrServerException, InitializationException {
+        final Throwable throwable = new HttpSolrClient.RemoteSolrException(
+                "host", 401, "error", new NumberFormatException());
+        final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable);
+
+        final TestRunner runner = createDefaultTestRunner(proc);
+        runner.setProperty(PutSolrRecord.UPDATE_PATH, "/update");
+
+        final MockRecordParser recordParser = new MockRecordParser();
+        recordParser.addRecord(1, "Abhinav","R",8,"Chemistry","term1", 98);
+        runner.addControllerService("parser", recordParser);
+
+        runner.enableControllerService(recordParser);
+        runner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+
+        try {
+            // singletonMap avoids the anonymous HashMap subclass that double-brace initialization creates
+            runner.enqueue(new byte[0], Collections.singletonMap("id", "1"));
+            runner.run();
+
+            runner.assertAllFlowFilesTransferred(PutSolrRecord.REL_FAILURE, 1);
+            verify(proc.getSolrClient(), times(1)).request(any(SolrRequest.class), eq((String)null));
+        } finally {
+            try {
+                proc.getSolrClient().close();
+            } catch (Exception e) {
+                // best-effort cleanup: a close failure is only reported, not rethrown
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * Uses an {@code ExceptionThrowingProcessor} built with an {@code IOException} and
+     * expects the single FlowFile on the connection-failure relationship after exactly one
+     * {@code request} call on the Solr client.
+     */
+    @Test
+    public void testIOExceptionShouldRouteToConnectionFailure() throws IOException, SolrServerException, InitializationException {
+        final Throwable throwable = new IOException("Error communicating with Solr");
+        final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable);
+
+        final TestRunner runner = createDefaultTestRunner(proc);
+        runner.setProperty(PutSolrRecord.UPDATE_PATH, "/update");
+
+        final MockRecordParser recordParser = new MockRecordParser();
+        recordParser.addRecord(1, "Abhinav","R",8,"Chemistry","term1", 98);
+        runner.addControllerService("parser", recordParser);
+        runner.enableControllerService(recordParser);
+        runner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+
+        try {
+            // singletonMap avoids the anonymous HashMap subclass that double-brace initialization creates
+            runner.enqueue(new byte[0], Collections.singletonMap("id", "1"));
+            runner.run();
+
+            runner.assertAllFlowFilesTransferred(PutSolrRecord.REL_CONNECTION_FAILURE, 1);
+            verify(proc.getSolrClient(), times(1)).request(any(SolrRequest.class), eq((String)null));
+        } finally {
+            try {
+                proc.getSolrClient().close();
+            } catch (Exception e) {
+                // best-effort cleanup: a close failure is only reported, not rethrown
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * With the cloud Solr type selected, the configuration is expected to be invalid until
+     * a collection is set.
+     */
+    @Test
+    public void testSolrTypeCloudShouldRequireCollection() throws InitializationException {
+        final TestRunner testRunner = TestRunners.newTestRunner(PutSolrRecord.class);
+        final MockRecordParser parser = new MockRecordParser();
+        parser.addRecord(1, "Abhinav","R",8,"Chemistry","term1", 98);
+        testRunner.addControllerService("parser", parser);
+        testRunner.enableControllerService(parser);
+        testRunner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+        testRunner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue());
+        testRunner.setProperty(SolrUtils.SOLR_LOCATION, "http://localhost:8443/solr");
+        // no collection configured yet
+        testRunner.assertNotValid();
+
+        testRunner.setProperty(SolrUtils.COLLECTION, "someCollection1");
+        testRunner.assertValid();
+    }
+
+
+    /**
+     * With the standard Solr type selected, the configuration is expected to be valid
+     * without any collection being set.
+     */
+    @Test
+    public void testSolrTypeStandardShouldNotRequireCollection() throws InitializationException {
+        final TestRunner testRunner = TestRunners.newTestRunner(PutSolrRecord.class);
+        final MockRecordParser parser = new MockRecordParser();
+        parser.addRecord(1, "Abhinav","R",8,"Chemistry","term1", 98);
+        testRunner.addControllerService("parser", parser);
+        testRunner.enableControllerService(parser);
+        testRunner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+        testRunner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue());
+        testRunner.setProperty(SolrUtils.SOLR_LOCATION, "http://localhost:8443/solr");
+        testRunner.assertValid();
+    }
+
+    /**
+     * With an https Solr location, the configuration is expected to be invalid until an
+     * SSL context service is configured.
+     */
+    @Test
+    public void testHttpsUrlShouldRequireSSLContext() throws InitializationException {
+        final TestRunner testRunner = TestRunners.newTestRunner(PutSolrRecord.class);
+        final MockRecordParser parser = new MockRecordParser();
+        parser.addRecord(1, "Abhinav","R",8,"Chemistry","term1", 98);
+        testRunner.addControllerService("parser", parser);
+        testRunner.enableControllerService(parser);
+        testRunner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+
+        testRunner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue());
+        testRunner.setProperty(SolrUtils.SOLR_LOCATION, "https://localhost:8443/solr");
+        // https location without an SSL context service
+        testRunner.assertNotValid();
+
+        final SSLContextService sslService = new MockSSLContextService();
+        testRunner.addControllerService("ssl-context", sslService);
+        testRunner.enableControllerService(sslService);
+
+        testRunner.setProperty(SolrUtils.SSL_CONTEXT_SERVICE, "ssl-context");
+        testRunner.assertValid();
+    }
+
+    /**
+     * With a plain http Solr location, the configuration is expected to be valid on its
+     * own and to become invalid once an SSL context service is configured.
+     */
+    @Test
+    public void testHttpUrlShouldNotAllowSSLContext() throws InitializationException {
+        final TestRunner testRunner = TestRunners.newTestRunner(PutSolrRecord.class);
+        final MockRecordParser parser = new MockRecordParser();
+        parser.addRecord(1, "Abhinav","R",8,"Chemistry","term1", 98);
+        testRunner.addControllerService("parser", parser);
+        testRunner.enableControllerService(parser);
+        testRunner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+
+        testRunner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue());
+        testRunner.setProperty(SolrUtils.SOLR_LOCATION, "http://localhost:8443/solr");
+        testRunner.assertValid();
+
+        final SSLContextService sslService = new MockSSLContextService();
+        testRunner.addControllerService("ssl-context", sslService);
+        testRunner.enableControllerService(sslService);
+
+        // http location combined with an SSL context service
+        testRunner.setProperty(SolrUtils.SSL_CONTEXT_SERVICE, "ssl-context");
+        testRunner.assertNotValid();
+    }
+
+    /**
+     * Steps through combinations of the basic-auth username and password properties,
+     * including expression-language values backed by variables, asserting validity after
+     * each change.
+     */
+    @Test
+    public void testUsernamePasswordValidation() throws InitializationException {
+        final TestRunner testRunner = TestRunners.newTestRunner(PutSolrRecord.class);
+        final MockRecordParser parser = new MockRecordParser();
+        parser.addRecord(1, "Abhinav","R",8,"Chemistry","term1", 98);
+        testRunner.addControllerService("parser", parser);
+        testRunner.enableControllerService(parser);
+        testRunner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+
+        testRunner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue());
+        testRunner.setProperty(SolrUtils.SOLR_LOCATION, "http://localhost:8443/solr");
+        testRunner.assertValid();
+
+        // username without password
+        testRunner.setProperty(SolrUtils.BASIC_USERNAME, "user1");
+        testRunner.assertNotValid();
+
+        // username and password both present
+        testRunner.setProperty(SolrUtils.BASIC_PASSWORD, "password");
+        testRunner.assertValid();
+
+        // empty username with a password
+        testRunner.setProperty(SolrUtils.BASIC_USERNAME, "");
+        testRunner.assertNotValid();
+
+        // username expression whose variable has not been set yet
+        testRunner.setProperty(SolrUtils.BASIC_USERNAME, "${solr.user}");
+        testRunner.assertNotValid();
+
+        testRunner.setVariable("solr.user", "solrRocks");
+        testRunner.assertValid();
+
+        // password expression whose variable has not been set yet
+        testRunner.setProperty(SolrUtils.BASIC_PASSWORD, "${solr.password}");
+        testRunner.assertNotValid();
+
+        testRunner.setVariable("solr.password", "solrRocksPassword");
+        testRunner.assertValid();
+    }
+
+    /**
+     * Checks validation of the JAAS client app name against the JAAS login configuration
+     * referenced by the {@code Krb5HttpClientConfigurer.LOGIN_CONFIG_PROP} system property.
+     *
+     * The system property is JVM-wide, so its original value is captured up front and put
+     * back in a {@code finally} block to keep this test from affecting others.
+     */
+    @Test
+    public void testJAASClientAppNameValidation() throws InitializationException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSolrRecord.class);
+        final MockRecordParser recordParser = new MockRecordParser();
+        recordParser.addRecord(1, "Abhinav","R",8,"Chemistry","term1", 98);
+        runner.addControllerService("parser", recordParser);
+        runner.enableControllerService(recordParser);
+        runner.setProperty(PutSolrRecord.RECORD_READER, "parser");
+        runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue());
+        runner.setProperty(SolrUtils.SOLR_LOCATION, "http://localhost:8443/solr");
+        runner.assertValid();
+
+        // remember the current jaas config system property so it can be restored afterwards
+        final String jaasConfig = System.getProperty(Krb5HttpClientConfigurer.LOGIN_CONFIG_PROP);
+        try {
+            // clear the jaas config system property if it was set
+            if (!StringUtils.isEmpty(jaasConfig)) {
+                System.clearProperty(Krb5HttpClientConfigurer.LOGIN_CONFIG_PROP);
+            }
+
+            // should be invalid if we have a client name but not config file
+            runner.setProperty(SolrUtils.JAAS_CLIENT_APP_NAME, "Client");
+            runner.assertNotValid();
+
+            // should be invalid if we have a client name that is not in the config file
+            final File jaasConfigFile = new File("src/test/resources/jaas-client.conf");
+            System.setProperty(Krb5HttpClientConfigurer.LOGIN_CONFIG_PROP, jaasConfigFile.getAbsolutePath());
+            runner.assertNotValid();
+
+            // should be valid now that the name matches up with the config file
+            runner.setProperty(SolrUtils.JAAS_CLIENT_APP_NAME, "SolrJClient");
+            runner.assertValid();
+        } finally {
+            // put the system property back exactly as it was before the test ran
+            if (jaasConfig == null) {
+                System.clearProperty(Krb5HttpClientConfigurer.LOGIN_CONFIG_PROP);
+            } else {
+                System.setProperty(Krb5HttpClientConfigurer.LOGIN_CONFIG_PROP, jaasConfig);
+            }
+        }
+    }
+
+
+    /**
+     * Creates a base TestRunner with Solr Type of standard.
+     */
+    private static TestRunner createDefaultTestRunner(PutSolrRecord processor) {
+        TestRunner runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue());
+        runner.setProperty(SolrUtils.SOLR_LOCATION, "http://localhost:8443/solr");
+        return runner;
+    }
+
+    /**
+     * PutSolrRecord variant whose createSolrClient returns the SolrClient supplied
+     * to the constructor.
+     */
+    private class TestableProcessor extends PutSolrRecord {
+        private final SolrClient injectedClient;
+
+        public TestableProcessor(SolrClient solrClient) {
+            injectedClient = solrClient;
+        }
+
+        @Override
+        protected SolrClient createSolrClient(ProcessContext context, String solrLocation) {
+            return injectedClient;
+        }
+    }
+
+    // Create an EmbeddedSolrClient with the given core name.
+    private static SolrClient createEmbeddedSolrClient(String coreName) throws IOException {
+        String relPath = TestPutSolrRecord.class.getProtectionDomain()
+                .getCodeSource().getLocation().getFile()
+                + "../../target";
+
+        return EmbeddedSolrServerFactory.create(
+                EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME,
+                coreName, relPath);
+    }
+
+    /**
+     * PutSolrRecord variant whose createSolrClient returns a stub SolrClient. On each request
+     * the stub asserts that the collection request parameter equals the expected collection
+     * and then returns an empty NamedList.
+     */
+    private class CollectionVerifyingProcessor extends PutSolrRecord {
+
+        private final String expectedCollection;
+
+        private SolrClient mockSolrClient;
+
+        public CollectionVerifyingProcessor(final String expectedCollection) {
+            this.expectedCollection = expectedCollection;
+        }
+
+        @Override
+        protected SolrClient createSolrClient(ProcessContext context, String solrLocation) {
+            // a new stub is built on every call and remembered in the field
+            final SolrClient verifyingClient = new SolrClient() {
+                @Override
+                public NamedList<Object> request(SolrRequest request, String collection) throws SolrServerException, IOException {
+                    final String actualCollection = request.getParams().get(PutSolrRecord.COLLECTION_PARAM_NAME);
+                    Assert.assertEquals(expectedCollection, actualCollection);
+                    return new NamedList<>();
+                }
+
+                @Override
+                public void close() {
+                    // nothing to release
+                }
+            };
+            mockSolrClient = verifyingClient;
+            return verifyingClient;
+        }
+
+    }
+
+    /**
+     * Verify that given SolrServer contains the expected SolrDocuments.
+     */
+    private static void verifySolrDocuments(SolrClient solrServer, Collection<SolrDocument> expectedDocuments)
+            throws IOException, SolrServerException {
+
+        solrServer.commit();
+
+        SolrQuery query = new SolrQuery("*:*");
+        QueryResponse qResponse = solrServer.query(query);
+        Assert.assertEquals(expectedDocuments.size(), qResponse.getResults().getNumFound());
+
+        // verify documents have expected fields and values
+        for (SolrDocument expectedDoc : expectedDocuments) {
+            boolean found = false;
+            for (SolrDocument solrDocument : qResponse.getResults()) {
+                boolean foundAllFields = true;
+                for (String expectedField : expectedDoc.getFieldNames()) {
+                    Object expectedVal = expectedDoc.getFirstValue(expectedField);
+                    Object actualVal = solrDocument.getFirstValue(expectedField);
+                    foundAllFields = expectedVal.equals(actualVal);
+                }
+
+                if (foundAllFields) {
+                    found = true;
+                    break;
+                }
+            }
+            Assert.assertTrue("Could not find " + expectedDoc, found);
+        }
+    }
+
+    /**
+     * PutSolrRecord variant whose createSolrClient returns a Mockito mock SolrClient. The mock
+     * is stubbed to throw the Throwable given at construction when request is called with any
+     * SolrRequest and a null collection.
+     */
+    private class ExceptionThrowingProcessor extends PutSolrRecord {
+
+        private final Throwable throwable;
+        private SolrClient mockSolrClient;
+
+        public ExceptionThrowingProcessor(Throwable throwable) {
+            this.throwable = throwable;
+        }
+
+        @Override
+        protected SolrClient createSolrClient(ProcessContext context, String solrLocation) {
+            final SolrClient failingClient = Mockito.mock(SolrClient.class);
+            mockSolrClient = failingClient;
+            try {
+                // request declares checked exceptions, hence the try/catch around the stubbing call
+                when(failingClient.request(any(SolrRequest.class), eq((String)null)))
+                        .thenThrow(throwable);
+            } catch (SolrServerException|IOException e) {
+                Assert.fail(e.getMessage());
+            }
+            return failingClient;
+        }
+
+    }
+
+
+    /**
+     * Stub SSLContextService used so tests do not need a real keystore/truststore.
+     * Every accessor returns null and both "is configured" checks return false.
+     */
+    private class MockSSLContextService extends AbstractControllerService implements SSLContextService {
+
+        // ---- SSL context ----
+
+        @Override
+        public SSLContext createSSLContext(ClientAuth clientAuth) throws ProcessException {
+            return null;
+        }
+
+        @Override
+        public String getSslAlgorithm() {
+            return null;
+        }
+
+        // ---- truststore ----
+
+        @Override
+        public boolean isTrustStoreConfigured() {
+            return false;
+        }
+
+        @Override
+        public String getTrustStoreFile() {
+            return null;
+        }
+
+        @Override
+        public String getTrustStoreType() {
+            return null;
+        }
+
+        @Override
+        public String getTrustStorePassword() {
+            return null;
+        }
+
+        // ---- keystore ----
+
+        @Override
+        public boolean isKeyStoreConfigured() {
+            return false;
+        }
+
+        @Override
+        public String getKeyStoreFile() {
+            return null;
+        }
+
+        @Override
+        public String getKeyStoreType() {
+            return null;
+        }
+
+        @Override
+        public String getKeyStorePassword() {
+            return null;
+        }
+
+        @Override
+        public String getKeyPassword() {
+            return null;
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3f47207/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/testCollection/conf/schema.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/testCollection/conf/schema.xml b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/testCollection/conf/schema.xml
index 58b9256..aae2ae5 100644
--- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/testCollection/conf/schema.xml
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/testCollection/conf/schema.xml
@@ -1,31 +1,37 @@
-<?xml version="1.0" encoding="UTF-8" ?>
-<schema version="1.5" name="testCollection">
-
-    <fieldType name="string" class="solr.StrField"/>
-    <fieldType name="date" class="solr.TrieDateField" precisionStep="0" positionIncrementGap="0"/>
-    <fieldType name="int" class="solr.TrieIntField" precisionStep="0" positionIncrementGap="0"/>
-    <fieldType name="float" class="solr.TrieFloatField" precisionStep="0" positionIncrementGap="0"/>
-    <fieldType name="long" class="solr.TrieLongField" precisionStep="0" positionIncrementGap="0"/>
-    <fieldType name="double" class="solr.TrieDoubleField" precisionStep="0" positionIncrementGap="0"/>
-
-    <field name="_version_" type="long" indexed="true" stored="true"/>
-
-    <field name="first" type="string" indexed="true" stored="true" />
-    <field name="last" type="string" indexed="true" stored="true" />
-    <field name="grade" type="int" indexed="true" stored="true" />
-    <field name="marks" type="int" indexed="true" stored="true" />
-    <field name="test" type="string" indexed="true" stored="true" />
-    <field name="subject" type="string" indexed="true" stored="true" />
-
-    <field name="created" type="date" indexed="true" stored="true" />
-
-    <field name="id" type="string" indexed="true" stored="true" />
-    <field name="double_single" type="double" indexed="true" stored="true" />
-    <field name="integer_single" type="int" indexed="true" stored="true" />
-    <field name="integer_multi" type="int" indexed="true" stored="true"  multiValued="true"/>
-    <field name="string_single" type="string" indexed="true" stored="true" />
-    <field name="string_multi" type="string" indexed="true" stored="true" multiValued="true"/>
-
-    <uniqueKey>id</uniqueKey>
-
+<?xml version="1.0" encoding="UTF-8" ?>
+<schema version="1.5" name="testCollection">
+
+    <fieldType name="string" class="solr.StrField"/>
+    <fieldType name="date" class="solr.TrieDateField" precisionStep="0" positionIncrementGap="0"/>
+    <fieldType name="int" class="solr.TrieIntField" precisionStep="0" positionIncrementGap="0"/>
+    <fieldType name="float" class="solr.TrieFloatField" precisionStep="0" positionIncrementGap="0"/>
+    <fieldType name="long" class="solr.TrieLongField" precisionStep="0" positionIncrementGap="0"/>
+    <fieldType name="double" class="solr.TrieDoubleField" precisionStep="0" positionIncrementGap="0"/>
+
+    <field name="_version_" type="long" indexed="true" stored="true"/>
+
+    <field name="first" type="string" indexed="true" stored="true" />
+    <field name="last" type="string" indexed="true" stored="true" />
+    <field name="grade" type="int" indexed="true" stored="true" />
+    <field name="marks" type="int" indexed="true" stored="true" />
+    <field name="test" type="string" indexed="true" stored="true" />
+    <field name="subject" type="string" indexed="true" stored="true" />
+    <field name="exam_marks" type="int" indexed="true" stored="true"/>
+    <field name="exam_test" type="string" indexed="true" stored="true"/>
+    <field name="exam_subject" type="string" indexed="true" stored="true"/>
+    <field name="exams_marks" type="int" indexed="true" stored="true" multiValued="true"/>
+    <field name="exams_test" type="string" indexed="true" stored="true" multiValued="true"/>
+    <field name="exams_subject" type="string" indexed="true" stored="true" multiValued="true"/>
+
+    <field name="created" type="date" indexed="true" stored="true" />
+
+    <field name="id" type="string" indexed="true" stored="true" />
+    <field name="double_single" type="double" indexed="true" stored="true" />
+    <field name="integer_single" type="int" indexed="true" stored="true" />
+    <field name="integer_multi" type="int" indexed="true" stored="true"  multiValued="true"/>
+    <field name="string_single" type="string" indexed="true" stored="true" />
+    <field name="string_multi" type="string" indexed="true" stored="true" multiValued="true"/>
+
+    <uniqueKey>id</uniqueKey>
+
 </schema>
\ No newline at end of file