You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2021/12/13 18:43:26 UTC

[nifi] 22/22: NIFI-8153 custom date/time format properties for PutElasticsearchRecord

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 5e9c09c4c95213ebeaaeff078d02d09fcc8e158a
Author: Chris Sampson <ch...@gmail.com>
AuthorDate: Fri Nov 26 12:31:49 2021 +0000

    NIFI-8153 custom date/time format properties for PutElasticsearchRecord
---
 .../elasticsearch/PutElasticsearchRecord.java      |  61 ++++++----
 .../PutElasticsearchRecordTest.groovy              | 128 ++++++++++++++++++++-
 2 files changed, 168 insertions(+), 21 deletions(-)

diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index ec3c3f2..900870e 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -55,6 +55,7 @@ import org.apache.nifi.serialization.SimpleDateFormatValidator;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.PushBackRecordSet;
 import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.type.ChoiceDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
@@ -202,36 +203,34 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
         .required(false)
         .build();
 
-    static final PropertyDescriptor AT_TIMESTAMP_DATE_FORMAT = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder()
         .name("put-es-record-at-timestamp-date-format")
-        .displayName("@Timestamp Record Path Date Format")
-        .description("Specifies the format to use when writing Date field for @timestamp. "
+        .displayName("Date Format")
+        .description("Specifies the format to use when writing Date fields. "
                 + "If not specified, the default format '" + RecordFieldType.DATE.getDefaultFormat() + "' is used. "
                 + "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy for a two-digit month, followed by "
                 + "a two-digit day, followed by a four-digit year, all separated by '/' characters, as in 01/25/2017).")
         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
         .addValidator(new SimpleDateFormatValidator())
         .required(false)
-        .dependsOn(AT_TIMESTAMP_RECORD_PATH)
         .build();
 
-    static final PropertyDescriptor AT_TIMESTAMP_TIME_FORMAT = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor TIME_FORMAT = new PropertyDescriptor.Builder()
         .name("put-es-record-at-timestamp-time-format")
-        .displayName("@Timestamp Record Path Time Format")
-        .description("Specifies the format to use when writing Time field for @timestamp. "
+        .displayName("Time Format")
+        .description("Specifies the format to use when writing Time fields. "
                 + "If not specified, the default format '" + RecordFieldType.TIME.getDefaultFormat() + "' is used. "
                 + "If specified, the value must match the Java Simple Date Format (for example, HH:mm:ss for a two-digit hour in 24-hour format, followed by "
                 + "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 18:04:15).")
         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
         .addValidator(new SimpleDateFormatValidator())
         .required(false)
-        .dependsOn(AT_TIMESTAMP_RECORD_PATH)
         .build();
 
-    static final PropertyDescriptor AT_TIMESTAMP_TIMESTAMP_FORMAT = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor TIMESTAMP_FORMAT = new PropertyDescriptor.Builder()
         .name("put-es-record-at-timestamp-timestamp-format")
-        .displayName("@Timestamp Record Path Timestamp Format")
-        .description("Specifies the format to use when writing Timestamp field for @timestamp. "
+        .displayName("Timestamp Format")
+        .description("Specifies the format to use when writing Timestamp fields. "
                 + "If not specified, the default format '" + RecordFieldType.TIMESTAMP.getDefaultFormat() + "' is used. "
                 + "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy HH:mm:ss for a two-digit month, followed by "
                 + "a two-digit day, followed by a four-digit year, all separated by '/' characters; and then followed by a two-digit hour in 24-hour format, followed by "
@@ -239,14 +238,12 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
         .addValidator(new SimpleDateFormatValidator())
         .required(false)
-        .dependsOn(AT_TIMESTAMP_RECORD_PATH)
         .build();
 
     static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
         INDEX_OP, INDEX, TYPE, AT_TIMESTAMP, CLIENT_SERVICE, RECORD_READER, BATCH_SIZE, ID_RECORD_PATH, RETAIN_ID_FIELD,
         INDEX_OP_RECORD_PATH, INDEX_RECORD_PATH, TYPE_RECORD_PATH, AT_TIMESTAMP_RECORD_PATH, RETAIN_AT_TIMESTAMP_FIELD,
-        AT_TIMESTAMP_DATE_FORMAT, AT_TIMESTAMP_TIME_FORMAT, AT_TIMESTAMP_TIMESTAMP_FORMAT, LOG_ERROR_RESPONSES,
-        ERROR_RECORD_WRITER
+        DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, LOG_ERROR_RESPONSES, ERROR_RECORD_WRITER
     ));
     static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
         REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS
@@ -300,15 +297,15 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
         this.writerFactory = context.getProperty(ERROR_RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
         this.logErrors = context.getProperty(LOG_ERROR_RESPONSES).asBoolean();
 
-        this.dateFormat = context.getProperty(AT_TIMESTAMP_DATE_FORMAT).evaluateAttributeExpressions().getValue();
+        this.dateFormat = context.getProperty(DATE_FORMAT).evaluateAttributeExpressions().getValue();
         if (this.dateFormat == null) {
             this.dateFormat = RecordFieldType.DATE.getDefaultFormat();
         }
-        this.timeFormat = context.getProperty(AT_TIMESTAMP_TIME_FORMAT).evaluateAttributeExpressions().getValue();
+        this.timeFormat = context.getProperty(TIME_FORMAT).evaluateAttributeExpressions().getValue();
         if (this.timeFormat == null) {
             this.timeFormat = RecordFieldType.TIME.getDefaultFormat();
         }
-        this.timestampFormat = context.getProperty(AT_TIMESTAMP_TIMESTAMP_FORMAT).evaluateAttributeExpressions().getValue();
+        this.timestampFormat = context.getProperty(TIMESTAMP_FORMAT).evaluateAttributeExpressions().getValue();
         if (this.timestampFormat == null) {
             this.timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
         }
@@ -383,14 +380,15 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
             Record record;
             while ((record = recordSet.next()) != null) {
                 final String idx = getFromRecordPath(record, iPath, index, false);
-                final String t   = getFromRecordPath(record, tPath, type, false);
+                final String t = getFromRecordPath(record, tPath, type, false);
                 final IndexOperationRequest.Operation o = IndexOperationRequest.Operation.forValue(getFromRecordPath(record, ioPath, indexOp, false));
-                final String id  = getFromRecordPath(record, path, null, retainId);
+                final String id = getFromRecordPath(record, path, null, retainId);
                 final Object timestamp = getTimestampFromRecordPath(record, atPath, atTimestamp, retainTimestamp);
 
                 @SuppressWarnings("unchecked")
                 final Map<String, Object> contentMap = (Map<String, Object>) DataTypeUtils
                         .convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+                formatDateTimeFields(contentMap, record);
                 contentMap.putIfAbsent("@timestamp", timestamp);
 
                 operationList.add(new IndexOperationRequest(idx, t, id, contentMap, o));
@@ -493,6 +491,26 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
         return null;
     }
 
+    private void formatDateTimeFields(final Map<String, Object> contentMap, final Record record) { // rewrites, in place, the date/time-typed top-level fields of contentMap in their formatted form
+        for (final RecordField recordField : record.getSchema().getFields()) {
+            final Object value = contentMap.get(recordField.getFieldName());
+            if (value != null) { // fields that are absent or null in contentMap are left as they are
+                final DataType chosenDataType = recordField.getDataType().getFieldType() == RecordFieldType.CHOICE // a CHOICE field is resolved against the record's actual value
+                        ? DataTypeUtils.chooseDataType(record.getValue(recordField), (ChoiceDataType) recordField.getDataType())
+                        : recordField.getDataType();
+                // NOTE(review): chosenDataType is dereferenced below without a null check - confirm chooseDataType cannot return null here
+                final String format = determineDateFormat(chosenDataType.getFieldType());
+                if (format != null) { // a null format marks a type determineDateFormat does not handle; such values stay untouched
+                    final Object formattedValue = coerceStringToLong( // presumably turns an all-digit string into a Long (the accompanying test expects that) - verify against coerceStringToLong
+                            recordField.getFieldName(),
+                            DataTypeUtils.toString(value, () -> DataTypeUtils.getDateFormat(format))
+                    );
+                    contentMap.put(recordField.getFieldName(), formattedValue);
+                }
+            }
+        }
+    }
+
     private String getFromRecordPath(final Record record, final RecordPath path, final String fallback,
                                      final boolean retain) {
         if (path == null) {
@@ -591,8 +609,11 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
             case TIME:
                 format = this.timeFormat;
                 break;
-            default:
+            case TIMESTAMP:
                 format = this.timestampFormat;
+                break;
+            default:
+                format = null;
         }
         return format;
     }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
index 85a9f2b..2bbd51c 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
@@ -334,7 +334,7 @@ class PutElasticsearchRecordTest {
         runner.setProperty(PutElasticsearchRecord.RETAIN_ID_FIELD, "true")
         runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP, "100")
         runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, "/date")
-        runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_DATE_FORMAT, "dd/MM/yyyy")
+        runner.setProperty(PutElasticsearchRecord.DATE_FORMAT, "dd/MM/yyyy")
         runner.setProperty(PutElasticsearchRecord.RETAIN_AT_TIMESTAMP_FIELD, "true")
         runner.enqueue(flowFileContents, [
             "schema.name": "recordPathTest",
@@ -444,6 +444,132 @@ class PutElasticsearchRecordTest {
     }
 
     @Test
+    void testDateTimeFormatting() { // runs the same five records twice: first with no format properties set, then with custom Date/Time/Timestamp formats
+        def newSchema = prettyPrint(toJson([ // every field is nullable; choice_ts is a union of null, timestamp-millis and string
+                type: "record",
+                name: "DateTimeFormattingTestType",
+                fields: [
+                        [ name: "msg", type: ["null", "string"] ],
+                        [ name: "ts", type: ["null", [ type: "long", logicalType: "timestamp-millis" ]] ],
+                        [ name: "date", type: ["null", [ type: "int", logicalType: "date" ]] ],
+                        [ name: "time", type: ["null", [ type: "int", logicalType: "time-millis" ]] ],
+                        [ name: "choice_ts", type: ["null", [ type: "long", logicalType: "timestamp-millis" ], "string"] ]
+                ]
+        ]))
+        // each record fills a different subset of the fields; record 5 fills all of them, with choice_ts as a plain string
+        def flowFileContents = prettyPrint(toJson([
+                [ msg: "1", ts: Timestamp.valueOf(LOCAL_DATE_TIME).toInstant().toEpochMilli() ],
+                [ msg: "2", date: Date.valueOf(LOCAL_DATE).getTime() ],
+                [ msg: "3", time: Time.valueOf(LOCAL_TIME).getTime() ],
+                [ msg: "4", choice_ts: Timestamp.valueOf(LOCAL_DATE_TIME).toInstant().toEpochMilli() ],
+                [ msg: "5",
+                  ts: Timestamp.valueOf(LOCAL_DATE_TIME).toInstant().toEpochMilli(),
+                  time: Time.valueOf(LOCAL_TIME).getTime(),
+                  date: Date.valueOf(LOCAL_DATE).getTime(),
+                  choice_ts: "not-timestamp"
+                ]
+        ]))
+        // first pass: no format properties are set, so the RecordFieldType default formats are expected
+        def evalClosure = { List<IndexOperationRequest> items ->
+            int msg = items.findAll { (it.fields.get("msg") != null) }.size()
+            int timestamp = items.findAll { it.fields.get("ts") ==
+                    LOCAL_DATE_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIMESTAMP.getDefaultFormat())) // "yyyy-MM-dd HH:mm:ss"
+            }.size()
+            int date = items.findAll { it.fields.get("date") ==
+                    LOCAL_DATE.format(DateTimeFormatter.ofPattern(RecordFieldType.DATE.getDefaultFormat())) // "yyyy-MM-dd"
+            }.size()
+            int time = items.findAll { it.fields.get("time") ==
+                    LOCAL_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIME.getDefaultFormat())) // "HH:mm:ss"
+            }.size()
+            int choiceTs = items.findAll { it.fields.get("choice_ts") ==
+                    LOCAL_DATE_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIMESTAMP.getDefaultFormat()))
+            }.size()
+            int choiceNotTs = items.findAll { it.fields.get("choice_ts") == "not-timestamp" }.size()
+            int atTimestampDefault = items.findAll { it.fields.get("@timestamp") == "test_timestamp" }.size() // "test_timestamp" is presumably the AT_TIMESTAMP value from test setup outside this hunk - confirm
+            int tsNull = items.findAll { it.fields.get("ts") == null }.size()
+            int dateNull = items.findAll { it.fields.get("date") == null }.size()
+            int timeNull = items.findAll { it.fields.get("time") == null }.size()
+            int choiceTsNull = items.findAll { it.fields.get("choice_ts") == null }.size()
+            Assert.assertEquals(5, msg)
+            Assert.assertEquals(2, timestamp) // records 1 and 5
+            Assert.assertEquals(2, date) // records 2 and 5
+            Assert.assertEquals(2, time) // records 3 and 5
+            Assert.assertEquals(1, choiceTs) // record 4
+            Assert.assertEquals(1, choiceNotTs) // record 5
+            Assert.assertEquals(3, tsNull)
+            Assert.assertEquals(3, dateNull)
+            Assert.assertEquals(3, timeNull)
+            Assert.assertEquals(3, choiceTsNull)
+            Assert.assertEquals(5, atTimestampDefault)
+        }
+
+        clientService.evalClosure = evalClosure
+
+        registry.addSchema("dateTimeFormattingTest", AvroTypeUtil.createSchema(new Schema.Parser().parse(newSchema)))
+
+        runner.enqueue(flowFileContents, [
+                "schema.name": "dateTimeFormattingTest"
+        ])
+
+        runner.run()
+        runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+
+        runner.clearTransferState()
+        // second pass: same input, custom formats, and @timestamp read from /ts with RETAIN_AT_TIMESTAMP_FIELD enabled
+        evalClosure = { List<IndexOperationRequest> items ->
+            String timestampOutput = LOCAL_DATE_TIME.format(DateTimeFormatter.ofPattern("yy MMM D H"))
+            int msg = items.findAll { (it.fields.get("msg") != null) }.size()
+            int timestamp = items.findAll { it.fields.get("ts") == timestampOutput }.size()
+            int date = items.findAll { it.fields.get("date") ==
+                    LOCAL_DATE.format(DateTimeFormatter.ofPattern("dd/MM/yyyy"))
+            }.size()
+            int time = items.findAll { it.fields.get("time") ==
+                    // converted to a Long because the output is completely numerical
+                    Long.parseLong(LOCAL_TIME.format(DateTimeFormatter.ofPattern("HHmmss")))
+            }.size()
+            int choiceTs = items.findAll { it.fields.get("choice_ts") == timestampOutput }.size()
+            int choiceNotTs = items.findAll { it.fields.get("choice_ts") == "not-timestamp" }.size()
+            int atTimestampDefault = items.findAll { it.fields.get("@timestamp") == "test_timestamp" }.size()
+            int atTimestamp = items.findAll { it.fields.get("@timestamp") == timestampOutput }.size()
+            int tsNull = items.findAll { it.fields.get("ts") == null }.size()
+            int dateNull = items.findAll { it.fields.get("date") == null }.size()
+            int timeNull = items.findAll { it.fields.get("time") == null }.size()
+            int choiceTsNull = items.findAll { it.fields.get("choice_ts") == null }.size()
+            Assert.assertEquals(5, msg)
+            Assert.assertEquals(2, timestamp)
+            Assert.assertEquals(2, date)
+            Assert.assertEquals(2, time)
+            Assert.assertEquals(1, choiceTs)
+            Assert.assertEquals(1, choiceNotTs)
+            Assert.assertEquals(3, tsNull)
+            Assert.assertEquals(3, dateNull)
+            Assert.assertEquals(3, timeNull)
+            Assert.assertEquals(3, choiceTsNull)
+            Assert.assertEquals(2, atTimestamp) // records 1 and 5 carry a ts value
+            Assert.assertEquals(3, atTimestampDefault) // the other three records keep the default @timestamp
+        }
+
+        clientService.evalClosure = evalClosure
+
+        runner.setProperty(PutElasticsearchRecord.TIMESTAMP_FORMAT, "yy MMM D H")
+        runner.setProperty(PutElasticsearchRecord.DATE_FORMAT, "dd/MM/yyyy")
+        runner.setProperty(PutElasticsearchRecord.TIME_FORMAT, "HHmmss")
+        runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, "/ts")
+        runner.setProperty(PutElasticsearchRecord.RETAIN_AT_TIMESTAMP_FIELD, "true")
+
+        runner.enqueue(flowFileContents, [
+                "schema.name": "dateTimeFormattingTest"
+        ])
+
+        runner.run()
+        runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+    }
+
+    @Test
     void testInvalidIndexOperation() {
         runner.setProperty(PutElasticsearchRecord.INDEX_OP, "not-valid")
         runner.assertNotValid()