You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jg...@apache.org on 2021/10/07 09:58:29 UTC

[nifi] branch main updated: NIFI-7990: Add properties to map Record field as @timestamp in output to Elasticsearch for PutElasticsearchRecord and PutElasticsearchHttpRecord processors; NIFI-7474 allow mapped id field to be retained within the Record for PutElasticsearchRecord

This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 3892e50  NIFI-7990: Add properties to map Record field as @timestamp in output to Elasticsearch for PutElasticsearchRecord and PutElasticsearchHttpRecord processors; NIFI-7474 allow mapped id field to be retained within the Record for PutElasticsearchRecord
3892e50 is described below

commit 3892e50991ff803b762c26aec6067aecacda548e
Author: Chris Sampson <ch...@gmail.com>
AuthorDate: Thu Sep 2 11:57:57 2021 +0100

    NIFI-7990: Add properties to map Record field as @timestamp in output to Elasticsearch for PutElasticsearchRecord and PutElasticsearchHttpRecord processors; NIFI-7474 allow mapped id field to be retained within the Record for PutElasticsearchRecord
    
    Signed-off-by: Joe Gresock <jg...@gmail.com>
    
    This closes #4691.
---
 .../nifi/elasticsearch/IndexOperationRequest.java  |   8 +-
 .../elasticsearch/PutElasticsearchHttpRecord.java  | 200 +++++++++-----
 .../TestPutElasticsearchHttpRecord.java            | 158 ++++++++++-
 .../nifi-elasticsearch-restapi-processors/pom.xml  |  12 +-
 .../elasticsearch/PutElasticsearchRecord.java      | 303 +++++++++++++++++----
 .../PutElasticsearchRecordTest.groovy              | 140 +++++++++-
 6 files changed, 665 insertions(+), 156 deletions(-)

diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
index 7de8807..be79416 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
@@ -65,9 +65,9 @@ public class IndexOperationRequest {
         Index("index"),
         Update("update"),
         Upsert("upsert");
-        String value;
+        private final String value;
 
-        Operation(String value) {
+        Operation(final String value) {
             this.value = value;
         }
 
@@ -80,6 +80,10 @@ public class IndexOperationRequest {
                     .filter(o -> o.getValue().equalsIgnoreCase(value)).findFirst()
                     .orElseThrow(() -> new IllegalArgumentException(String.format("Unknown Index Operation %s", value)));
         }
+
+        public static String[] allValues() {
+            return Arrays.stream(Operation.values()).map(Operation::getValue).sorted().toArray(String[]::new);
+        }
     }
 
     @Override
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index b9f2e27..6333be3 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -66,7 +66,6 @@ import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.type.ArrayDataType;
 import org.apache.nifi.serialization.record.type.ChoiceDataType;
 import org.apache.nifi.serialization.record.type.MapDataType;
-import org.apache.nifi.serialization.record.type.RecordDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.util.StringUtils;
 
@@ -78,13 +77,13 @@ import java.io.OutputStream;
 import java.math.BigInteger;
 import java.net.URL;
 import java.nio.charset.Charset;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 
@@ -153,7 +152,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
     static final PropertyDescriptor ID_RECORD_PATH = new PropertyDescriptor.Builder()
             .name("put-es-record-id-path")
             .displayName("Identifier Record Path")
-            .description("A RecordPath pointing to a field in the record(s) that contains the identifier for the document. If the Index Operation is \"index\", "
+            .description("A RecordPath pointing to a field in the record(s) that contains the identifier for the document. If the Index Operation is \"index\" or \"create\", "
                     + "this property may be left empty or evaluate to an empty value, in which case the document's identifier will be "
                     + "auto-generated by Elasticsearch. For all other Index Operations, the field's value must be non-empty.")
             .required(false)
@@ -209,12 +208,31 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
             .required(true)
             .build();
 
+    static final PropertyDescriptor AT_TIMESTAMP = new PropertyDescriptor.Builder()
+            .name("put-es-record-at-timestamp")
+            .displayName("@timestamp Value")
+            .description("The value to use as the @timestamp field (required for Elasticsearch Data Streams)")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor AT_TIMESTAMP_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("put-es-record-at-timestamp-path")
+            .displayName("@timestamp Record Path")
+            .description("A RecordPath pointing to a field in the record(s) that contains the @timestamp for the document. " +
+                    "If left blank the @timestamp will be determined using the main @timestamp property")
+            .required(false)
+            .addValidator(new RecordPathValidator())
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
     static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder()
             .name("Date Format")
             .description("Specifies the format to use when reading/writing Date fields. "
                     + "If not specified, the default format '" + RecordFieldType.DATE.getDefaultFormat() + "' is used. "
                     + "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy for a two-digit month, followed by "
-                    + "a two-digit day, followed by a four-digit year, all separated by '/' characters, as in 01/01/2017).")
+                    + "a two-digit day, followed by a four-digit year, all separated by '/' characters, as in 01/25/2017).")
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(new SimpleDateFormatValidator())
             .required(false)
@@ -235,7 +253,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
                     + "If not specified, the default format '" + RecordFieldType.TIMESTAMP.getDefaultFormat() + "' is used. "
                     + "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy HH:mm:ss for a two-digit month, followed by "
                     + "a two-digit day, followed by a four-digit year, all separated by '/' characters; and then followed by a two-digit hour in 24-hour format, followed by "
-                    + "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 01/01/2017 18:04:15).")
+                    + "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 01/25/2017 18:04:15).")
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(new SimpleDateFormatValidator())
             .required(false)
@@ -266,6 +284,8 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
         descriptors.add(RECORD_WRITER);
         descriptors.add(LOG_ALL_ERRORS);
         descriptors.add(ID_RECORD_PATH);
+        descriptors.add(AT_TIMESTAMP_RECORD_PATH);
+        descriptors.add(AT_TIMESTAMP);
         descriptors.add(INDEX);
         descriptors.add(TYPE);
         descriptors.add(INDEX_OP);
@@ -292,7 +312,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
         final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
         // Since Expression Language is allowed for index operation, we can't guarantee that we can catch
         // all invalid configurations, but we should catch them as soon as we can. For example, if the
-        // Identifier Record Path property is empty, the Index Operation must evaluate to "index".
+        // Identifier Record Path property is empty, the Index Operation must evaluate to "index" or "create".
         String idPath = validationContext.getProperty(ID_RECORD_PATH).getValue();
         String indexOp = validationContext.getProperty(INDEX_OP).getValue();
 
@@ -363,7 +383,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
         if (StringUtils.isEmpty(baseUrl)) {
             throw new ProcessException("Elasticsearch URL is empty or null, this indicates an invalid Expression (missing variables, e.g.)");
         }
-        HttpUrl.Builder urlBuilder = HttpUrl.parse(baseUrl).newBuilder().addPathSegment("_bulk");
+        HttpUrl.Builder urlBuilder = Objects.requireNonNull(HttpUrl.parse(baseUrl)).newBuilder().addPathSegment("_bulk");
 
         // Find the user-added properties and set them as query parameters on the URL
         for (Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) {
@@ -378,14 +398,14 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
 
         final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
         if (StringUtils.isEmpty(index)) {
-            logger.error("No value for index in for {}, transferring to failure", new Object[]{flowFile});
+            logger.error("No value for index in for {}, transferring to failure", flowFile);
             session.transfer(flowFile, REL_FAILURE);
             return;
         }
         final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
         String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(flowFile).getValue();
         if (StringUtils.isEmpty(indexOp)) {
-            logger.error("No Index operation specified for {}, transferring to failure.", new Object[]{flowFile});
+            logger.error("No Index operation specified for {}, transferring to failure.", flowFile);
             session.transfer(flowFile, REL_FAILURE);
             return;
         }
@@ -398,18 +418,22 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
             case "delete":
                 break;
             default:
-                logger.error("Index operation {} not supported for {}, transferring to failure.", new Object[]{indexOp, flowFile});
+                logger.error("Index operation {} not supported for {}, transferring to failure.", indexOp, flowFile);
                 session.transfer(flowFile, REL_FAILURE);
                 return;
         }
 
         this.nullSuppression = context.getProperty(SUPPRESS_NULLS).getValue();
 
-        final String id_path = context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
-        final RecordPath recordPath = StringUtils.isEmpty(id_path) ? null : recordPathCache.getCompiled(id_path);
+        final String idPath = context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
+        final RecordPath recordPath = StringUtils.isEmpty(idPath) ? null : recordPathCache.getCompiled(idPath);
         final StringBuilder sb = new StringBuilder();
         final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
 
+        final String atTimestamp = context.getProperty(AT_TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
+        final String atTimestampPath = context.getProperty(AT_TIMESTAMP_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
+        final RecordPath atPath = StringUtils.isEmpty(atTimestampPath) ? null : recordPathCache.getCompiled(atTimestampPath);
+
         int recordCount = 0;
         try (final InputStream in = session.read(flowFile);
              final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
@@ -428,6 +452,14 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
                     id = null;
                 }
 
+                final Object timestamp;
+                if (atPath != null) {
+                    final Optional<FieldValue> atPathValue = atPath.evaluate(record).getSelectedFields().findFirst();
+                    timestamp = !atPathValue.isPresent() || atPathValue.get().getValue() == null ? atTimestamp : atPathValue.get();
+                } else {
+                    timestamp = atTimestamp;
+                }
+
                 // The ID must be valid for all operations except "index" or "create". For that case,
                 // a missing ID indicates one is to be auto-generated by Elasticsearch
                 if (id == null && !(indexOp.equalsIgnoreCase("index") || indexOp.equalsIgnoreCase("create"))) {
@@ -437,8 +469,8 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
                 final StringBuilder json = new StringBuilder();
 
                 ByteArrayOutputStream out = new ByteArrayOutputStream();
-                JsonGenerator generator = factory.createJsonGenerator(out);
-                writeRecord(record, record.getSchema(), generator);
+                JsonGenerator generator = factory.createGenerator(out);
+                writeRecord(record, generator, timestamp);
                 generator.flush();
                 generator.close();
                 json.append(out.toString(charset.name()));
@@ -447,7 +479,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
                 recordCount++;
             }
         } catch (IdentifierNotFoundException infe) {
-            logger.error(infe.getMessage(), new Object[]{flowFile});
+            logger.error(infe.getMessage(), flowFile);
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
             return;
@@ -459,7 +491,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
             return;
         }
 
-        RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), sb.toString());
+        RequestBody requestBody = RequestBody.create(sb.toString(), MediaType.parse("application/json"));
         final Response getResponse;
         try {
             getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "PUT", requestBody);
@@ -474,49 +506,50 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
         final Set<Integer> failures = new HashSet<>();
 
         if (isSuccess(statusCode)) {
-            try (ResponseBody responseBody = getResponse.body()) {
-                final byte[] bodyBytes = responseBody.bytes();
-
-                JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
-                boolean errors = responseJson.get("errors").asBoolean(false);
-                // ES has no rollback, so if errors occur, log them and route the whole flow file to failure
-                if (errors) {
-                    ArrayNode itemNodeArray = (ArrayNode) responseJson.get("items");
-                    if(itemNodeArray != null) {
-                        if (itemNodeArray.size() > 0) {
-                            // All items are returned whether they succeeded or failed, so iterate through the item array
-                            // at the same time as the flow file list, moving each to success or failure accordingly,
-                            // but only keep the first error for logging
-                            String errorReason = null;
-                            for (int i = itemNodeArray.size() - 1; i >= 0; i--) {
-                                JsonNode itemNode = itemNodeArray.get(i);
-                                int status = itemNode.findPath("status").asInt();
-                                if (!isSuccess(status)) {
-                                    if (errorReason == null || logAllErrors) {
-                                        // Use "result" if it is present; this happens for status codes like 404 Not Found, which may not have an error/reason
-                                        String reason = itemNode.findPath("result").asText();
-                                        if (StringUtils.isEmpty(reason)) {
-                                            // If there was no result, we expect an error with a string description in the "reason" field
-                                            reason = itemNode.findPath("reason").asText();
+            try (final ResponseBody responseBody = getResponse.body()) {
+                if (responseBody != null) {
+                    final byte[] bodyBytes = responseBody.bytes();
+
+                    JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
+                    boolean errors = responseJson.get("errors").asBoolean(false);
+                    // ES has no rollback, so if errors occur, log them and route the whole flow file to failure
+                    if (errors) {
+                        ArrayNode itemNodeArray = (ArrayNode) responseJson.get("items");
+                        if (itemNodeArray != null) {
+                            if (itemNodeArray.size() > 0) {
+                                // All items are returned whether they succeeded or failed, so iterate through the item array
+                                // at the same time as the flow file list, moving each to success or failure accordingly,
+                                // but only keep the first error for logging
+                                String errorReason = null;
+                                for (int i = itemNodeArray.size() - 1; i >= 0; i--) {
+                                    JsonNode itemNode = itemNodeArray.get(i);
+                                    int status = itemNode.findPath("status").asInt();
+                                    if (!isSuccess(status)) {
+                                        if (errorReason == null || logAllErrors) {
+                                            // Use "result" if it is present; this happens for status codes like 404 Not Found, which may not have an error/reason
+                                            String reason = itemNode.findPath("result").asText();
+                                            if (StringUtils.isEmpty(reason)) {
+                                                // If there was no result, we expect an error with a string description in the "reason" field
+                                                reason = itemNode.findPath("reason").asText();
+                                            }
+                                            errorReason = reason;
+
+                                            logger.error("Failed to process record {} in FlowFile {} due to {}, transferring to failure",
+                                                    i, flowFile, errorReason);
                                         }
-                                        errorReason = reason;
-
-                                        logger.error("Failed to process record {} in FlowFile {} due to {}, transferring to failure",
-                                                new Object[]{i, flowFile, errorReason});
+                                        failures.add(i);
                                     }
-                                    failures.add(i);
                                 }
                             }
                         }
+                    } else {
+                        // Everything succeeded, route FF and end
+                        flowFile = session.putAttribute(flowFile, "record.count", Integer.toString(recordCount));
+                        session.transfer(flowFile, REL_SUCCESS);
+                        session.getProvenanceReporter().send(flowFile, url.toString());
+                        return;
                     }
-                } else {
-                    // Everything succeeded, route FF and end
-                    flowFile = session.putAttribute(flowFile, "record.count", Integer.toString(recordCount));
-                    session.transfer(flowFile, REL_SUCCESS);
-                    session.getProvenanceReporter().send(flowFile, url.toString());
-                    return;
                 }
-
             } catch (IOException ioe) {
                 // Something went wrong when parsing the response, log the error and route to failure
                 logger.error("Error parsing Bulk API response: {}", new Object[]{ioe.getMessage()}, ioe);
@@ -529,12 +562,12 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
         } else if (statusCode / 100 == 5) {
             // 5xx -> RETRY, but a server error might last a while, so yield
             logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to retry. This is likely a server problem, yielding...",
-                    new Object[]{statusCode, getResponse.message()});
+                    statusCode, getResponse.message());
             session.transfer(flowFile, REL_RETRY);
             context.yield();
             return;
         } else {  // 1xx, 3xx, 4xx, etc. -> NO RETRY
-            logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to failure", new Object[]{statusCode, getResponse.message()});
+            logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to failure", statusCode, getResponse.message());
             session.transfer(flowFile, REL_FAILURE);
             return;
         }
@@ -549,17 +582,16 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
             final RecordSetWriterFactory writerFactory = writerFactoryOptional.get();
 
             // We know there are a mixture of successes and failures, create FFs for each and rename input FF to avoid confusion.
-            final FlowFile inputFlowFile = flowFile;
-            final FlowFile successFlowFile = session.create(inputFlowFile);
-            final FlowFile failedFlowFile = session.create(inputFlowFile);
+            final FlowFile successFlowFile = session.create(flowFile);
+            final FlowFile failedFlowFile = session.create(flowFile);
 
             // Set up the reader and writers
             try (final OutputStream successOut = session.write(successFlowFile);
                  final OutputStream failedOut = session.write(failedFlowFile);
-                 final InputStream in = session.read(inputFlowFile);
-                 final RecordReader reader = readerFactory.createRecordReader(inputFlowFile, in, getLogger())) {
+                 final InputStream in = session.read(flowFile);
+                 final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
 
-                final RecordSchema schema = writerFactory.getSchema(inputFlowFile.getAttributes(), reader.getSchema());
+                final RecordSchema schema = writerFactory.getSchema(flowFile.getAttributes(), reader.getSchema());
 
                 try (final RecordSetWriter successWriter = writerFactory.createWriter(getLogger(), schema, successOut, successFlowFile);
                      final RecordSetWriter failedWriter = writerFactory.createWriter(getLogger(), schema, failedOut, failedFlowFile)) {
@@ -581,8 +613,8 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
                 }
             } catch (final IOException | SchemaNotFoundException | MalformedRecordException e) {
                 // We failed while handling individual failures. Not much else we can do other than log, and route the whole thing to failure.
-                getLogger().error("Failed to process {} during individual record failure handling; route whole FF to failure", new Object[] {flowFile, e});
-                session.transfer(inputFlowFile, REL_FAILURE);
+                getLogger().error("Failed to process {} during individual record failure handling; route whole FF to failure", flowFile, e);
+                session.transfer(flowFile, REL_FAILURE);
                 if (successFlowFile != null) {
                     session.remove(successFlowFile);
                 }
@@ -598,15 +630,34 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
             session.putAttribute(failedFlowFile, "failure.count", Integer.toString(failures.size()));
             session.transfer(successFlowFile, REL_SUCCESS);
             session.transfer(failedFlowFile, REL_FAILURE);
-            session.remove(inputFlowFile);
+            session.remove(flowFile);
         }
     }
 
-    private void writeRecord(final Record record, final RecordSchema writeSchema, final JsonGenerator generator)
-            throws IOException {
-        RecordSchema schema = record.getSchema();
+    private void writeRecord(final Record record, final JsonGenerator generator, final Object atTimestamp) throws IOException {
+        final RecordSchema schema = record.getSchema();
 
         generator.writeStartObject();
+
+        if (atTimestamp != null && !(atTimestamp instanceof String && StringUtils.isBlank((String) atTimestamp))) {
+            final DataType atDataType;
+            final Object atValue;
+            if (atTimestamp instanceof FieldValue) {
+                final FieldValue atField = (FieldValue) atTimestamp;
+                atDataType = atField.getField().getDataType();
+                atValue = atField.getValue();
+            } else {
+                atDataType = RecordFieldType.STRING.getDataType();
+                atValue = atTimestamp.toString();
+            }
+
+            final Object outputValue = RecordFieldType.STRING.getDataType().equals(atDataType) ? coerceTimestampStringToLong(atValue.toString()) : atValue;
+            final DataType outputDataType = outputValue.equals(atValue) ? atDataType : RecordFieldType.LONG.getDataType();
+
+            generator.writeFieldName("@timestamp");
+            writeValue(generator, outputValue, "@timestamp", outputDataType);
+        }
+
         for (int i = 0; i < schema.getFieldCount(); i++) {
             final RecordField field = schema.getField(i);
             final String fieldName = field.getFieldName();
@@ -627,6 +678,12 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
         generator.writeEndObject();
     }
 
+    private Object coerceTimestampStringToLong(final String stringValue) {
+        return DataTypeUtils.isLongTypeCompatible(stringValue)
+                ? DataTypeUtils.toLong(stringValue, "@timestamp")
+                : stringValue;
+    }
+
     @SuppressWarnings("unchecked")
     private void writeValue(final JsonGenerator generator, final Object value, final String fieldName, final DataType dataType) throws IOException {
         if (value == null) {
@@ -646,8 +703,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
 
         switch (chosenDataType.getFieldType()) {
             case DATE: {
-                // Use SimpleDateFormat with system default time zone for string conversion
-                final String stringValue = DataTypeUtils.toString(coercedValue, () -> new SimpleDateFormat(dateFormat));
+                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(this.dateFormat));
                 if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
                     generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
                 } else {
@@ -711,13 +767,9 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
                     generator.writeString(stringValue);
                 }
                 break;
-            case RECORD: {
-                final Record record = (Record) coercedValue;
-                final RecordDataType recordDataType = (RecordDataType) chosenDataType;
-                final RecordSchema childSchema = recordDataType.getChildSchema();
-                writeRecord(record, childSchema, generator);
+            case RECORD:
+                writeRecord((Record) coercedValue, generator, null);
                 break;
-            }
             case MAP: {
                 final MapDataType mapDataType = (MapDataType) chosenDataType;
                 final DataType valueDataType = mapDataType.getValueType();
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
index 659de9a..38d7aee 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
@@ -49,6 +49,7 @@ import java.net.ConnectException;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.format.DateTimeFormatter;
@@ -56,9 +57,11 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.function.Consumer;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
@@ -75,6 +78,7 @@ public class TestPutElasticsearchHttpRecord {
     private static final String ISO_DATE = String.format("%d-%d-%d", DATE_YEAR, DATE_MONTH, DATE_DAY);
     private static final String EXPECTED_DATE = String.format("%d/%d/%d", DATE_DAY, DATE_MONTH, DATE_YEAR);
     private static final LocalDateTime LOCAL_DATE_TIME = LocalDateTime.of(DATE_YEAR, DATE_MONTH, DATE_DAY, TIME_HOUR, TIME_MINUTE);
+    private static final LocalDate LOCAL_DATE = LocalDate.of(DATE_YEAR, DATE_MONTH, DATE_DAY);
     private static final LocalTime LOCAL_TIME = LocalTime.of(TIME_HOUR, TIME_MINUTE);
 
     private TestRunner runner;
@@ -499,6 +503,141 @@ public class TestPutElasticsearchHttpRecord {
     }
 
     @Test
+    public void testPutElasticsearchOnTriggerWithNoAtTimestampPath() throws Exception {
+        PutElasticsearchHttpRecordTestProcessor processor = new PutElasticsearchHttpRecordTestProcessor(false);
+        runner = TestRunners.newTestRunner(processor);
+        generateTestData(1);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+
+        runner.removeProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP); // no default
+        runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP_RECORD_PATH, "/none"); // Field does not exist
+        processor.setRecordChecks(record -> assertTimestamp(record, null)); // no @timestamp
+        runner.enqueue(new byte[0]);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        runner.clearTransferState();
+
+        // now add a default @timestamp
+        final String timestamp = "2020-11-27T14:37:00.000Z";
+        runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP, timestamp);
+        processor.setRecordChecks(record -> assertTimestamp(record, timestamp)); // @timestamp defaulted
+        runner.enqueue(new byte[0]);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        assertNotNull(out2);
+    }
+
+    @Test
+    public void testPutElasticsearchOnTriggerWithAtTimestampFromAttribute() throws IOException {
+        PutElasticsearchHttpRecordTestProcessor processor = new PutElasticsearchHttpRecordTestProcessor(false);
+        runner = TestRunners.newTestRunner(processor);
+        generateTestData(1);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "${i}");
+        runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP, "${timestamp}");
+
+        final String timestamp = "2020-11-27T15:10:00.000Z";
+        processor.setRecordChecks(record -> assertTimestamp(record, timestamp));
+        runner.enqueue(new byte[0], new HashMap<String, String>() {{
+            put("doc_id", "28039652144");
+            put("i", "doc");
+            put("timestamp", timestamp);
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        runner.clearTransferState();
+
+        // Now try an empty attribute value, should be no timestamp
+        processor.setRecordChecks(record -> assertTimestamp(record, null));
+        runner.enqueue(new byte[0], new HashMap<String, String>() {{
+            put("doc_id", "28039652144");
+            put("i", "doc");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        assertNotNull(out2);
+    }
+
+    @Test
+    public void testPutElasticsearchOnTriggerWithAtTimstampPath() throws Exception {
+        PutElasticsearchHttpRecordTestProcessor processor = new PutElasticsearchHttpRecordTestProcessor(false);
+        DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern(RecordFieldType.TIME.getDefaultFormat());
+        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(RecordFieldType.TIMESTAMP.getDefaultFormat());
+        DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern(RecordFieldType.DATE.getDefaultFormat());
+        runner = TestRunners.newTestRunner(processor);
+        generateTestData(1);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+
+        runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP_RECORD_PATH, "/ts"); // TIMESTAMP
+        processor.setRecordChecks(record -> assertTimestamp(record, LOCAL_DATE_TIME.format(dateTimeFormatter)));
+        runner.enqueue(new byte[0]);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        assertNotNull(runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0));
+        runner.clearTransferState();
+
+        runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP_RECORD_PATH, "/date"); // DATE;
+        processor.setRecordChecks(record -> assertTimestamp(record, LOCAL_DATE.format(dateFormatter)));
+        runner.enqueue(new byte[0]);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        assertNotNull(runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0));
+        runner.clearTransferState();
+
+        runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP_RECORD_PATH, "/time"); // TIME
+        processor.setRecordChecks(record -> assertTimestamp(record, LOCAL_TIME.format(timeFormatter)));
+        runner.enqueue(new byte[0]);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        assertNotNull(runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0));
+        runner.clearTransferState();
+
+        // these INT/STRING values might not make sense from an Elasticsearch point of view,
+        // but we want to prove we can handle them being selected from the Record
+        runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP_RECORD_PATH, "/code"); // INT
+        processor.setRecordChecks(record -> assertTimestamp(record, 101));
+        runner.enqueue(new byte[0]);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        assertNotNull(runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0));
+        runner.clearTransferState();
+
+        runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP_RECORD_PATH, "/name"); // STRING
+        processor.setRecordChecks(record -> assertTimestamp(record, "reç1"));
+        runner.enqueue(new byte[0]);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        assertNotNull(runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0));
+        runner.clearTransferState();
+
+        runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP_RECORD_PATH, "/coerce"); // STRING coerced to LONG
+        processor.setRecordChecks(record -> assertTimestamp(record, 1000));
+        runner.enqueue(new byte[0]);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        assertNotNull(runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0));
+        runner.clearTransferState();
+    }
+
+    @Test
     public void testPutElasticSearchOnTriggerQueryParameter() throws IOException {
         PutElasticsearchHttpRecordTestProcessor p = new PutElasticsearchHttpRecordTestProcessor(false); // no failures
         p.setExpectedUrl("http://127.0.0.1:9200/_bulk?pipeline=my-pipeline");
@@ -600,7 +739,7 @@ public class TestPutElasticsearchHttpRecord {
         int statusCode = 200;
         String statusMessage = "OK";
         String expectedUrl = null;
-        Consumer<Map<?, ?>>[] recordChecks;
+        Consumer<Map<String, Object>>[] recordChecks;
 
         PutElasticsearchHttpRecordTestProcessor(boolean responseHasFailures) {
             this.numResponseFailures = responseHasFailures ? 1 : 0;
@@ -620,10 +759,11 @@ public class TestPutElasticsearchHttpRecord {
         }
 
         @SafeVarargs
-        final void setRecordChecks(Consumer<Map<?, ?>>... checks) {
+        final void setRecordChecks(Consumer<Map<String, Object>>... checks) {
             recordChecks = checks;
         }
 
+        @SuppressWarnings("unchecked")
         @Override
         protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
             client = mock(OkHttpClient.class);
@@ -636,7 +776,7 @@ public class TestPutElasticsearchHttpRecord {
                     if (recordChecks != null) {
                         final ObjectMapper mapper = new ObjectMapper();
                         Buffer sink = new Buffer();
-                        realRequest.body().writeTo(sink);
+                        Objects.requireNonNull(realRequest.body()).writeTo(sink);
                         String line;
                         int recordIndex = 0;
                         boolean content = false;
@@ -798,13 +938,13 @@ public class TestPutElasticsearchHttpRecord {
         parser.addSchemaField("time", RecordFieldType.TIME);
         parser.addSchemaField("ts", RecordFieldType.TIMESTAMP);
         parser.addSchemaField("amount", RecordFieldType.DECIMAL);
+        parser.addSchemaField("coerce", RecordFieldType.STRING);
 
         final Date date = Date.valueOf(ISO_DATE);
         final Timestamp timestamp = Timestamp.valueOf(LOCAL_DATE_TIME);
         final Time time = Time.valueOf(LOCAL_TIME);
         for(int i=1; i<=numRecords; i++) {
-
-            parser.addRecord(i, "reç" + i, 100 + i, date, time, timestamp, new BigDecimal(Double.MAX_VALUE).multiply(BigDecimal.TEN));
+            parser.addRecord(i, "reç" + i, 100 + i, date, time, timestamp, new BigDecimal(Double.MAX_VALUE).multiply(BigDecimal.TEN), "1000");
         }
     }
 
@@ -818,4 +958,12 @@ public class TestPutElasticsearchHttpRecord {
         runner.enableControllerService(writer);
         runner.setProperty(PutElasticsearchHttpRecord.RECORD_WRITER, "writer");
     }
+
+    private void assertTimestamp(final Map<String, Object> record, final Object timestamp) {
+        if (timestamp == null) {
+            assertFalse(record.containsKey("@timestamp"));
+        } else {
+            assertEquals(timestamp, record.get("@timestamp"));
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
index fa874fb..5389d0f 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
@@ -20,12 +20,6 @@ language governing permissions and limitations under the License. -->
     <artifactId>nifi-elasticsearch-restapi-processors</artifactId>
     <packaging>jar</packaging>
 
-    <properties>
-        <slf4jversion>2.7</slf4jversion>
-        <es.version>5.6.6</es.version>
-        <lucene.version>6.2.1</lucene.version>
-    </properties>
-
     <dependencies>
         <dependency>
             <groupId>org.apache.nifi</groupId>
@@ -113,6 +107,12 @@ language governing permissions and limitations under the License. -->
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-record-utils</artifactId>
+            <version>1.15.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mock-record-utils</artifactId>
             <version>1.15.0-SNAPSHOT</version>
             <scope>test</scope>
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index 49166ff..96c0c21 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -50,8 +50,11 @@ import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleDateFormatValidator;
+import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 
 import java.io.InputStream;
@@ -89,23 +92,33 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
         .build();
 
     static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
-            .name("put-es-record-index-op")
-            .displayName("Index Operation")
-            .description("The type of the operation used to index (create, delete, index, update, upsert)")
-            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .defaultValue(IndexOperationRequest.Operation.Index.getValue())
-            .required(true)
-            .build();
+        .name("put-es-record-index-op")
+        .displayName("Index Operation")
+        .description("The type of the operation used to index (create, delete, index, update, upsert)")
+        .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .defaultValue(IndexOperationRequest.Operation.Index.getValue())
+        .required(true)
+        .build();
+
+    static final PropertyDescriptor AT_TIMESTAMP = new PropertyDescriptor.Builder()
+        .name("put-es-record-at-timestamp")
+        .displayName("@timestamp Value")
+        .description("The value to use as the @timestamp field (required for Elasticsearch Data Streams)")
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .required(false)
+        .build();
 
     static final PropertyDescriptor INDEX_OP_RECORD_PATH = new PropertyDescriptor.Builder()
-            .name("put-es-record-index-op-path")
-            .displayName("Index Operation Record Path")
-            .description("A record path expression to retrieve the Index Operation field for use with Elasticsearch. If left blank " +
-                    "the Index Operation will be determined using the main Index Operation property.")
-            .addValidator(new RecordPathValidator())
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .build();
+        .name("put-es-record-index-op-path")
+        .displayName("Index Operation Record Path")
+        .description("A record path expression to retrieve the Index Operation field for use with Elasticsearch. If left blank " +
+                "the Index Operation will be determined using the main Index Operation property.")
+        .addValidator(new RecordPathValidator())
+        .required(false)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .build();
 
     static final PropertyDescriptor ID_RECORD_PATH = new PropertyDescriptor.Builder()
         .name("put-es-record-id-path")
@@ -113,6 +126,19 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
         .description("A record path expression to retrieve the ID field for use with Elasticsearch. If left blank " +
                 "the ID will be automatically generated by Elasticsearch.")
         .addValidator(new RecordPathValidator())
+        .required(false)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .build();
+
+    static final PropertyDescriptor RETAIN_ID_FIELD = new PropertyDescriptor.Builder()
+        .name("put-es-record-retain-id-field")
+        .displayName("Retain ID (Record Path)")
+        .description("Whether to retain the existing field used as the ID Record Path.")
+        .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .required(false)
+        .dependsOn(ID_RECORD_PATH)
         .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
         .build();
 
@@ -136,6 +162,27 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
         .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
         .build();
 
+    static final PropertyDescriptor AT_TIMESTAMP_RECORD_PATH = new PropertyDescriptor.Builder()
+        .name("put-es-record-at-timestamp-path")
+        .displayName("@timestamp Record Path")
+        .description("A RecordPath pointing to a field in the record(s) that contains the @timestamp for the document. " +
+                "If left blank the @timestamp will be determined using the main @timestamp property")
+        .addValidator(new RecordPathValidator())
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .build();
+
+    static final PropertyDescriptor RETAIN_AT_TIMESTAMP_FIELD = new PropertyDescriptor.Builder()
+        .name("put-es-record-retain-at-timestamp-field")
+        .displayName("Retain @timestamp (Record Path)")
+        .description("Whether to retain the existing field used as the @timestamp Record Path.")
+        .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .required(false)
+        .dependsOn(AT_TIMESTAMP_RECORD_PATH)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .build();
+
     static final PropertyDescriptor ERROR_RECORD_WRITER = new PropertyDescriptor.Builder()
         .name("put-es-record-error-writer")
         .displayName("Error Record Writer")
@@ -147,9 +194,51 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
         .required(false)
         .build();
 
+    static final PropertyDescriptor AT_TIMESTAMP_DATE_FORMAT = new PropertyDescriptor.Builder()
+        .name("put-es-record-at-timestamp-date-format")
+        .displayName("@Timestamp Record Path Date Format")
+        .description("Specifies the format to use when writing Date field for @timestamp. "
+                + "If not specified, the default format '" + RecordFieldType.DATE.getDefaultFormat() + "' is used. "
+                + "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy for a two-digit month, followed by "
+                + "a two-digit day, followed by a four-digit year, all separated by '/' characters, as in 01/25/2017).")
+        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+        .addValidator(new SimpleDateFormatValidator())
+        .required(false)
+        .dependsOn(AT_TIMESTAMP_RECORD_PATH)
+        .build();
+
+    static final PropertyDescriptor AT_TIMESTAMP_TIME_FORMAT = new PropertyDescriptor.Builder()
+        .name("put-es-record-at-timestamp-time-format")
+        .displayName("@Timestamp Record Path Time Format")
+        .description("Specifies the format to use when writing Time field for @timestamp. "
+                + "If not specified, the default format '" + RecordFieldType.TIME.getDefaultFormat() + "' is used. "
+                + "If specified, the value must match the Java Simple Date Format (for example, HH:mm:ss for a two-digit hour in 24-hour format, followed by "
+                + "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 18:04:15).")
+        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+        .addValidator(new SimpleDateFormatValidator())
+        .required(false)
+        .dependsOn(AT_TIMESTAMP_RECORD_PATH)
+        .build();
+
+    static final PropertyDescriptor AT_TIMESTAMP_TIMESTAMP_FORMAT = new PropertyDescriptor.Builder()
+        .name("put-es-record-at-timestamp-timestamp-format")
+        .displayName("@Timestamp Record Path Timestamp Format")
+        .description("Specifies the format to use when writing Timestamp field for @timestamp. "
+                + "If not specified, the default format '" + RecordFieldType.TIMESTAMP.getDefaultFormat() + "' is used. "
+                + "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy HH:mm:ss for a two-digit month, followed by "
+                + "a two-digit day, followed by a four-digit year, all separated by '/' characters; and then followed by a two-digit hour in 24-hour format, followed by "
+                + "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 01/25/2017 18:04:15).")
+        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+        .addValidator(new SimpleDateFormatValidator())
+        .required(false)
+        .dependsOn(AT_TIMESTAMP_RECORD_PATH)
+        .build();
+
     static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
-        INDEX_OP, INDEX, TYPE, CLIENT_SERVICE, RECORD_READER, BATCH_SIZE, ID_RECORD_PATH, INDEX_OP_RECORD_PATH,
-        INDEX_RECORD_PATH, TYPE_RECORD_PATH, LOG_ERROR_RESPONSES, ERROR_RECORD_WRITER
+        INDEX_OP, INDEX, TYPE, AT_TIMESTAMP, CLIENT_SERVICE, RECORD_READER, BATCH_SIZE, ID_RECORD_PATH, RETAIN_ID_FIELD,
+        INDEX_OP_RECORD_PATH, INDEX_RECORD_PATH, TYPE_RECORD_PATH, AT_TIMESTAMP_RECORD_PATH, RETAIN_AT_TIMESTAMP_FIELD,
+        AT_TIMESTAMP_DATE_FORMAT, AT_TIMESTAMP_TIME_FORMAT, AT_TIMESTAMP_TIMESTAMP_FORMAT, LOG_ERROR_RESPONSES,
+        ERROR_RECORD_WRITER
     ));
     static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
         REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS
@@ -170,6 +259,9 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
     private ElasticSearchClientService clientService;
     private RecordSetWriterFactory writerFactory;
     private boolean logErrors;
+    private volatile String dateFormat;
+    private volatile String timeFormat;
+    private volatile String timestampFormat;
 
     @OnScheduled
     public void onScheduled(ProcessContext context) {
@@ -178,6 +270,19 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
         this.recordPathCache = new RecordPathCache(16);
         this.writerFactory = context.getProperty(ERROR_RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
         this.logErrors = context.getProperty(LOG_ERROR_RESPONSES).asBoolean();
+
+        this.dateFormat = context.getProperty(AT_TIMESTAMP_DATE_FORMAT).evaluateAttributeExpressions().getValue();
+        if (this.dateFormat == null) {
+            this.dateFormat = RecordFieldType.DATE.getDefaultFormat();
+        }
+        this.timeFormat = context.getProperty(AT_TIMESTAMP_TIME_FORMAT).evaluateAttributeExpressions().getValue();
+        if (this.timeFormat == null) {
+            this.timeFormat = RecordFieldType.TIME.getDefaultFormat();
+        }
+        this.timestampFormat = context.getProperty(AT_TIMESTAMP_TIMESTAMP_FORMAT).evaluateAttributeExpressions().getValue();
+        if (this.timestampFormat == null) {
+            this.timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
+        }
     }
 
     static final List<String> ALLOWED_INDEX_OPERATIONS = Collections.unmodifiableList(Arrays.asList(
@@ -214,7 +319,7 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
     }
 
     @Override
-    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+    public void onTrigger(ProcessContext context, ProcessSession session) {
         FlowFile input = session.get();
         if (input == null) {
             return;
@@ -223,24 +328,22 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
         final String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(input).getValue();
         final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
         final String type  = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
+        final String atTimestamp  = context.getProperty(AT_TIMESTAMP).evaluateAttributeExpressions(input).getValue();
 
-        final String indexOpPath = context.getProperty(INDEX_OP_RECORD_PATH).isSet()
-                ? context.getProperty(INDEX_OP_RECORD_PATH).evaluateAttributeExpressions(input).getValue()
-                : null;
-        final String idPath = context.getProperty(ID_RECORD_PATH).isSet()
-                ? context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(input).getValue()
-                : null;
-        final String indexPath = context.getProperty(INDEX_RECORD_PATH).isSet()
-                ? context.getProperty(INDEX_RECORD_PATH).evaluateAttributeExpressions(input).getValue()
-                : null;
-        final String typePath = context.getProperty(TYPE_RECORD_PATH).isSet()
-                ? context.getProperty(TYPE_RECORD_PATH).evaluateAttributeExpressions(input).getValue()
-                : null;
+        final String indexOpPath = context.getProperty(INDEX_OP_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+        final String idPath = context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+        final String indexPath = context.getProperty(INDEX_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+        final String typePath = context.getProperty(TYPE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+        final String atTimestampPath = context.getProperty(AT_TIMESTAMP_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
 
         RecordPath ioPath = indexOpPath != null ? recordPathCache.getCompiled(indexOpPath) : null;
         RecordPath path = idPath != null ? recordPathCache.getCompiled(idPath) : null;
         RecordPath iPath = indexPath != null ? recordPathCache.getCompiled(indexPath) : null;
         RecordPath tPath = typePath != null ? recordPathCache.getCompiled(typePath) : null;
+        RecordPath atPath = atTimestampPath != null ? recordPathCache.getCompiled(atTimestampPath) : null;
+
+        boolean retainId = context.getProperty(RETAIN_ID_FIELD).evaluateAttributeExpressions(input).asBoolean();
+        boolean retainTimestamp = context.getProperty(RETAIN_AT_TIMESTAMP_FIELD).evaluateAttributeExpressions(input).asBoolean();
 
         int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger();
         List<FlowFile> badRecords = new ArrayList<>();
@@ -252,13 +355,16 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
             List<Record> originals = new ArrayList<>();
 
             while ((record = reader.nextRecord()) != null) {
-                final String idx = getFromRecordPath(record, iPath, index);
-                final String t   = getFromRecordPath(record, tPath, type);
-                final IndexOperationRequest.Operation o = IndexOperationRequest.Operation.forValue(getFromRecordPath(record, ioPath, indexOp));
-                final String id  = path != null ? getFromRecordPath(record, path, null) : null;
+                final String idx = getFromRecordPath(record, iPath, index, false);
+                final String t   = getFromRecordPath(record, tPath, type, false);
+                final IndexOperationRequest.Operation o = IndexOperationRequest.Operation.forValue(getFromRecordPath(record, ioPath, indexOp, false));
+                final String id  = getFromRecordPath(record, path, null, retainId);
+                final Object timestamp = getTimestampFromRecordPath(record, atPath, atTimestamp, retainTimestamp);
 
                 @SuppressWarnings("unchecked")
-                Map<String, Object> contentMap = (Map<String, Object>) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+                final Map<String, Object> contentMap = (Map<String, Object>) DataTypeUtils
+                        .convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+                contentMap.putIfAbsent("@timestamp", timestamp);
 
                 operationList.add(new IndexOperationRequest(idx, t, id, contentMap, o));
                 originals.add(record);
@@ -275,7 +381,7 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
                 }
             }
 
-            if (operationList.size() > 0) {
+            if (!operationList.isEmpty()) {
                 BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema());
                 FlowFile bad = indexDocuments(bundle, session, input);
                 if (bad != null) {
@@ -315,7 +421,7 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
                 List<Map<String, Object>> errors = response.getItems();
                 ObjectMapper mapper = new ObjectMapper();
                 mapper.enable(SerializationFeature.INDENT_OUTPUT);
-                String output = String.format("An error was encountered while processing bulk operations. Server response below:\n\n%s", mapper.writeValueAsString(errors));
+                String output = String.format("An error was encountered while processing bulk operations. Server response below:%n%n%s", mapper.writeValueAsString(errors));
 
                 if (logErrors) {
                     getLogger().error(output);
@@ -326,26 +432,26 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
 
             if (writerFactory != null) {
                 FlowFile errorFF = session.create(input);
-                try (OutputStream os = session.write(errorFF);
-                     RecordSetWriter writer = writerFactory.createWriter(getLogger(), bundle.getSchema(), os, errorFF )) {
-
+                try {
                     int added = 0;
-                    writer.beginRecordSet();
-                    for (int index = 0; index < response.getItems().size(); index++) {
-                        Map<String, Object> current = response.getItems().get(index);
-                        if (!current.isEmpty()) {
-                            String key = current.keySet().iterator().next();
-                            @SuppressWarnings("unchecked")
-                            Map<String, Object> inner = (Map<String, Object>) current.get(key);
-                            if (inner.containsKey("error")) {
-                                writer.write(bundle.getOriginalRecords().get(index));
-                                added++;
+                    try (OutputStream os = session.write(errorFF);
+                         RecordSetWriter writer = writerFactory.createWriter(getLogger(), bundle.getSchema(), os, errorFF )) {
+
+                        writer.beginRecordSet();
+                        for (int index = 0; index < response.getItems().size(); index++) {
+                            Map<String, Object> current = response.getItems().get(index);
+                            if (!current.isEmpty()) {
+                                String key = current.keySet().stream().findFirst().orElse(null);
+                                @SuppressWarnings("unchecked")
+                                Map<String, Object> inner = (Map<String, Object>) current.get(key);
+                                if (inner != null && inner.containsKey("error")) {
+                                    writer.write(bundle.getOriginalRecords().get(index));
+                                    added++;
+                                }
                             }
                         }
+                        writer.finishRecordSet();
                     }
-                    writer.finishRecordSet();
-                    writer.close();
-                    os.close();
 
                     errorFF = session.putAttribute(errorFF, ATTR_RECORD_COUNT, String.valueOf(added));
 
@@ -362,7 +468,8 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
         return null;
     }
 
-    private String getFromRecordPath(final Record record, final RecordPath path, final String fallback) {
+    private String getFromRecordPath(final Record record, final RecordPath path, final String fallback,
+                                     final boolean retain) {
         if (path == null) {
             return fallback;
         }
@@ -377,11 +484,97 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
                 );
             }
 
-            fieldValue.updateValue(null);
+            if (!retain) {
+                fieldValue.updateValue(null);
+            }
 
             return fieldValue.getValue().toString();
         } else {
             return fallback;
         }
     }
+
+    private Object getTimestampFromRecordPath(final Record record, final RecordPath path, final String fallback,
+                                              final boolean retain) {
+        if (path == null) {
+            return coerceStringToLong("@timestamp", fallback);
+        }
+
+        final RecordPathResult result = path.evaluate(record);
+        final Optional<FieldValue> value = result.getSelectedFields().findFirst();
+        if (value.isPresent() && value.get().getValue() != null) {
+            final FieldValue fieldValue = value.get();
+
+            final DataType dataType = fieldValue.getField().getDataType();
+            final String fieldName = fieldValue.getField().getFieldName();
+            final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE
+                    ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType)
+                    : dataType;
+            final Object coercedValue = DataTypeUtils.convertType(fieldValue.getValue(), chosenDataType, fieldName);
+            if (coercedValue == null) {
+                return null;
+            }
+
+            final Object returnValue;
+            switch (chosenDataType.getFieldType()) {
+                case DATE:
+                case TIME:
+                case TIMESTAMP:
+                    final String format = determineDateFormat(chosenDataType.getFieldType());
+                    returnValue = coerceStringToLong(
+                            fieldName,
+                            DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(format))
+                    );
+                    break;
+                case LONG:
+                    returnValue = DataTypeUtils.toLong(coercedValue, fieldName);
+                    break;
+                case INT:
+                case BYTE:
+                case SHORT:
+                    returnValue = DataTypeUtils.toInteger(coercedValue, fieldName);
+                    break;
+                case CHAR:
+                case STRING:
+                    returnValue = coerceStringToLong(fieldName, coercedValue.toString());
+                    break;
+                case BIGINT:
+                    returnValue = coercedValue;
+                    break;
+                default:
+                    throw new ProcessException(
+                            String.format("Cannot use %s field referenced by %s as @timestamp.", chosenDataType, path.getPath())
+                    );
+            }
+
+            if (!retain) {
+                fieldValue.updateValue(null);
+            }
+
+            return returnValue;
+        } else {
+            return coerceStringToLong("@timestamp", fallback);
+        }
+    }
+
+    private String determineDateFormat(final RecordFieldType recordFieldType) {
+        final String format;
+        switch (recordFieldType) {
+            case DATE:
+                format = this.dateFormat;
+                break;
+            case TIME:
+                format = this.timeFormat;
+                break;
+            default:
+                format = this.timestampFormat;
+        }
+        return format;
+    }
+
+    private Object coerceStringToLong(final String fieldName, final String stringValue) {
+        return DataTypeUtils.isLongTypeCompatible(stringValue)
+                ? DataTypeUtils.toLong(stringValue, fieldName)
+                : stringValue;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
index ccd2d70..12df823 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
@@ -28,6 +28,7 @@ import org.apache.nifi.schema.access.SchemaAccessUtils
 import org.apache.nifi.serialization.RecordReaderFactory
 import org.apache.nifi.serialization.record.MockRecordParser
 import org.apache.nifi.serialization.record.MockSchemaRegistry
+import org.apache.nifi.serialization.record.RecordFieldType
 import org.apache.nifi.util.StringUtils
 import org.apache.nifi.util.TestRunner
 import org.apache.nifi.util.TestRunners
@@ -35,10 +36,29 @@ import org.junit.Assert
 import org.junit.Before
 import org.junit.Test
 
+import java.sql.Date
+import java.sql.Time
+import java.sql.Timestamp
+import java.time.LocalDate
+import java.time.LocalDateTime
+import java.time.LocalTime
+import java.time.format.DateTimeFormatter
+
 import static groovy.json.JsonOutput.prettyPrint
 import static groovy.json.JsonOutput.toJson
 
 class PutElasticsearchRecordTest {
+    private static final int DATE_YEAR = 2020
+    private static final int DATE_MONTH = 11
+    private static final int DATE_DAY = 27
+    private static final int TIME_HOUR = 12
+    private static final int TIME_MINUTE = 55
+    private static final int TIME_SECOND = 23
+
+    private static final LocalDateTime LOCAL_DATE_TIME = LocalDateTime.of(DATE_YEAR, DATE_MONTH, DATE_DAY, TIME_HOUR, TIME_MINUTE, TIME_SECOND)
+    private static final LocalDate LOCAL_DATE = LocalDate.of(DATE_YEAR, DATE_MONTH, DATE_DAY)
+    private static final LocalTime LOCAL_TIME = LocalTime.of(TIME_HOUR, TIME_MINUTE, TIME_SECOND)
+
     MockBulkLoadClientService clientService
     MockSchemaRegistry registry
     RecordReaderFactory reader
@@ -73,10 +93,12 @@ class PutElasticsearchRecordTest {
         runner.addControllerService("reader", reader)
         runner.addControllerService("clientService", clientService)
         runner.setProperty(reader, SchemaAccessUtils.SCHEMA_REGISTRY, "registry")
+        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_NAME_PROPERTY)
         runner.setProperty(PutElasticsearchRecord.RECORD_READER, "reader")
-        runner.setProperty(PutElasticsearchRecord.INDEX_OP, "index")
+        runner.setProperty(PutElasticsearchRecord.INDEX_OP, IndexOperationRequest.Operation.Index.getValue())
         runner.setProperty(PutElasticsearchRecord.INDEX, "test_index")
         runner.setProperty(PutElasticsearchRecord.TYPE, "test_type")
+        runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP, "test_timestamp")
         runner.setProperty(PutElasticsearchRecord.CLIENT_SERVICE, "clientService")
         runner.enableControllerService(registry)
         runner.enableControllerService(reader)
@@ -86,6 +108,23 @@ class PutElasticsearchRecordTest {
     }
 
     void basicTest(int failure, int retry, int success) {
+        def evalClosure = { List<IndexOperationRequest> items ->
+            int timestampDefaultCount = items.findAll { it.fields.get("@timestamp") == "test_timestamp" }.size()
+            int indexCount = items.findAll { it.index == "test_index" }.size()
+            int typeCount = items.findAll { it.type == "test_type" }.size()
+            int opCount = items.findAll { it.operation == IndexOperationRequest.Operation.Index }.size()
+            Assert.assertEquals(2, timestampDefaultCount)
+            Assert.assertEquals(2, indexCount)
+            Assert.assertEquals(2, typeCount)
+            Assert.assertEquals(2, opCount)
+        }
+
+        basicTest(failure, retry, success, evalClosure)
+    }
+
+    void basicTest(int failure, int retry, int success, Closure evalClosure) {
+        clientService.evalClosure = evalClosure
+
         runner.enqueue(flowFileContents, [ "schema.name": "simple" ])
         runner.run()
 
@@ -100,6 +139,17 @@ class PutElasticsearchRecordTest {
     }
 
     @Test
+    void simpleTestCoercedDefaultTimestamp() {
+        def evalClosure = { List<IndexOperationRequest> items ->
+            int timestampDefault = items.findAll { it.fields.get("@timestamp") == 100L }.size()
+            Assert.assertEquals(2, timestampDefault)
+        }
+
+        runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP, "100")
+        basicTest(0, 0, 1, evalClosure)
+    }
+
+    @Test
     void simpleTestWithMockReader() {
         reader = new MockRecordParser()
         runner.addControllerService("mockReader", reader)
@@ -127,14 +177,19 @@ class PutElasticsearchRecordTest {
             name: "RecordPathTestType",
             fields: [
                 [ name: "id", type: "string" ],
+                [ name: "op", type: "string" ],
                 [ name: "index", type: "string" ],
                 [ name: "type", type: "string" ],
-                [ name: "msg", type: ["null", "string"] ]
+                [ name: "msg", type: ["null", "string"] ],
+                [ name: "ts", type: [ type: "long", logicalType: "timestamp-millis" ] ],
+                [ name: "date", type: [ type: "int", logicalType: "date" ] ],
+                [ name: "time", type: [ type: "int", logicalType: "time-millis" ] ],
+                [ name: "code", type: "long" ]
             ]
         ]))
 
         def flowFileContents = prettyPrint(toJson([
-            [ id: "rec-1", op: "index", index: "bulk_a", type: "message", msg: "Hello" ],
+            [ id: "rec-1", op: "index", index: "bulk_a", type: "message", msg: "Hello", ts: Timestamp.valueOf(LOCAL_DATE_TIME).toInstant().toEpochMilli() ],
             [ id: "rec-2", op: "index", index: "bulk_b", type: "message", msg: "Hello" ],
             [ id: "rec-3", op: "index", index: "bulk_a", type: "message", msg: "Hello" ],
             [ id: "rec-4", op: "index", index: "bulk_b", type: "message", msg: "Hello" ],
@@ -143,13 +198,19 @@ class PutElasticsearchRecordTest {
         ]))
 
         def evalClosure = { List<IndexOperationRequest> items ->
-            def a = items.findAll { it.index == "bulk_a" }.size()
-            def b = items.findAll { it.index == "bulk_b" }.size()
+            int a = items.findAll { it.index == "bulk_a" }.size()
+            int b = items.findAll { it.index == "bulk_b" }.size()
             int index = items.findAll { it.operation == IndexOperationRequest.Operation.Index }.size()
             int create = items.findAll { it.operation == IndexOperationRequest.Operation.Create }.size()
             int msg = items.findAll { ("Hello" == it.fields.get("msg")) }.size()
             int empties = items.findAll { ("" == it.fields.get("msg")) }.size()
             int nulls = items.findAll { (null == it.fields.get("msg")) }.size()
+            int timestamp = items.findAll { it.fields.get("@timestamp") ==
+                    LOCAL_DATE_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIMESTAMP.getDefaultFormat()))
+            }.size()
+            int timestampDefault = items.findAll { it.fields.get("@timestamp") == "test_timestamp" }.size()
+            int ts = items.findAll { it.fields.get("ts") != null }.size()
+            int id = items.findAll { it.fields.get("id") != null }.size()
             items.each {
                 Assert.assertNotNull(it.id)
                 Assert.assertTrue(it.id.startsWith("rec-"))
@@ -162,6 +223,10 @@ class PutElasticsearchRecordTest {
             Assert.assertEquals(4, msg)
             Assert.assertEquals(1, empties)
             Assert.assertEquals(1, nulls)
+            Assert.assertEquals(1, timestamp)
+            Assert.assertEquals(5, timestampDefault)
+            Assert.assertEquals(0, ts)
+            Assert.assertEquals(0, id)
         }
 
         clientService.evalClosure = evalClosure
@@ -173,6 +238,7 @@ class PutElasticsearchRecordTest {
         runner.setProperty(PutElasticsearchRecord.ID_RECORD_PATH, "/id")
         runner.setProperty(PutElasticsearchRecord.INDEX_RECORD_PATH, "/index")
         runner.setProperty(PutElasticsearchRecord.TYPE_RECORD_PATH, "/type")
+        runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, "/ts")
         runner.enqueue(flowFileContents, [
             "schema.name": "recordPathTest"
         ])
@@ -185,12 +251,12 @@ class PutElasticsearchRecordTest {
         runner.clearTransferState()
 
         flowFileContents = prettyPrint(toJson([
-            [ id: "rec-1", op: null, index: null, type: null, msg: "Hello" ],
-            [ id: "rec-2", op: null, index: null, type: null, msg: "Hello" ],
-            [ id: "rec-3", op: null, index: null, type: null, msg: "Hello" ],
-            [ id: "rec-4", op: null, index: null, type: null, msg: "Hello" ],
-            [ id: "rec-5", op: "update", index: null, type: null, msg: "Hello" ],
-            [ id: "rec-6", op: null, index: "bulk_b", type: "message", msg: "Hello" ]
+                [ id: "rec-1", op: null, index: null, type: null, msg: "Hello", date: Date.valueOf(LOCAL_DATE).getTime() ],
+                [ id: "rec-2", op: null, index: null, type: null, msg: "Hello" ],
+                [ id: "rec-3", op: null, index: null, type: null, msg: "Hello" ],
+                [ id: "rec-4", op: null, index: null, type: null, msg: "Hello" ],
+                [ id: "rec-5", op: "update", index: null, type: null, msg: "Hello" ],
+                [ id: "rec-6", op: null, index: "bulk_b", type: "message", msg: "Hello" ]
         ]))
 
         evalClosure = { List<IndexOperationRequest> items ->
@@ -200,17 +266,32 @@ class PutElasticsearchRecordTest {
             def bulkIndexCount = items.findAll { it.index.startsWith("bulk_") }.size()
             def indexOperationCount = items.findAll { it.operation == IndexOperationRequest.Operation.Index }.size()
             def updateOperationCount = items.findAll { it.operation == IndexOperationRequest.Operation.Update }.size()
+            def timestampCount = items.findAll { it.fields.get("@timestamp") ==
+                    LOCAL_DATE.format(DateTimeFormatter.ofPattern("dd/MM/yyyy"))
+            }.size()
+            int dateCount = items.findAll { it.fields.get("date") != null }.size()
+            def idCount = items.findAll { it.fields.get("id") != null }.size()
+            def defaultCoercedTimestampCount = items.findAll { it.fields.get("@timestamp") == 100L }.size()
             Assert.assertEquals(5, testTypeCount)
             Assert.assertEquals(1, messageTypeCount)
             Assert.assertEquals(5, testIndexCount)
             Assert.assertEquals(1, bulkIndexCount)
             Assert.assertEquals(5, indexOperationCount)
             Assert.assertEquals(1, updateOperationCount)
+            Assert.assertEquals(1, timestampCount)
+            Assert.assertEquals(5, defaultCoercedTimestampCount)
+            Assert.assertEquals(1, dateCount)
+            Assert.assertEquals(6, idCount)
         }
 
         clientService.evalClosure = evalClosure
 
         runner.setProperty(PutElasticsearchRecord.INDEX_OP, "\${operation}")
+        runner.setProperty(PutElasticsearchRecord.RETAIN_ID_FIELD, "true")
+        runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP, "100")
+        runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, "/date")
+        runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_DATE_FORMAT, "dd/MM/yyyy")
+        runner.setProperty(PutElasticsearchRecord.RETAIN_AT_TIMESTAMP_FIELD, "true")
         runner.enqueue(flowFileContents, [
             "schema.name": "recordPathTest",
             "operation": "index"
@@ -228,7 +309,7 @@ class PutElasticsearchRecordTest {
                 [ id: "rec-3", msg: "Hello" ],
                 [ id: "rec-4", msg: "Hello" ],
                 [ id: "rec-5", msg: "Hello" ],
-                [ id: "rec-6", type: "message", msg: "Hello" ]
+                [ id: "rec-6", type: "message", msg: "Hello", time: Time.valueOf(LOCAL_TIME).getTime() ]
         ]))
 
         evalClosure = { List<IndexOperationRequest> items ->
@@ -236,15 +317,21 @@ class PutElasticsearchRecordTest {
             def messageTypeCount = items.findAll { it.type == "message" }.size()
             def nullIdCount = items.findAll { it.id == null }.size()
             def recIdCount = items.findAll { StringUtils.startsWith(it.id, "rec-") }.size()
+            def timestampCount = items.findAll { it.fields.get("@timestamp") ==
+                    LOCAL_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIME.getDefaultFormat()))
+            }.size()
             Assert.assertEquals("null type", 5, nullTypeCount)
             Assert.assertEquals("message type", 1, messageTypeCount)
             Assert.assertEquals("null id", 2, nullIdCount)
             Assert.assertEquals("rec- id", 4, recIdCount)
+            Assert.assertEquals("@timestamp", 1, timestampCount)
         }
 
         clientService.evalClosure = evalClosure
 
         runner.setProperty(PutElasticsearchRecord.INDEX_OP, "index")
+        runner.removeProperty(PutElasticsearchRecord.AT_TIMESTAMP)
+        runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, "/time")
         runner.removeProperty(PutElasticsearchRecord.TYPE)
         runner.enqueue(flowFileContents, [
                 "schema.name": "recordPathTest"
@@ -262,7 +349,7 @@ class PutElasticsearchRecordTest {
             [ id: "rec-3", op: "update", index: "bulk_a", type: "message", msg: "Hello" ],
             [ id: "rec-4", op: "upsert", index: "bulk_b", type: "message", msg: "Hello" ],
             [ id: "rec-5", op: "create", index: "bulk_a", type: "message", msg: "Hello" ],
-            [ id: "rec-6", op: "delete", index: "bulk_b", type: "message", msg: "Hello" ]
+            [ id: "rec-6", op: "delete", index: "bulk_b", type: "message", msg: "Hello", code: 101L ]
         ]))
 
         clientService.evalClosure = { List<IndexOperationRequest> items ->
@@ -271,13 +358,18 @@ class PutElasticsearchRecordTest {
             int update = items.findAll { it.operation == IndexOperationRequest.Operation.Update }.size()
             int upsert = items.findAll { it.operation == IndexOperationRequest.Operation.Upsert }.size()
             int delete = items.findAll { it.operation == IndexOperationRequest.Operation.Delete }.size()
+            def timestampCount = items.findAll { it.fields.get("@timestamp") == 101L }.size()
+            def noTimestampCount = items.findAll { it.fields.get("@timestamp") == null }.size()
             Assert.assertEquals(1, index)
             Assert.assertEquals(2, create)
             Assert.assertEquals(1, update)
             Assert.assertEquals(1, upsert)
             Assert.assertEquals(1, delete)
+            Assert.assertEquals(1, timestampCount)
+            Assert.assertEquals(5, noTimestampCount)
         }
 
+        runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, "/code")
         runner.enqueue(flowFileContents, [
             "schema.name": "recordPathTest"
         ])
@@ -285,6 +377,26 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+
+        runner.clearTransferState()
+
+        flowFileContents = prettyPrint(toJson([
+                [ id: "rec-1", op: "index", index: "bulk_a", type: "message", msg: "Hello" ]
+        ]))
+
+        clientService.evalClosure = { List<IndexOperationRequest> items ->
+            def timestampCount = items.findAll { it.fields.get("@timestamp") == "Hello" }.size()
+            Assert.assertEquals(1, timestampCount)
+        }
+
+        runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, "/msg")
+        runner.enqueue(flowFileContents, [
+                "schema.name": "recordPathTest"
+        ])
+        runner.run()
+        runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
     }
 
     @Test
@@ -354,4 +466,4 @@ class PutElasticsearchRecordTest {
         def errorFF = runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0]
         assert errorFF.getAttribute(PutElasticsearchRecord.ATTR_RECORD_COUNT) == "1"
     }
-}
\ No newline at end of file
+}