Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/11/27 19:30:55 UTC

[GitHub] [nifi] ChrisSamo632 opened a new pull request #4691: NIFI-7990 add properties to map Record field as @timestamp in output …

ChrisSamo632 opened a new pull request #4691:
URL: https://github.com/apache/nifi/pull/4691


   #### Description of PR
   
   NIFI-7990 allow mapping of a Record field to `@timestamp` in the output to Elasticsearch for the PutElasticsearchRecord and PutElasticsearchHttpRecord processors. Such a field cannot be defined within an Avro schema because the name is invalid, but it is required for Elasticsearch Data Streams (new in ES 7.9).
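   You can check directly why `@timestamp` cannot live in an Avro schema. The following is a minimal sketch that assumes the naming rule from the Avro specification: a name starts with `[A-Za-z_]` and then contains only `[A-Za-z0-9_]`. `AvroNameCheck` is a hypothetical helper, not part of NiFi or the Avro library.

   ```java
   import java.util.regex.Pattern;

   // Hypothetical helper: checks a field name against the Avro specification's naming rule.
   final class AvroNameCheck {
       // Avro names must start with [A-Za-z_] and subsequently contain only [A-Za-z0-9_]
       private static final Pattern AVRO_NAME = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

       static boolean isValidAvroName(final String name) {
           return name != null && AVRO_NAME.matcher(name).matches();
       }

       public static void main(final String[] args) {
           System.out.println(isValidAvroName("timestamp"));  // true
           System.out.println(isValidAvroName("@timestamp")); // false: '@' is not allowed
       }
   }
   ```

   The leading `@` fails this rule. The value therefore has to come from an ordinary, schema-valid field (or a FlowFile attribute) and be written out under the `@timestamp` name at output time, which is what the new properties provide.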
   
   NIFI-7474 allow the mapped id field to be retained within the Record for PutElasticsearchRecord
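   The NIFI-7474 change amounts to a `retain` flag on id extraction. Below is a minimal sketch in which a plain `Map` stands in for a NiFi `Record`. `IdExtraction.extractId` is a hypothetical name modelled on the PR's `getFromRecordPath` change; it is not the PR's actual code.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical stand-in for getFromRecordPath: a Map plays the role of a NiFi Record.
   final class IdExtraction {
       static String extractId(final Map<String, Object> record, final String field,
                               final String fallback, final boolean retain) {
           final Object value = record.get(field);
           if (value == null) {
               return fallback; // nothing found at the path: use the fallback
           }
           // Read the id before (optionally) clearing it from the document body
           final String id = value.toString();
           if (!retain) {
               record.put(field, null); // mirrors fieldValue.updateValue(null)
           }
           return id;
       }

       public static void main(final String[] args) {
           final Map<String, Object> record = new HashMap<>();
           record.put("doc_id", "28039652144");
           // With retain=true the id is returned and also left in the record
           System.out.println(extractId(record, "doc_id", null, true) + " " + record.get("doc_id"));
       }
   }
   ```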
   
   ### For all changes:
   - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
   
   - [x] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [x] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
   
   - [x] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._
   
   ### For code changes:
   - [x] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
   - [x] Have you written or updated unit tests to verify your changes?
   - [x] Have you verified that the full build is successful on JDK 8?
   - [x] Have you verified that the full build is successful on JDK 11?
   - [ ] ~~If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?~~
   - [ ] ~~If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?~~
   - [ ] ~~If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?~~
   - [x] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] ~~Have you ensured that format looks appropriate for the output in which it is rendered?~~
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #4691: NIFI-7990 add properties to map Record field as @timestamp in output …

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #4691:
URL: https://github.com/apache/nifi/pull/4691#discussion_r532728519



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
##########
@@ -376,13 +483,92 @@ private String getFromRecordPath(Record record, RecordPath path, final String fa
                 );
             }
 
-            fieldValue.updateValue(null);
+            if (!retain) {
+                fieldValue.updateValue(null);
+            }
+
+            return fieldValue.getValue().toString();
+        } else {
+            return fallback;
+        }
+    }
+
+    private Object getTimestampFromRecordPath(final Record record, final RecordPath path, final String fallback,
+                                              final boolean retain) {
+        if (path == null) {
+            return fallback;
+        }
+
+        final RecordPathResult result = path.evaluate(record);
+        final Optional<FieldValue> value = result.getSelectedFields().findFirst();
+        if (value.isPresent() && value.get().getValue() != null) {
+            final FieldValue fieldValue = value.get();
+
+            final DataType dataType = fieldValue.getField().getDataType();
+            final String fieldName = fieldValue.getField().getFieldName();
+            final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE
+                    ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType)
+                    : dataType;
+            final Object coercedValue = DataTypeUtils.convertType(fieldValue.getValue(), chosenDataType, fieldName);
+            if (coercedValue == null) {
+                return null;
+            }
+
+            final Object returnValue;
+            switch (chosenDataType.getFieldType()) {
+                case DATE:
+                case TIME:
+                case TIMESTAMP:
+                    final String format;
+                    switch (chosenDataType.getFieldType()) {
+                        case DATE:
+                            format = this.dateFormat;
+                            break;
+                        case TIME:
+                            format = this.timeFormat;
+                            break;
+                        default:
+                            format = this.timestampFormat;
+                    }
+                    returnValue = coerceStringToLong(
+                            fieldName,
+                            DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(format))
+                    );
+                    break;
+                case LONG:
+                    returnValue = DataTypeUtils.toLong(coercedValue, fieldName);
+                    break;
+                case INT:
+                case BYTE:
+                case SHORT:
+                    returnValue = DataTypeUtils.toInteger(coercedValue, fieldName);
+                    break;
+                case CHAR:
+                case STRING:
+                    returnValue = coerceStringToLong(fieldName, coercedValue.toString());
+                    break;
+                case BIGINT:
+                    returnValue = coercedValue;
+                    break;
+                default:
+                    throw new ProcessException(
+                            String.format("Cannot use %s field referenced by %s as @timestamp.", chosenDataType.toString(), path.getPath())
+                    );
+            }
 
-            String retVal = fieldValue.getValue().toString();
+            if (!retain) {
+                fieldValue.updateValue(null);
+            }
 
-            return retVal;
+            return returnValue;
         } else {
             return fallback;

Review comment:
       I decided that coercing the value was the sensible thing to do. It keeps behaviour consistent throughout the PutElasticsearchRecord processor. Also, if I specify an epoch timestamp as a FlowFile attribute, I'd want it output as a Long rather than a String (although Elasticsearch can do this coercion itself if NiFi didn't).
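   The coercion described here can be sketched as follows. This sketch is an assumption about the behaviour, not the PR's code: `coerceStringToLong` appears in the diff, but its body does not. This hypothetical version turns numeric strings such as epoch milliseconds into a `Long` and leaves anything else as a `String`.

   ```java
   // Hypothetical stand-in for the PR's coerceStringToLong helper (its body is not shown in the diff):
   // numeric strings such as epoch millis become Long, anything else stays a String.
   final class TimestampCoercion {
       static Object coerceStringToLong(final String value) {
           if (value == null) {
               return null;
           }
           try {
               return Long.parseLong(value);
           } catch (final NumberFormatException e) {
               return value; // e.g. an ISO-8601 string, left for Elasticsearch to parse
           }
       }

       public static void main(final String[] args) {
           System.out.println(coerceStringToLong("1606487820000").getClass().getSimpleName());            // Long
           System.out.println(coerceStringToLong("2020-11-27T14:37:00.000Z").getClass().getSimpleName()); // String
       }
   }
   ```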







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #4691: NIFI-7990 add properties to map Record field as @timestamp in output …

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #4691:
URL: https://github.com/apache/nifi/pull/4691#discussion_r723727008



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
##########
@@ -266,9 +283,11 @@
         descriptors.add(RECORD_WRITER);
         descriptors.add(LOG_ALL_ERRORS);
         descriptors.add(ID_RECORD_PATH);
+        descriptors.add(AT_TIMESTAMP_RECORD_PATH);
         descriptors.add(INDEX);
         descriptors.add(TYPE);
         descriptors.add(INDEX_OP);
+        descriptors.add(AT_TIMESTAMP);

Review comment:
       I was following the existing pattern of properties: the direct value properties (e.g. index, type) all appear together at the top of the processor, and the Record Path lookup properties then appear in a separate group later on
   
   We could re-organise all the properties, but I didn't really want to do that initially at least
   
   What do you think?

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
##########
@@ -209,6 +207,25 @@
             .required(true)
             .build();
 
+    static final PropertyDescriptor AT_TIMESTAMP = new PropertyDescriptor.Builder()
+            .name("put-es-record-at-timestamp")
+            .displayName("@timestamp Value")
+            .description("The value to use as the @timestamp field (required for Elasticsearch Data Streams)")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor AT_TIMESTAMP_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("put-es-record-at-timestamp-path")
+            .displayName("@timestamp Record Path")
+            .description("A RecordPath pointing to a field in the record(s) that contains the @timestamp for the document " +
+                    "(required for Elasticsearch Data Streams). If left blank the @timestamp will be determined using the main property type")
+            .required(false)
+            .addValidator(new RecordPathValidator())
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();

Review comment:
       Not quite sure I follow what you mean, but I've re-worded the property description to be more in line with the other existing properties on the processor. Does it make more sense now?

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
##########
@@ -209,6 +207,25 @@
             .required(true)
             .build();
 
+    static final PropertyDescriptor AT_TIMESTAMP = new PropertyDescriptor.Builder()
+            .name("put-es-record-at-timestamp")
+            .displayName("@timestamp Value")
+            .description("The value to use as the @timestamp field (required for Elasticsearch Data Streams)")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor AT_TIMESTAMP_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("put-es-record-at-timestamp-path")
+            .displayName("@timestamp Record Path")
+            .description("A RecordPath pointing to a field in the record(s) that contains the @timestamp for the document " +
+                    "(required for Elasticsearch Data Streams). If left blank the @timestamp will be determined using the main property type")

Review comment:
       Copy & paste error here, I think. I tried to keep the property descriptions more or less the same, but I think I messed something up a little. I've updated it to match the existing Index/Type properties; does this make more sense?

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
##########
@@ -405,11 +424,17 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
 
         this.nullSuppression = context.getProperty(SUPPRESS_NULLS).getValue();
 
-        final String id_path = context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
-        final RecordPath recordPath = StringUtils.isEmpty(id_path) ? null : recordPathCache.getCompiled(id_path);
+        final String idPath = context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
+        final RecordPath recordPath = StringUtils.isEmpty(idPath) ? null : recordPathCache.getCompiled(idPath);
         final StringBuilder sb = new StringBuilder();
         final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
 
+        final String atTimestamp = context.getProperty(AT_TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
+        final String atTimestampPath = context.getProperty(AT_TIMESTAMP_RECORD_PATH).isSet()
+                ? context.getProperty(AT_TIMESTAMP_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue()
+                : null;

Review comment:
       To be fair, I just copied the existing code for the other property values, but I think you could be right that all the code like this could be simplified

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
##########
@@ -209,6 +207,25 @@
             .required(true)
             .build();
 
+    static final PropertyDescriptor AT_TIMESTAMP = new PropertyDescriptor.Builder()
+            .name("put-es-record-at-timestamp")
+            .displayName("@timestamp Value")
+            .description("The value to use as the @timestamp field (required for Elasticsearch Data Streams)")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor AT_TIMESTAMP_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("put-es-record-at-timestamp-path")
+            .displayName("@timestamp Record Path")
+            .description("A RecordPath pointing to a field in the record(s) that contains the @timestamp for the document " +
+                    "(required for Elasticsearch Data Streams). If left blank the @timestamp will be determined using the main property type")

Review comment:
       Silly me, forgot that I had the two processors with the same properties and descriptions! 🙄 

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
##########
@@ -266,9 +283,11 @@
         descriptors.add(RECORD_WRITER);
         descriptors.add(LOG_ALL_ERRORS);
         descriptors.add(ID_RECORD_PATH);
+        descriptors.add(AT_TIMESTAMP_RECORD_PATH);
         descriptors.add(INDEX);
         descriptors.add(TYPE);
         descriptors.add(INDEX_OP);
+        descriptors.add(AT_TIMESTAMP);

Review comment:
       I was looking at the wrong processor, d'oh, my bad! 🙄 







[GitHub] [nifi] gresockj commented on a change in pull request #4691: NIFI-7990 add properties to map Record field as @timestamp in output …

Posted by GitBox <gi...@apache.org>.
gresockj commented on a change in pull request #4691:
URL: https://github.com/apache/nifi/pull/4691#discussion_r722097750



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
##########
@@ -209,6 +207,25 @@
             .required(true)
             .build();
 
+    static final PropertyDescriptor AT_TIMESTAMP = new PropertyDescriptor.Builder()
+            .name("put-es-record-at-timestamp")
+            .displayName("@timestamp Value")
+            .description("The value to use as the @timestamp field (required for Elasticsearch Data Streams)")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor AT_TIMESTAMP_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("put-es-record-at-timestamp-path")
+            .displayName("@timestamp Record Path")
+            .description("A RecordPath pointing to a field in the record(s) that contains the @timestamp for the document " +
+                    "(required for Elasticsearch Data Streams). If left blank the @timestamp will be determined using the main property type")
+            .required(false)
+            .addValidator(new RecordPathValidator())
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();

Review comment:
       It looks like you use `@timestamp Value` as a default timestamp in the unit test below; should we rename it to `Default @timestamp`? Regardless, I think the property descriptions should make the fallback logic clearer. Right now it looks like both are required properties for Data Streams.
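   The unit tests in this PR exercise a fallback order that can be sketched like this. A value found via `@timestamp Record Path` wins. Otherwise the `@timestamp Value` property is used. Otherwise no `@timestamp` is written. A plain `Map` stands in for a NiFi `Record`, and `AtTimestampFallback.resolve` is hypothetical.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Optional;

   // Hypothetical sketch of the fallback order exercised by the unit tests:
   // 1) value found at the @timestamp Record Path, 2) the "@timestamp Value" property, 3) no @timestamp.
   final class AtTimestampFallback {
       static Optional<Object> resolve(final Map<String, Object> record, final String pathField,
                                       final String defaultValue) {
           if (pathField != null && record.get(pathField) != null) {
               return Optional.of(record.get(pathField));
           }
           if (defaultValue != null && !defaultValue.isEmpty()) {
               return Optional.of(defaultValue);
           }
           return Optional.empty(); // document is written without an @timestamp field
       }

       public static void main(final String[] args) {
           final Map<String, Object> record = new HashMap<>();
           record.put("ts", "2020-11-27T15:10:00.000Z");
           System.out.println(resolve(record, "ts", "2020-11-27T14:37:00.000Z"));   // Optional[2020-11-27T15:10:00.000Z]
           System.out.println(resolve(record, "none", "2020-11-27T14:37:00.000Z")); // Optional[2020-11-27T14:37:00.000Z]
           System.out.println(resolve(record, "none", null));                        // Optional.empty
       }
   }
   ```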

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
##########
@@ -405,11 +424,17 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
 
         this.nullSuppression = context.getProperty(SUPPRESS_NULLS).getValue();
 
-        final String id_path = context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
-        final RecordPath recordPath = StringUtils.isEmpty(id_path) ? null : recordPathCache.getCompiled(id_path);
+        final String idPath = context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
+        final RecordPath recordPath = StringUtils.isEmpty(idPath) ? null : recordPathCache.getCompiled(idPath);
         final StringBuilder sb = new StringBuilder();
         final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
 
+        final String atTimestamp = context.getProperty(AT_TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
+        final String atTimestampPath = context.getProperty(AT_TIMESTAMP_RECORD_PATH).isSet()
+                ? context.getProperty(AT_TIMESTAMP_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue()
+                : null;

Review comment:
       If I'm reading `StandardPropertyValue` correctly, this could be simplified to the following, since `evaluateAttributeExpressions` gracefully handles a null `rawValue`:
   ```suggestion
           final String atTimestampPath = context.getProperty(AT_TIMESTAMP_RECORD_PATH)
                   .evaluateAttributeExpressions(flowFile).getValue();
   ```
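   The reasoning behind the suggestion can be illustrated with a hypothetical stand-in for a NiFi `PropertyValue`. This is not `StandardPropertyValue`, and it understands only plain `${attribute}` references. Assume, as the comment reads it, that evaluating an unset property yields `null`. Then the `isSet()` ternary and the one-line form always agree.

   ```java
   import java.util.Collections;
   import java.util.Map;
   import java.util.Objects;
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;

   // Hypothetical stand-in for a NiFi PropertyValue: supports only plain ${attribute} references.
   final class PropertyValueSketch {
       private static final Pattern REFERENCE = Pattern.compile("\\$\\{([^}]+)\\}");
       private final String rawValue;

       PropertyValueSketch(final String rawValue) {
           this.rawValue = rawValue;
       }

       boolean isSet() {
           return rawValue != null;
       }

       // Null-safe evaluation: an unset property yields null instead of throwing.
       // In this sketch a missing attribute is replaced by an empty string.
       String evaluate(final Map<String, String> attributes) {
           if (rawValue == null) {
               return null;
           }
           final Matcher matcher = REFERENCE.matcher(rawValue);
           final StringBuffer result = new StringBuffer();
           while (matcher.find()) {
               final String replacement = attributes.getOrDefault(matcher.group(1), "");
               matcher.appendReplacement(result, Matcher.quoteReplacement(replacement));
           }
           matcher.appendTail(result);
           return result.toString();
       }

       public static void main(final String[] args) {
           final Map<String, String> attributes = Collections.singletonMap("path", "/ts");
           for (final String raw : new String[] {null, "${path}"}) {
               final PropertyValueSketch property = new PropertyValueSketch(raw);
               final String withTernary = property.isSet() ? property.evaluate(attributes) : null;
               final String simplified = property.evaluate(attributes);
               System.out.println(Objects.equals(withTernary, simplified)); // true
           }
       }
   }
   ```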

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
##########
@@ -209,6 +207,25 @@
             .required(true)
             .build();
 
+    static final PropertyDescriptor AT_TIMESTAMP = new PropertyDescriptor.Builder()
+            .name("put-es-record-at-timestamp")
+            .displayName("@timestamp Value")
+            .description("The value to use as the @timestamp field (required for Elasticsearch Data Streams)")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor AT_TIMESTAMP_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("put-es-record-at-timestamp-path")
+            .displayName("@timestamp Record Path")
+            .description("A RecordPath pointing to a field in the record(s) that contains the @timestamp for the document " +
+                    "(required for Elasticsearch Data Streams). If left blank the @timestamp will be determined using the main property type")

Review comment:
       What do you mean by "determined using the main property type"?

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
##########
@@ -498,6 +501,132 @@ public void testPutElasticSearchOnTriggerWithInvalidIndexOp() throws IOException
         assertNotNull(out);
     }
 
+    @Test
+    public void testPutElasticsearchOnTriggerWithNoAtTimestampPath() throws Exception {
+        PutElasticsearchHttpRecordTestProcessor processor = new PutElasticsearchHttpRecordTestProcessor(false);
+        runner = TestRunners.newTestRunner(processor);
+        generateTestData(1);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+
+        runner.removeProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP); // no default
+        runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP_RECORD_PATH, "/none"); // Field does not exist
+        processor.setRecordChecks(record -> assertTimestamp(record, null)); // no @timestamp
+        runner.enqueue(new byte[0]);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        runner.clearTransferState();
+
+        // now add a default @timestamp
+        final String timestamp = "2020-11-27T14:37:00.000Z";
+        runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP, timestamp);
+        processor.setRecordChecks(record -> assertTimestamp(record, timestamp)); // @timestamp defaulted
+        runner.enqueue(new byte[0]);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        assertNotNull(out2);
+    }
+
+    @Test
+    public void testPutElasticsearchOnTriggerWithAtTimestampFromAttribute() throws IOException {
+        PutElasticsearchHttpRecordTestProcessor processor = new PutElasticsearchHttpRecordTestProcessor(false);
+        runner = TestRunners.newTestRunner(processor);
+        generateTestData(1);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "${i}");
+        runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP, "${timestamp}");
+
+        final String timestamp = "2020-11-27T15:10:00.000Z";
+        processor.setRecordChecks(record -> assertTimestamp(record, timestamp));
+        runner.enqueue(new byte[0], new HashMap<String, String>() {{
+            put("doc_id", "28039652144");
+            put("i", "doc");
+            put("timestamp", timestamp);
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        runner.clearTransferState();
+
+        // Now try an empty attribute value, should be no timestamp
+        processor.setRecordChecks(record -> assertTimestamp(record, null));
+        runner.enqueue(new byte[0], new HashMap<String, String>() {{
+            put("doc_id", "28039652144");
+            put("i", "doc");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        assertNotNull(out2);
+    }
+
+    @Test
+    public void testPutElasticsearchOnTriggerWithAtTimstampPath() throws Exception {
+        PutElasticsearchHttpRecordTestProcessor processor = new PutElasticsearchHttpRecordTestProcessor(false);
+        DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern(RecordFieldType.TIME.getDefaultFormat());
+        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(RecordFieldType.TIMESTAMP.getDefaultFormat());
+        DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern(RecordFieldType.DATE.getDefaultFormat());
+        runner = TestRunners.newTestRunner(processor);
+        generateTestData(1);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+
+        runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP_RECORD_PATH, "/ts"); // TIMESTAMP
+        processor.setRecordChecks(record -> assertTimestamp(record, LOCAL_DATE_TIME.format(dateTimeFormatter)));
+        runner.enqueue(new byte[0]);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        assertNotNull(runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0));
+        runner.clearTransferState();
+
+        runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP_RECORD_PATH, "/date"); // DATE;
+        processor.setRecordChecks(record -> assertTimestamp(record, LOCAL_DATE.format(dateFormatter)));
+        runner.enqueue(new byte[0]);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        assertNotNull(runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0));
+        runner.clearTransferState();
+
+        runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP_RECORD_PATH, "/time"); // TIME
+        processor.setRecordChecks(record -> assertTimestamp(record, LOCAL_TIME.format(timeFormatter)));
+        runner.enqueue(new byte[0]);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        assertNotNull(runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0));
+        runner.clearTransferState();
+
+        // these INT/STRING values might not make sense from an Elasticsearch point of view,
+        // but we want to prove we can handle them being selected from teh Record

Review comment:
       the*

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
##########
@@ -377,11 +494,92 @@ private String getFromRecordPath(final Record record, final RecordPath path, fin
                 );
             }
 
-            fieldValue.updateValue(null);
+            if (!retain) {
+                fieldValue.updateValue(null);
+            }
 
             return fieldValue.getValue().toString();
         } else {
             return fallback;
         }
     }
+
+    private Object getTimestampFromRecordPath(final Record record, final RecordPath path, final String fallback,
+                                              final boolean retain) {
+        if (path == null) {
+            return coerceStringToLong("@timestamp", fallback);
+        }
+
+        final RecordPathResult result = path.evaluate(record);
+        final Optional<FieldValue> value = result.getSelectedFields().findFirst();
+        if (value.isPresent() && value.get().getValue() != null) {
+            final FieldValue fieldValue = value.get();
+
+            final DataType dataType = fieldValue.getField().getDataType();
+            final String fieldName = fieldValue.getField().getFieldName();
+            final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE
+                    ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType)
+                    : dataType;
+            final Object coercedValue = DataTypeUtils.convertType(fieldValue.getValue(), chosenDataType, fieldName);
+            if (coercedValue == null) {
+                return null;
+            }
+
+            final Object returnValue;
+            switch (chosenDataType.getFieldType()) {
+                case DATE:
+                case TIME:
+                case TIMESTAMP:
+                    final String format;
+                    switch (chosenDataType.getFieldType()) {

Review comment:
       How about moving the nested `switch` to a separate method for readability?
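   Here is a minimal sketch of how the suggested refactoring might look. The hypothetical `FieldType` enum stands in for NiFi's `RecordFieldType`, the format patterns are examples only, and `java.time.DateTimeFormatter` replaces NiFi's `DataTypeUtils` formatting.

   ```java
   import java.time.LocalDate;
   import java.time.LocalDateTime;
   import java.time.LocalTime;
   import java.time.format.DateTimeFormatter;
   import java.time.temporal.TemporalAccessor;

   // Hypothetical sketch of the suggested refactoring: FieldType stands in for NiFi's RecordFieldType,
   // and the nested switch that picks a format becomes its own method.
   final class TimestampFormatSelection {
       enum FieldType { DATE, TIME, TIMESTAMP }

       private final String dateFormat;
       private final String timeFormat;
       private final String timestampFormat;

       TimestampFormatSelection(final String dateFormat, final String timeFormat, final String timestampFormat) {
           this.dateFormat = dateFormat;
           this.timeFormat = timeFormat;
           this.timestampFormat = timestampFormat;
       }

       // Replaces the inner switch: one place decides which configured format applies
       String getFormat(final FieldType type) {
           switch (type) {
               case DATE:
                   return dateFormat;
               case TIME:
                   return timeFormat;
               default:
                   return timestampFormat;
           }
       }

       String format(final FieldType type, final TemporalAccessor value) {
           return DateTimeFormatter.ofPattern(getFormat(type)).format(value);
       }

       public static void main(final String[] args) {
           final TimestampFormatSelection s =
                   new TimestampFormatSelection("yyyy-MM-dd", "HH:mm:ss", "yyyy-MM-dd HH:mm:ss");
           System.out.println(s.format(FieldType.DATE, LocalDate.of(2020, 11, 27)));                   // 2020-11-27
           System.out.println(s.format(FieldType.TIME, LocalTime.of(14, 37)));                         // 14:37:00
           System.out.println(s.format(FieldType.TIMESTAMP, LocalDateTime.of(2020, 11, 27, 14, 37))); // 2020-11-27 14:37:00
       }
   }
   ```

   Moving the format choice into `getFormat` reduces the DATE/TIME/TIMESTAMP branch of the outer `switch` to a single call.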

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
##########
@@ -147,9 +194,51 @@
         .required(false)
         .build();
 
+    static final PropertyDescriptor AT_TIMESTAMP_DATE_FORMAT = new PropertyDescriptor.Builder()
+        .name("put-es-record-at-timestamp-date-format")
+        .displayName("@Timestamp Record Path Date Format")
+        .description("Specifies the format to use when writing Date field for @timestamp. "
+                + "If not specified, the default format '" + RecordFieldType.DATE.getDefaultFormat() + "' is used. "
+                + "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy for a two-digit month, followed by "
+                + "a two-digit day, followed by a four-digit year, all separated by '/' characters, as in 01/01/2017).")

Review comment:
       Just to make the example immediately clear, let's use a date like `01/25/2017`
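The date `01/01/2017` cannot show which digits are the month and which are the day. Under `MM/dd/yyyy`, `01/25/2017` can only be read one way. The following stand-alone check uses `java.text.SimpleDateFormat`, the pattern syntax the property description refers to. UTC is fixed only so the result is deterministic, and strict (non-lenient) parsing is used so a swapped pattern is rejected instead of silently rolled over:

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.TimeZone;

class DateFormatExample {

    // Parses text with the given SimpleDateFormat pattern in UTC, rejecting out-of-range fields
    static Calendar parse(final String pattern, final String text) throws ParseException {
        final SimpleDateFormat format = new SimpleDateFormat(pattern);
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        format.setLenient(false); // e.g. month 25 causes a ParseException instead of rolling over
        final Date date = format.parse(text);
        final Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
        calendar.setTime(date);
        return calendar;
    }

    public static void main(String[] args) throws ParseException {
        final Calendar c = parse("MM/dd/yyyy", "01/25/2017");
        // Calendar.MONTH is zero-based, so January is 0; prints "1 25 2017"
        System.out.println((c.get(Calendar.MONTH) + 1) + " " + c.get(Calendar.DAY_OF_MONTH) + " " + c.get(Calendar.YEAR));
    }
}
```

Parsing the same string with `dd/MM/yyyy` fails, which is exactly the ambiguity the clearer example avoids.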

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
##########
@@ -266,9 +283,11 @@
         descriptors.add(RECORD_WRITER);
         descriptors.add(LOG_ALL_ERRORS);
         descriptors.add(ID_RECORD_PATH);
+        descriptors.add(AT_TIMESTAMP_RECORD_PATH);
         descriptors.add(INDEX);
         descriptors.add(TYPE);
         descriptors.add(INDEX_OP);
+        descriptors.add(AT_TIMESTAMP);

Review comment:
       Could we put `AT_TIMESTAMP` right below `AT_TIMESTAMP_RECORD_PATH`?

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
##########
@@ -236,11 +342,18 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
         final String typePath = context.getProperty(TYPE_RECORD_PATH).isSet()
                 ? context.getProperty(TYPE_RECORD_PATH).evaluateAttributeExpressions(input).getValue()
                 : null;
+        final String atTimestampPath = context.getProperty(AT_TIMESTAMP_RECORD_PATH).isSet()

Review comment:
       Same comment as above




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] asfgit closed pull request #4691: NIFI-7990 add properties to map Record field as @timestamp in output …

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #4691:
URL: https://github.com/apache/nifi/pull/4691


   





[GitHub] [nifi] gresockj commented on a change in pull request #4691: NIFI-7990 add properties to map Record field as @timestamp in output …

Posted by GitBox <gi...@apache.org>.
gresockj commented on a change in pull request #4691:
URL: https://github.com/apache/nifi/pull/4691#discussion_r723795065



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
##########
@@ -209,6 +207,25 @@
             .required(true)
             .build();
 
+    static final PropertyDescriptor AT_TIMESTAMP = new PropertyDescriptor.Builder()
+            .name("put-es-record-at-timestamp")
+            .displayName("@timestamp Value")
+            .description("The value to use as the @timestamp field (required for Elasticsearch Data Streams)")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor AT_TIMESTAMP_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("put-es-record-at-timestamp-path")
+            .displayName("@timestamp Record Path")
+            .description("A RecordPath pointing to a field in the record(s) that contains the @timestamp for the document " +
+                    "(required for Elasticsearch Data Streams). If left blank the @timestamp will be determined using the main property type")

Review comment:
       Yep, I see the update in `PutElasticsearchRecord`, which does clarify it. Can you apply the same description in `PutElasticsearchHttpRecord`?

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
##########
@@ -209,6 +207,25 @@
             .required(true)
             .build();
 
+    static final PropertyDescriptor AT_TIMESTAMP = new PropertyDescriptor.Builder()
+            .name("put-es-record-at-timestamp")
+            .displayName("@timestamp Value")
+            .description("The value to use as the @timestamp field (required for Elasticsearch Data Streams)")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor AT_TIMESTAMP_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("put-es-record-at-timestamp-path")
+            .displayName("@timestamp Record Path")
+            .description("A RecordPath pointing to a field in the record(s) that contains the @timestamp for the document " +
+                    "(required for Elasticsearch Data Streams). If left blank the @timestamp will be determined using the main property type")
+            .required(false)
+            .addValidator(new RecordPathValidator())
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();

Review comment:
       The property descriptions in `PutElasticsearchRecord` are clear to me now, let's just bring the descriptions over to `PutElasticsearchHttpRecord`.

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
##########
@@ -266,9 +283,11 @@
         descriptors.add(RECORD_WRITER);
         descriptors.add(LOG_ALL_ERRORS);
         descriptors.add(ID_RECORD_PATH);
+        descriptors.add(AT_TIMESTAMP_RECORD_PATH);
         descriptors.add(INDEX);
         descriptors.add(TYPE);
         descriptors.add(INDEX_OP);
+        descriptors.add(AT_TIMESTAMP);

Review comment:
       What you're describing sounds like the `PutElasticsearchRecord` processor, whose property order does look natural to me.  Here the order seems different, with the record path properties both together, followed by the index/type/op/@timestamp properties.  It seems to me that `AT_TIMESTAMP` could be moved to just below `AT_TIMESTAMP_RECORD_PATH` and still keep the same grouping -- does that make sense?
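The proposed ordering is sketched below. It uses the descriptor names from the diff hunk as plain strings so the example compiles without NiFi on the classpath. The processor itself adds `PropertyDescriptor` constants to `descriptors`, and descriptors outside this hunk are omitted:

```java
import java.util.Arrays;
import java.util.List;

class DescriptorOrderSketch {

    // Descriptor names visible in the diff hunk, in the order proposed in review:
    // AT_TIMESTAMP sits directly below AT_TIMESTAMP_RECORD_PATH, so the record path
    // properties stay grouped, followed by index/type/op
    static final List<String> PROPOSED_ORDER = Arrays.asList(
            "RECORD_WRITER",
            "LOG_ALL_ERRORS",
            "ID_RECORD_PATH",
            "AT_TIMESTAMP_RECORD_PATH",
            "AT_TIMESTAMP",
            "INDEX",
            "TYPE",
            "INDEX_OP");

    public static void main(String[] args) {
        PROPOSED_ORDER.forEach(System.out::println);
    }
}
```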







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #4691: NIFI-7990 add properties to map Record field as @timestamp in output …

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #4691:
URL: https://github.com/apache/nifi/pull/4691#discussion_r723727411



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
##########
@@ -209,6 +207,25 @@
             .required(true)
             .build();
 
+    static final PropertyDescriptor AT_TIMESTAMP = new PropertyDescriptor.Builder()
+            .name("put-es-record-at-timestamp")
+            .displayName("@timestamp Value")
+            .description("The value to use as the @timestamp field (required for Elasticsearch Data Streams)")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor AT_TIMESTAMP_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("put-es-record-at-timestamp-path")
+            .displayName("@timestamp Record Path")
+            .description("A RecordPath pointing to a field in the record(s) that contains the @timestamp for the document " +
+                    "(required for Elasticsearch Data Streams). If left blank the @timestamp will be determined using the main property type")
+            .required(false)
+            .addValidator(new RecordPathValidator())
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();

Review comment:
       Not quite sure I follow what you mean, but I've re-worded the property description to be more in line with the other existing properties on the processor. Does it make more sense now?







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #4691: NIFI-7990 add properties to map Record field as @timestamp in output …

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #4691:
URL: https://github.com/apache/nifi/pull/4691#discussion_r723727794



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
##########
@@ -209,6 +207,25 @@
             .required(true)
             .build();
 
+    static final PropertyDescriptor AT_TIMESTAMP = new PropertyDescriptor.Builder()
+            .name("put-es-record-at-timestamp")
+            .displayName("@timestamp Value")
+            .description("The value to use as the @timestamp field (required for Elasticsearch Data Streams)")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor AT_TIMESTAMP_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("put-es-record-at-timestamp-path")
+            .displayName("@timestamp Record Path")
+            .description("A RecordPath pointing to a field in the record(s) that contains the @timestamp for the document " +
+                    "(required for Elasticsearch Data Streams). If left blank the @timestamp will be determined using the main property type")

Review comment:
       I think this was a copy & paste error. I tried to keep the property descriptions more or less the same, but messed something up a little. I've updated it to match the existing Index/Type properties; does this make more sense?







[GitHub] [nifi] ChrisSamo632 commented on pull request #4691: NIFI-7990 add properties to map Record field as @timestamp in output …

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on pull request #4691:
URL: https://github.com/apache/nifi/pull/4691#issuecomment-937314291


   > I noticed that using the NIFI-7990.json flow definition, using `PutElasticsearchHttpRecord`, `@timestamp` for record "1" appears to be a numeric type but for record "2" it appears to be a string type:
   > ...
   > However, using `PutElasticsearchRecord`, `@timestamp` is consistently a numeric type:
   > ...
   > I don't know if this is actually a problem in ES, but wanted to point it out in case we need to update the @timestamp code to make it consistent in the two cases.
   
   Good spot! I think this is because the `PutElasticsearchHttpRecord` processor does **not** attempt to coerce the `@timestamp` value when it is provided via the direct `@timestamp` property _or_ comes from a Record Path field of DataType STRING, whereas I intentionally changed `PutElasticsearchRecord` to run such values through `coerceStringToLong`. This follows on from a [question I previously asked myself on this PR](https://github.com/apache/nifi/pull/4691#discussion_r532728519).
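The behaviour under discussion is sketched below. It assumes a `coerceStringToLong`-style helper that returns a `Long` when the string is a valid long and otherwise leaves the string untouched. The helper is hypothetical: it uses only `Long.parseLong`, and NiFi's own `coerceStringToLong` may differ in detail. Applying the same helper to both the direct `@timestamp` property value and a STRING field selected by the Record Path gives a numeric `@timestamp` in both cases. Without it, one record gets a number and another a string:

```java
class TimestampCoercionSketch {

    // Hypothetical helper: a string holding a valid long becomes a Long, anything else is
    // returned unchanged (fieldName only mirrors the call sites in the diff and is unused here)
    static Object coerceStringToLong(final String fieldName, final String value) {
        if (value == null) {
            return null;
        }
        try {
            return Long.parseLong(value);
        } catch (final NumberFormatException e) {
            return value; // e.g. an ISO-8601 date string passes through as a String
        }
    }

    // Applies the same coercion whether the value came from a STRING Record Path field
    // or from the direct @timestamp property used as the fallback
    static Object resolveAtTimestamp(final String recordPathValue, final String propertyValue) {
        final String raw = recordPathValue != null ? recordPathValue : propertyValue;
        return coerceStringToLong("@timestamp", raw);
    }

    public static void main(String[] args) {
        final Object fromRecord = resolveAtTimestamp("1606505455000", null);
        final Object fromProperty = resolveAtTimestamp(null, "1606505455000");
        // prints "Long Long": both documents would carry a numeric @timestamp
        System.out.println(fromRecord.getClass().getSimpleName() + " " + fromProperty.getClass().getSimpleName());
    }
}
```

Leaving non-numeric strings untouched means a formatted date string still reaches Elasticsearch as-is, where the index mapping decides how to parse it.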





[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #4691: NIFI-7990 add properties to map Record field as @timestamp in output …

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #4691:
URL: https://github.com/apache/nifi/pull/4691#discussion_r531746450



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
##########
@@ -376,13 +483,92 @@ private String getFromRecordPath(Record record, RecordPath path, final String fa
                 );
             }
 
-            fieldValue.updateValue(null);
+            if (!retain) {
+                fieldValue.updateValue(null);
+            }
+
+            return fieldValue.getValue().toString();
+        } else {
+            return fallback;
+        }
+    }
+
+    private Object getTimestampFromRecordPath(final Record record, final RecordPath path, final String fallback,
+                                              final boolean retain) {
+        if (path == null) {
+            return fallback;
+        }
+
+        final RecordPathResult result = path.evaluate(record);
+        final Optional<FieldValue> value = result.getSelectedFields().findFirst();
+        if (value.isPresent() && value.get().getValue() != null) {
+            final FieldValue fieldValue = value.get();
+
+            final DataType dataType = fieldValue.getField().getDataType();
+            final String fieldName = fieldValue.getField().getFieldName();
+            final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE
+                    ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType)
+                    : dataType;
+            final Object coercedValue = DataTypeUtils.convertType(fieldValue.getValue(), chosenDataType, fieldName);
+            if (coercedValue == null) {
+                return null;
+            }
+
+            final Object returnValue;
+            switch (chosenDataType.getFieldType()) {
+                case DATE:
+                case TIME:
+                case TIMESTAMP:
+                    final String format;
+                    switch (chosenDataType.getFieldType()) {
+                        case DATE:
+                            format = this.dateFormat;
+                            break;
+                        case TIME:
+                            format = this.timeFormat;
+                            break;
+                        default:
+                            format = this.timestampFormat;
+                    }
+                    returnValue = coerceStringToLong(
+                            fieldName,
+                            DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(format))
+                    );
+                    break;
+                case LONG:
+                    returnValue = DataTypeUtils.toLong(coercedValue, fieldName);
+                    break;
+                case INT:
+                case BYTE:
+                case SHORT:
+                    returnValue = DataTypeUtils.toInteger(coercedValue, fieldName);
+                    break;
+                case CHAR:
+                case STRING:
+                    returnValue = coerceStringToLong(fieldName, coercedValue.toString());
+                    break;
+                case BIGINT:
+                    returnValue = coercedValue;
+                    break;
+                default:
+                    throw new ProcessException(
+                            String.format("Cannot use %s field referenced by %s as @timestamp.", chosenDataType.toString(), path.getPath())
+                    );
+            }
 
-            String retVal = fieldValue.getValue().toString();
+            if (!retain) {
+                fieldValue.updateValue(null);
+            }
 
-            return retVal;
+            return returnValue;
         } else {
             return fallback;

Review comment:
       Question: should we coerce this fallback value to a long, as we do for a STRING field, *or* leave it as a string, which is more consistent with `PutElasticsearchHttpRecord`?







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #4691: NIFI-7990 add properties to map Record field as @timestamp in output …

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #4691:
URL: https://github.com/apache/nifi/pull/4691#discussion_r723918044



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
##########
@@ -209,6 +207,25 @@
             .required(true)
             .build();
 
+    static final PropertyDescriptor AT_TIMESTAMP = new PropertyDescriptor.Builder()
+            .name("put-es-record-at-timestamp")
+            .displayName("@timestamp Value")
+            .description("The value to use as the @timestamp field (required for Elasticsearch Data Streams)")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor AT_TIMESTAMP_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("put-es-record-at-timestamp-path")
+            .displayName("@timestamp Record Path")
+            .description("A RecordPath pointing to a field in the record(s) that contains the @timestamp for the document " +
+                    "(required for Elasticsearch Data Streams). If left blank the @timestamp will be determined using the main property type")

Review comment:
       Silly me, forgot that I had the two processors with the same properties and descriptions! 🙄 







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #4691: NIFI-7990 add properties to map Record field as @timestamp in output …

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #4691:
URL: https://github.com/apache/nifi/pull/4691#discussion_r723728282



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
##########
@@ -405,11 +424,17 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
 
         this.nullSuppression = context.getProperty(SUPPRESS_NULLS).getValue();
 
-        final String id_path = context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
-        final RecordPath recordPath = StringUtils.isEmpty(id_path) ? null : recordPathCache.getCompiled(id_path);
+        final String idPath = context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
+        final RecordPath recordPath = StringUtils.isEmpty(idPath) ? null : recordPathCache.getCompiled(idPath);
         final StringBuilder sb = new StringBuilder();
         final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
 
+        final String atTimestamp = context.getProperty(AT_TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
+        final String atTimestampPath = context.getProperty(AT_TIMESTAMP_RECORD_PATH).isSet()
+                ? context.getProperty(AT_TIMESTAMP_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue()
+                : null;

Review comment:
       To be fair, I just copied the existing code for the other property values, but I think you could be right that all the code like this could be simplified.







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #4691: NIFI-7990 add properties to map Record field as @timestamp in output …

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #4691:
URL: https://github.com/apache/nifi/pull/4691#discussion_r723942512



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
##########
@@ -266,9 +283,11 @@
         descriptors.add(RECORD_WRITER);
         descriptors.add(LOG_ALL_ERRORS);
         descriptors.add(ID_RECORD_PATH);
+        descriptors.add(AT_TIMESTAMP_RECORD_PATH);
         descriptors.add(INDEX);
         descriptors.add(TYPE);
         descriptors.add(INDEX_OP);
+        descriptors.add(AT_TIMESTAMP);

Review comment:
       I was looking at the wrong processor, d'oh, my bad! 🙄 






