You are viewing a plain text version of this content. The canonical link was not preserved in this plain-text copy.
Posted to commits@nifi.apache.org by jo...@apache.org on 2021/12/13 18:43:26 UTC
[nifi] 22/22: NIFI-8153 custom date/time format properties for PutElasticsearchRecord
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 5e9c09c4c95213ebeaaeff078d02d09fcc8e158a
Author: Chris Sampson <ch...@gmail.com>
AuthorDate: Fri Nov 26 12:31:49 2021 +0000
NIFI-8153 custom date/time format properties for PutElasticsearchRecord
---
.../elasticsearch/PutElasticsearchRecord.java | 61 ++++++----
.../PutElasticsearchRecordTest.groovy | 128 ++++++++++++++++++++-
2 files changed, 168 insertions(+), 21 deletions(-)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index ec3c3f2..900870e 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -55,6 +55,7 @@ import org.apache.nifi.serialization.SimpleDateFormatValidator;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.PushBackRecordSet;
import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
@@ -202,36 +203,34 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
.required(false)
.build();
- static final PropertyDescriptor AT_TIMESTAMP_DATE_FORMAT = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder()
.name("put-es-record-at-timestamp-date-format")
- .displayName("@Timestamp Record Path Date Format")
- .description("Specifies the format to use when writing Date field for @timestamp. "
+ .displayName("Date Format")
+ .description("Specifies the format to use when writing Date fields. "
+ "If not specified, the default format '" + RecordFieldType.DATE.getDefaultFormat() + "' is used. "
+ "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy for a two-digit month, followed by "
+ "a two-digit day, followed by a four-digit year, all separated by '/' characters, as in 01/25/2017).")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(new SimpleDateFormatValidator())
.required(false)
- .dependsOn(AT_TIMESTAMP_RECORD_PATH)
.build();
- static final PropertyDescriptor AT_TIMESTAMP_TIME_FORMAT = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor TIME_FORMAT = new PropertyDescriptor.Builder()
.name("put-es-record-at-timestamp-time-format")
- .displayName("@Timestamp Record Path Time Format")
- .description("Specifies the format to use when writing Time field for @timestamp. "
+ .displayName("Time Format")
+ .description("Specifies the format to use when writing Time fields. "
+ "If not specified, the default format '" + RecordFieldType.TIME.getDefaultFormat() + "' is used. "
+ "If specified, the value must match the Java Simple Date Format (for example, HH:mm:ss for a two-digit hour in 24-hour format, followed by "
+ "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 18:04:15).")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(new SimpleDateFormatValidator())
.required(false)
- .dependsOn(AT_TIMESTAMP_RECORD_PATH)
.build();
- static final PropertyDescriptor AT_TIMESTAMP_TIMESTAMP_FORMAT = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor TIMESTAMP_FORMAT = new PropertyDescriptor.Builder()
.name("put-es-record-at-timestamp-timestamp-format")
- .displayName("@Timestamp Record Path Timestamp Format")
- .description("Specifies the format to use when writing Timestamp field for @timestamp. "
+ .displayName("Timestamp Format")
+ .description("Specifies the format to use when writing Timestamp fields. "
+ "If not specified, the default format '" + RecordFieldType.TIMESTAMP.getDefaultFormat() + "' is used. "
+ "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy HH:mm:ss for a two-digit month, followed by "
+ "a two-digit day, followed by a four-digit year, all separated by '/' characters; and then followed by a two-digit hour in 24-hour format, followed by "
@@ -239,14 +238,12 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(new SimpleDateFormatValidator())
.required(false)
- .dependsOn(AT_TIMESTAMP_RECORD_PATH)
.build();
static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
INDEX_OP, INDEX, TYPE, AT_TIMESTAMP, CLIENT_SERVICE, RECORD_READER, BATCH_SIZE, ID_RECORD_PATH, RETAIN_ID_FIELD,
INDEX_OP_RECORD_PATH, INDEX_RECORD_PATH, TYPE_RECORD_PATH, AT_TIMESTAMP_RECORD_PATH, RETAIN_AT_TIMESTAMP_FIELD,
- AT_TIMESTAMP_DATE_FORMAT, AT_TIMESTAMP_TIME_FORMAT, AT_TIMESTAMP_TIMESTAMP_FORMAT, LOG_ERROR_RESPONSES,
- ERROR_RECORD_WRITER
+ DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, LOG_ERROR_RESPONSES, ERROR_RECORD_WRITER
));
static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS
@@ -300,15 +297,15 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
this.writerFactory = context.getProperty(ERROR_RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
this.logErrors = context.getProperty(LOG_ERROR_RESPONSES).asBoolean();
- this.dateFormat = context.getProperty(AT_TIMESTAMP_DATE_FORMAT).evaluateAttributeExpressions().getValue();
+ this.dateFormat = context.getProperty(DATE_FORMAT).evaluateAttributeExpressions().getValue();
if (this.dateFormat == null) {
this.dateFormat = RecordFieldType.DATE.getDefaultFormat();
}
- this.timeFormat = context.getProperty(AT_TIMESTAMP_TIME_FORMAT).evaluateAttributeExpressions().getValue();
+ this.timeFormat = context.getProperty(TIME_FORMAT).evaluateAttributeExpressions().getValue();
if (this.timeFormat == null) {
this.timeFormat = RecordFieldType.TIME.getDefaultFormat();
}
- this.timestampFormat = context.getProperty(AT_TIMESTAMP_TIMESTAMP_FORMAT).evaluateAttributeExpressions().getValue();
+ this.timestampFormat = context.getProperty(TIMESTAMP_FORMAT).evaluateAttributeExpressions().getValue();
if (this.timestampFormat == null) {
this.timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
}
@@ -383,14 +380,15 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
Record record;
while ((record = recordSet.next()) != null) {
final String idx = getFromRecordPath(record, iPath, index, false);
- final String t = getFromRecordPath(record, tPath, type, false);
+ final String t = getFromRecordPath(record, tPath, type, false);
final IndexOperationRequest.Operation o = IndexOperationRequest.Operation.forValue(getFromRecordPath(record, ioPath, indexOp, false));
- final String id = getFromRecordPath(record, path, null, retainId);
+ final String id = getFromRecordPath(record, path, null, retainId);
final Object timestamp = getTimestampFromRecordPath(record, atPath, atTimestamp, retainTimestamp);
@SuppressWarnings("unchecked")
final Map<String, Object> contentMap = (Map<String, Object>) DataTypeUtils
.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+ formatDateTimeFields(contentMap, record);
contentMap.putIfAbsent("@timestamp", timestamp);
operationList.add(new IndexOperationRequest(idx, t, id, contentMap, o));
@@ -493,6 +491,26 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
return null;
}
+ private void formatDateTimeFields(final Map<String, Object> contentMap, final Record record) {
+ for (final RecordField recordField : record.getSchema().getFields()) {
+ final Object value = contentMap.get(recordField.getFieldName());
+ if (value != null) {
+ final DataType chosenDataType = recordField.getDataType().getFieldType() == RecordFieldType.CHOICE
+ ? DataTypeUtils.chooseDataType(record.getValue(recordField), (ChoiceDataType) recordField.getDataType())
+ : recordField.getDataType();
+
+ final String format = determineDateFormat(chosenDataType.getFieldType());
+ if (format != null) {
+ final Object formattedValue = coerceStringToLong(
+ recordField.getFieldName(),
+ DataTypeUtils.toString(value, () -> DataTypeUtils.getDateFormat(format))
+ );
+ contentMap.put(recordField.getFieldName(), formattedValue);
+ }
+ }
+ }
+ }
+
private String getFromRecordPath(final Record record, final RecordPath path, final String fallback,
final boolean retain) {
if (path == null) {
@@ -591,8 +609,11 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
case TIME:
format = this.timeFormat;
break;
- default:
+ case TIMESTAMP:
format = this.timestampFormat;
+ break;
+ default:
+ format = null;
}
return format;
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
index 85a9f2b..2bbd51c 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
@@ -334,7 +334,7 @@ class PutElasticsearchRecordTest {
runner.setProperty(PutElasticsearchRecord.RETAIN_ID_FIELD, "true")
runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP, "100")
runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, "/date")
- runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_DATE_FORMAT, "dd/MM/yyyy")
+ runner.setProperty(PutElasticsearchRecord.DATE_FORMAT, "dd/MM/yyyy")
runner.setProperty(PutElasticsearchRecord.RETAIN_AT_TIMESTAMP_FIELD, "true")
runner.enqueue(flowFileContents, [
"schema.name": "recordPathTest",
@@ -444,6 +444,132 @@ class PutElasticsearchRecordTest {
}
@Test
+ void testDateTimeFormatting() {
+ def newSchema = prettyPrint(toJson([
+ type: "record",
+ name: "DateTimeFormattingTestType",
+ fields: [
+ [ name: "msg", type: ["null", "string"] ],
+ [ name: "ts", type: ["null", [ type: "long", logicalType: "timestamp-millis" ]] ],
+ [ name: "date", type: ["null", [ type: "int", logicalType: "date" ]] ],
+ [ name: "time", type: ["null", [ type: "int", logicalType: "time-millis" ]] ],
+ [ name: "choice_ts", type: ["null", [ type: "long", logicalType: "timestamp-millis" ], "string"] ]
+ ]
+ ]))
+
+ def flowFileContents = prettyPrint(toJson([
+ [ msg: "1", ts: Timestamp.valueOf(LOCAL_DATE_TIME).toInstant().toEpochMilli() ],
+ [ msg: "2", date: Date.valueOf(LOCAL_DATE).getTime() ],
+ [ msg: "3", time: Time.valueOf(LOCAL_TIME).getTime() ],
+ [ msg: "4", choice_ts: Timestamp.valueOf(LOCAL_DATE_TIME).toInstant().toEpochMilli() ],
+ [ msg: "5",
+ ts: Timestamp.valueOf(LOCAL_DATE_TIME).toInstant().toEpochMilli(),
+ time: Time.valueOf(LOCAL_TIME).getTime(),
+ date: Date.valueOf(LOCAL_DATE).getTime(),
+ choice_ts: "not-timestamp"
+ ]
+ ]))
+
+ def evalClosure = { List<IndexOperationRequest> items ->
+ int msg = items.findAll { (it.fields.get("msg") != null) }.size()
+ int timestamp = items.findAll { it.fields.get("ts") ==
+ LOCAL_DATE_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIMESTAMP.getDefaultFormat())) // "yyyy-MM-dd HH:mm:ss"
+ }.size()
+ int date = items.findAll { it.fields.get("date") ==
+ LOCAL_DATE.format(DateTimeFormatter.ofPattern(RecordFieldType.DATE.getDefaultFormat())) // "yyyy-MM-dd"
+ }.size()
+ int time = items.findAll { it.fields.get("time") ==
+ LOCAL_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIME.getDefaultFormat())) // "HH:mm:ss"
+ }.size()
+ int choiceTs = items.findAll { it.fields.get("choice_ts") ==
+ LOCAL_DATE_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIMESTAMP.getDefaultFormat()))
+ }.size()
+ int choiceNotTs = items.findAll { it.fields.get("choice_ts") == "not-timestamp" }.size()
+ int atTimestampDefault = items.findAll { it.fields.get("@timestamp") == "test_timestamp" }.size()
+ int tsNull = items.findAll { it.fields.get("ts") == null }.size()
+ int dateNull = items.findAll { it.fields.get("date") == null }.size()
+ int timeNull = items.findAll { it.fields.get("time") == null }.size()
+ int choiceTsNull = items.findAll { it.fields.get("choice_ts") == null }.size()
+ Assert.assertEquals(5, msg)
+ Assert.assertEquals(2, timestamp)
+ Assert.assertEquals(2, date)
+ Assert.assertEquals(2, time)
+ Assert.assertEquals(1, choiceTs)
+ Assert.assertEquals(1, choiceNotTs)
+ Assert.assertEquals(3, tsNull)
+ Assert.assertEquals(3, dateNull)
+ Assert.assertEquals(3, timeNull)
+ Assert.assertEquals(3, choiceTsNull)
+ Assert.assertEquals(5, atTimestampDefault)
+ }
+
+ clientService.evalClosure = evalClosure
+
+ registry.addSchema("dateTimeFormattingTest", AvroTypeUtil.createSchema(new Schema.Parser().parse(newSchema)))
+
+ runner.enqueue(flowFileContents, [
+ "schema.name": "dateTimeFormattingTest"
+ ])
+
+ runner.run()
+ runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+
+ runner.clearTransferState()
+
+ evalClosure = { List<IndexOperationRequest> items ->
+ String timestampOutput = LOCAL_DATE_TIME.format(DateTimeFormatter.ofPattern("yy MMM D H"))
+ int msg = items.findAll { (it.fields.get("msg") != null) }.size()
+ int timestamp = items.findAll { it.fields.get("ts") == timestampOutput }.size()
+ int date = items.findAll { it.fields.get("date") ==
+ LOCAL_DATE.format(DateTimeFormatter.ofPattern("dd/MM/yyyy"))
+ }.size()
+ int time = items.findAll { it.fields.get("time") ==
+ // converted to a Long because the output is completely numerical
+ Long.parseLong(LOCAL_TIME.format(DateTimeFormatter.ofPattern("HHmmss")))
+ }.size()
+ int choiceTs = items.findAll { it.fields.get("choice_ts") == timestampOutput }.size()
+ int choiceNotTs = items.findAll { it.fields.get("choice_ts") == "not-timestamp" }.size()
+ int atTimestampDefault = items.findAll { it.fields.get("@timestamp") == "test_timestamp" }.size()
+ int atTimestamp = items.findAll { it.fields.get("@timestamp") == timestampOutput }.size()
+ int tsNull = items.findAll { it.fields.get("ts") == null }.size()
+ int dateNull = items.findAll { it.fields.get("date") == null }.size()
+ int timeNull = items.findAll { it.fields.get("time") == null }.size()
+ int choiceTsNull = items.findAll { it.fields.get("choice_ts") == null }.size()
+ Assert.assertEquals(5, msg)
+ Assert.assertEquals(2, timestamp)
+ Assert.assertEquals(2, date)
+ Assert.assertEquals(2, time)
+ Assert.assertEquals(1, choiceTs)
+ Assert.assertEquals(1, choiceNotTs)
+ Assert.assertEquals(3, tsNull)
+ Assert.assertEquals(3, dateNull)
+ Assert.assertEquals(3, timeNull)
+ Assert.assertEquals(3, choiceTsNull)
+ Assert.assertEquals(2, atTimestamp)
+ Assert.assertEquals(3, atTimestampDefault)
+ }
+
+ clientService.evalClosure = evalClosure
+
+ runner.setProperty(PutElasticsearchRecord.TIMESTAMP_FORMAT, "yy MMM D H")
+ runner.setProperty(PutElasticsearchRecord.DATE_FORMAT, "dd/MM/yyyy")
+ runner.setProperty(PutElasticsearchRecord.TIME_FORMAT, "HHmmss")
+ runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, "/ts")
+ runner.setProperty(PutElasticsearchRecord.RETAIN_AT_TIMESTAMP_FIELD, "true")
+
+ runner.enqueue(flowFileContents, [
+ "schema.name": "dateTimeFormattingTest"
+ ])
+
+ runner.run()
+ runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+ }
+
+ @Test
void testInvalidIndexOperation() {
runner.setProperty(PutElasticsearchRecord.INDEX_OP, "not-valid")
runner.assertNotValid()