You are viewing a plain-text version of this content. The hyperlink to its canonical version ("here" in the original page) is not preserved in this copy.
Posted to commits@nifi.apache.org by jg...@apache.org on 2021/10/07 09:58:29 UTC
[nifi] branch main updated: NIFI-7990: Add properties to map Record field as @timestamp in output to Elasticsearch for PutElasticsearchRecord and PutElasticsearchHttpRecord processors; NIFI-7474 allow mapped id field to be retained within the Record for PutElasticsearchRecord
This is an automated email from the ASF dual-hosted git repository.
jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 3892e50 NIFI-7990: Add properties to map Record field as @timestamp in output to Elasticsearch for PutElasticsearchRecord and PutElasticsearchHttpRecord processors; NIFI-7474 allow mapped id field to be retained within the Record for PutElasticsearchRecord
3892e50 is described below
commit 3892e50991ff803b762c26aec6067aecacda548e
Author: Chris Sampson <ch...@gmail.com>
AuthorDate: Thu Sep 2 11:57:57 2021 +0100
NIFI-7990: Add properties to map Record field as @timestamp in output to Elasticsearch for PutElasticsearchRecord and PutElasticsearchHttpRecord processors; NIFI-7474 allow mapped id field to be retained within the Record for PutElasticsearchRecord
Signed-off-by: Joe Gresock <jg...@gmail.com>
This closes #4691.
---
.../nifi/elasticsearch/IndexOperationRequest.java | 8 +-
.../elasticsearch/PutElasticsearchHttpRecord.java | 200 +++++++++-----
.../TestPutElasticsearchHttpRecord.java | 158 ++++++++++-
.../nifi-elasticsearch-restapi-processors/pom.xml | 12 +-
.../elasticsearch/PutElasticsearchRecord.java | 303 +++++++++++++++++----
.../PutElasticsearchRecordTest.groovy | 140 +++++++++-
6 files changed, 665 insertions(+), 156 deletions(-)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
index 7de8807..be79416 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
@@ -65,9 +65,9 @@ public class IndexOperationRequest {
Index("index"),
Update("update"),
Upsert("upsert");
- String value;
+ private final String value;
- Operation(String value) {
+ Operation(final String value) {
this.value = value;
}
@@ -80,6 +80,10 @@ public class IndexOperationRequest {
.filter(o -> o.getValue().equalsIgnoreCase(value)).findFirst()
.orElseThrow(() -> new IllegalArgumentException(String.format("Unknown Index Operation %s", value)));
}
+
+ public static String[] allValues() {
+ return Arrays.stream(Operation.values()).map(Operation::getValue).sorted().toArray(String[]::new);
+ }
}
@Override
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index b9f2e27..6333be3 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -66,7 +66,6 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
-import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.StringUtils;
@@ -78,13 +77,13 @@ import java.io.OutputStream;
import java.math.BigInteger;
import java.net.URL;
import java.nio.charset.Charset;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@@ -153,7 +152,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
static final PropertyDescriptor ID_RECORD_PATH = new PropertyDescriptor.Builder()
.name("put-es-record-id-path")
.displayName("Identifier Record Path")
- .description("A RecordPath pointing to a field in the record(s) that contains the identifier for the document. If the Index Operation is \"index\", "
+ .description("A RecordPath pointing to a field in the record(s) that contains the identifier for the document. If the Index Operation is \"index\" or \"create\", "
+ "this property may be left empty or evaluate to an empty value, in which case the document's identifier will be "
+ "auto-generated by Elasticsearch. For all other Index Operations, the field's value must be non-empty.")
.required(false)
@@ -209,12 +208,31 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
.required(true)
.build();
+ static final PropertyDescriptor AT_TIMESTAMP = new PropertyDescriptor.Builder()
+ .name("put-es-record-at-timestamp")
+ .displayName("@timestamp Value")
+ .description("The value to use as the @timestamp field (required for Elasticsearch Data Streams)")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor AT_TIMESTAMP_RECORD_PATH = new PropertyDescriptor.Builder()
+ .name("put-es-record-at-timestamp-path")
+ .displayName("@timestamp Record Path")
+ .description("A RecordPath pointing to a field in the record(s) that contains the @timestamp for the document. " +
+ "If left blank the @timestamp will be determined using the main @timestamp property")
+ .required(false)
+ .addValidator(new RecordPathValidator())
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder()
.name("Date Format")
.description("Specifies the format to use when reading/writing Date fields. "
+ "If not specified, the default format '" + RecordFieldType.DATE.getDefaultFormat() + "' is used. "
+ "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy for a two-digit month, followed by "
- + "a two-digit day, followed by a four-digit year, all separated by '/' characters, as in 01/01/2017).")
+ + "a two-digit day, followed by a four-digit year, all separated by '/' characters, as in 01/25/2017).")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(new SimpleDateFormatValidator())
.required(false)
@@ -235,7 +253,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
+ "If not specified, the default format '" + RecordFieldType.TIMESTAMP.getDefaultFormat() + "' is used. "
+ "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy HH:mm:ss for a two-digit month, followed by "
+ "a two-digit day, followed by a four-digit year, all separated by '/' characters; and then followed by a two-digit hour in 24-hour format, followed by "
- + "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 01/01/2017 18:04:15).")
+ + "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 01/25/2017 18:04:15).")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(new SimpleDateFormatValidator())
.required(false)
@@ -266,6 +284,8 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
descriptors.add(RECORD_WRITER);
descriptors.add(LOG_ALL_ERRORS);
descriptors.add(ID_RECORD_PATH);
+ descriptors.add(AT_TIMESTAMP_RECORD_PATH);
+ descriptors.add(AT_TIMESTAMP);
descriptors.add(INDEX);
descriptors.add(TYPE);
descriptors.add(INDEX_OP);
@@ -292,7 +312,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
// Since Expression Language is allowed for index operation, we can't guarantee that we can catch
// all invalid configurations, but we should catch them as soon as we can. For example, if the
- // Identifier Record Path property is empty, the Index Operation must evaluate to "index".
+ // Identifier Record Path property is empty, the Index Operation must evaluate to "index" or "create".
String idPath = validationContext.getProperty(ID_RECORD_PATH).getValue();
String indexOp = validationContext.getProperty(INDEX_OP).getValue();
@@ -363,7 +383,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
if (StringUtils.isEmpty(baseUrl)) {
throw new ProcessException("Elasticsearch URL is empty or null, this indicates an invalid Expression (missing variables, e.g.)");
}
- HttpUrl.Builder urlBuilder = HttpUrl.parse(baseUrl).newBuilder().addPathSegment("_bulk");
+ HttpUrl.Builder urlBuilder = Objects.requireNonNull(HttpUrl.parse(baseUrl)).newBuilder().addPathSegment("_bulk");
// Find the user-added properties and set them as query parameters on the URL
for (Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) {
@@ -378,14 +398,14 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isEmpty(index)) {
- logger.error("No value for index in for {}, transferring to failure", new Object[]{flowFile});
+ logger.error("No value for index in for {}, transferring to failure", flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isEmpty(indexOp)) {
- logger.error("No Index operation specified for {}, transferring to failure.", new Object[]{flowFile});
+ logger.error("No Index operation specified for {}, transferring to failure.", flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
@@ -398,18 +418,22 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
case "delete":
break;
default:
- logger.error("Index operation {} not supported for {}, transferring to failure.", new Object[]{indexOp, flowFile});
+ logger.error("Index operation {} not supported for {}, transferring to failure.", indexOp, flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
this.nullSuppression = context.getProperty(SUPPRESS_NULLS).getValue();
- final String id_path = context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
- final RecordPath recordPath = StringUtils.isEmpty(id_path) ? null : recordPathCache.getCompiled(id_path);
+ final String idPath = context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
+ final RecordPath recordPath = StringUtils.isEmpty(idPath) ? null : recordPathCache.getCompiled(idPath);
final StringBuilder sb = new StringBuilder();
final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
+ final String atTimestamp = context.getProperty(AT_TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
+ final String atTimestampPath = context.getProperty(AT_TIMESTAMP_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
+ final RecordPath atPath = StringUtils.isEmpty(atTimestampPath) ? null : recordPathCache.getCompiled(atTimestampPath);
+
int recordCount = 0;
try (final InputStream in = session.read(flowFile);
final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
@@ -428,6 +452,14 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
id = null;
}
+ final Object timestamp;
+ if (atPath != null) {
+ final Optional<FieldValue> atPathValue = atPath.evaluate(record).getSelectedFields().findFirst();
+ timestamp = !atPathValue.isPresent() || atPathValue.get().getValue() == null ? atTimestamp : atPathValue.get();
+ } else {
+ timestamp = atTimestamp;
+ }
+
// The ID must be valid for all operations except "index" or "create". For that case,
// a missing ID indicates one is to be auto-generated by Elasticsearch
if (id == null && !(indexOp.equalsIgnoreCase("index") || indexOp.equalsIgnoreCase("create"))) {
@@ -437,8 +469,8 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
final StringBuilder json = new StringBuilder();
ByteArrayOutputStream out = new ByteArrayOutputStream();
- JsonGenerator generator = factory.createJsonGenerator(out);
- writeRecord(record, record.getSchema(), generator);
+ JsonGenerator generator = factory.createGenerator(out);
+ writeRecord(record, generator, timestamp);
generator.flush();
generator.close();
json.append(out.toString(charset.name()));
@@ -447,7 +479,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
recordCount++;
}
} catch (IdentifierNotFoundException infe) {
- logger.error(infe.getMessage(), new Object[]{flowFile});
+ logger.error(infe.getMessage(), flowFile);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
@@ -459,7 +491,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
return;
}
- RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), sb.toString());
+ RequestBody requestBody = RequestBody.create(sb.toString(), MediaType.parse("application/json"));
final Response getResponse;
try {
getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "PUT", requestBody);
@@ -474,49 +506,50 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
final Set<Integer> failures = new HashSet<>();
if (isSuccess(statusCode)) {
- try (ResponseBody responseBody = getResponse.body()) {
- final byte[] bodyBytes = responseBody.bytes();
-
- JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
- boolean errors = responseJson.get("errors").asBoolean(false);
- // ES has no rollback, so if errors occur, log them and route the whole flow file to failure
- if (errors) {
- ArrayNode itemNodeArray = (ArrayNode) responseJson.get("items");
- if(itemNodeArray != null) {
- if (itemNodeArray.size() > 0) {
- // All items are returned whether they succeeded or failed, so iterate through the item array
- // at the same time as the flow file list, moving each to success or failure accordingly,
- // but only keep the first error for logging
- String errorReason = null;
- for (int i = itemNodeArray.size() - 1; i >= 0; i--) {
- JsonNode itemNode = itemNodeArray.get(i);
- int status = itemNode.findPath("status").asInt();
- if (!isSuccess(status)) {
- if (errorReason == null || logAllErrors) {
- // Use "result" if it is present; this happens for status codes like 404 Not Found, which may not have an error/reason
- String reason = itemNode.findPath("result").asText();
- if (StringUtils.isEmpty(reason)) {
- // If there was no result, we expect an error with a string description in the "reason" field
- reason = itemNode.findPath("reason").asText();
+ try (final ResponseBody responseBody = getResponse.body()) {
+ if (responseBody != null) {
+ final byte[] bodyBytes = responseBody.bytes();
+
+ JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
+ boolean errors = responseJson.get("errors").asBoolean(false);
+ // ES has no rollback, so if errors occur, log them and route the whole flow file to failure
+ if (errors) {
+ ArrayNode itemNodeArray = (ArrayNode) responseJson.get("items");
+ if (itemNodeArray != null) {
+ if (itemNodeArray.size() > 0) {
+ // All items are returned whether they succeeded or failed, so iterate through the item array
+ // at the same time as the flow file list, moving each to success or failure accordingly,
+ // but only keep the first error for logging
+ String errorReason = null;
+ for (int i = itemNodeArray.size() - 1; i >= 0; i--) {
+ JsonNode itemNode = itemNodeArray.get(i);
+ int status = itemNode.findPath("status").asInt();
+ if (!isSuccess(status)) {
+ if (errorReason == null || logAllErrors) {
+ // Use "result" if it is present; this happens for status codes like 404 Not Found, which may not have an error/reason
+ String reason = itemNode.findPath("result").asText();
+ if (StringUtils.isEmpty(reason)) {
+ // If there was no result, we expect an error with a string description in the "reason" field
+ reason = itemNode.findPath("reason").asText();
+ }
+ errorReason = reason;
+
+ logger.error("Failed to process record {} in FlowFile {} due to {}, transferring to failure",
+ i, flowFile, errorReason);
}
- errorReason = reason;
-
- logger.error("Failed to process record {} in FlowFile {} due to {}, transferring to failure",
- new Object[]{i, flowFile, errorReason});
+ failures.add(i);
}
- failures.add(i);
}
}
}
+ } else {
+ // Everything succeeded, route FF and end
+ flowFile = session.putAttribute(flowFile, "record.count", Integer.toString(recordCount));
+ session.transfer(flowFile, REL_SUCCESS);
+ session.getProvenanceReporter().send(flowFile, url.toString());
+ return;
}
- } else {
- // Everything succeeded, route FF and end
- flowFile = session.putAttribute(flowFile, "record.count", Integer.toString(recordCount));
- session.transfer(flowFile, REL_SUCCESS);
- session.getProvenanceReporter().send(flowFile, url.toString());
- return;
}
-
} catch (IOException ioe) {
// Something went wrong when parsing the response, log the error and route to failure
logger.error("Error parsing Bulk API response: {}", new Object[]{ioe.getMessage()}, ioe);
@@ -529,12 +562,12 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
} else if (statusCode / 100 == 5) {
// 5xx -> RETRY, but a server error might last a while, so yield
logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to retry. This is likely a server problem, yielding...",
- new Object[]{statusCode, getResponse.message()});
+ statusCode, getResponse.message());
session.transfer(flowFile, REL_RETRY);
context.yield();
return;
} else { // 1xx, 3xx, 4xx, etc. -> NO RETRY
- logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to failure", new Object[]{statusCode, getResponse.message()});
+ logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to failure", statusCode, getResponse.message());
session.transfer(flowFile, REL_FAILURE);
return;
}
@@ -549,17 +582,16 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
final RecordSetWriterFactory writerFactory = writerFactoryOptional.get();
// We know there are a mixture of successes and failures, create FFs for each and rename input FF to avoid confusion.
- final FlowFile inputFlowFile = flowFile;
- final FlowFile successFlowFile = session.create(inputFlowFile);
- final FlowFile failedFlowFile = session.create(inputFlowFile);
+ final FlowFile successFlowFile = session.create(flowFile);
+ final FlowFile failedFlowFile = session.create(flowFile);
// Set up the reader and writers
try (final OutputStream successOut = session.write(successFlowFile);
final OutputStream failedOut = session.write(failedFlowFile);
- final InputStream in = session.read(inputFlowFile);
- final RecordReader reader = readerFactory.createRecordReader(inputFlowFile, in, getLogger())) {
+ final InputStream in = session.read(flowFile);
+ final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
- final RecordSchema schema = writerFactory.getSchema(inputFlowFile.getAttributes(), reader.getSchema());
+ final RecordSchema schema = writerFactory.getSchema(flowFile.getAttributes(), reader.getSchema());
try (final RecordSetWriter successWriter = writerFactory.createWriter(getLogger(), schema, successOut, successFlowFile);
final RecordSetWriter failedWriter = writerFactory.createWriter(getLogger(), schema, failedOut, failedFlowFile)) {
@@ -581,8 +613,8 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
}
} catch (final IOException | SchemaNotFoundException | MalformedRecordException e) {
// We failed while handling individual failures. Not much else we can do other than log, and route the whole thing to failure.
- getLogger().error("Failed to process {} during individual record failure handling; route whole FF to failure", new Object[] {flowFile, e});
- session.transfer(inputFlowFile, REL_FAILURE);
+ getLogger().error("Failed to process {} during individual record failure handling; route whole FF to failure", flowFile, e);
+ session.transfer(flowFile, REL_FAILURE);
if (successFlowFile != null) {
session.remove(successFlowFile);
}
@@ -598,15 +630,34 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
session.putAttribute(failedFlowFile, "failure.count", Integer.toString(failures.size()));
session.transfer(successFlowFile, REL_SUCCESS);
session.transfer(failedFlowFile, REL_FAILURE);
- session.remove(inputFlowFile);
+ session.remove(flowFile);
}
}
- private void writeRecord(final Record record, final RecordSchema writeSchema, final JsonGenerator generator)
- throws IOException {
- RecordSchema schema = record.getSchema();
+ private void writeRecord(final Record record, final JsonGenerator generator, final Object atTimestamp) throws IOException {
+ final RecordSchema schema = record.getSchema();
generator.writeStartObject();
+
+ if (atTimestamp != null && !(atTimestamp instanceof String && StringUtils.isBlank((String) atTimestamp))) {
+ final DataType atDataType;
+ final Object atValue;
+ if (atTimestamp instanceof FieldValue) {
+ final FieldValue atField = (FieldValue) atTimestamp;
+ atDataType = atField.getField().getDataType();
+ atValue = atField.getValue();
+ } else {
+ atDataType = RecordFieldType.STRING.getDataType();
+ atValue = atTimestamp.toString();
+ }
+
+ final Object outputValue = RecordFieldType.STRING.getDataType().equals(atDataType) ? coerceTimestampStringToLong(atValue.toString()) : atValue;
+ final DataType outputDataType = outputValue.equals(atValue) ? atDataType : RecordFieldType.LONG.getDataType();
+
+ generator.writeFieldName("@timestamp");
+ writeValue(generator, outputValue, "@timestamp", outputDataType);
+ }
+
for (int i = 0; i < schema.getFieldCount(); i++) {
final RecordField field = schema.getField(i);
final String fieldName = field.getFieldName();
@@ -627,6 +678,12 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
generator.writeEndObject();
}
+ private Object coerceTimestampStringToLong(final String stringValue) {
+ return DataTypeUtils.isLongTypeCompatible(stringValue)
+ ? DataTypeUtils.toLong(stringValue, "@timestamp")
+ : stringValue;
+ }
+
@SuppressWarnings("unchecked")
private void writeValue(final JsonGenerator generator, final Object value, final String fieldName, final DataType dataType) throws IOException {
if (value == null) {
@@ -646,8 +703,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
switch (chosenDataType.getFieldType()) {
case DATE: {
- // Use SimpleDateFormat with system default time zone for string conversion
- final String stringValue = DataTypeUtils.toString(coercedValue, () -> new SimpleDateFormat(dateFormat));
+ final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(this.dateFormat));
if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
} else {
@@ -711,13 +767,9 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
generator.writeString(stringValue);
}
break;
- case RECORD: {
- final Record record = (Record) coercedValue;
- final RecordDataType recordDataType = (RecordDataType) chosenDataType;
- final RecordSchema childSchema = recordDataType.getChildSchema();
- writeRecord(record, childSchema, generator);
+ case RECORD:
+ writeRecord((Record) coercedValue, generator, null);
break;
- }
case MAP: {
final MapDataType mapDataType = (MapDataType) chosenDataType;
final DataType valueDataType = mapDataType.getValueType();
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
index 659de9a..38d7aee 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
@@ -49,6 +49,7 @@ import java.net.ConnectException;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
+import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
@@ -56,9 +57,11 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.function.Consumer;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@@ -75,6 +78,7 @@ public class TestPutElasticsearchHttpRecord {
private static final String ISO_DATE = String.format("%d-%d-%d", DATE_YEAR, DATE_MONTH, DATE_DAY);
private static final String EXPECTED_DATE = String.format("%d/%d/%d", DATE_DAY, DATE_MONTH, DATE_YEAR);
private static final LocalDateTime LOCAL_DATE_TIME = LocalDateTime.of(DATE_YEAR, DATE_MONTH, DATE_DAY, TIME_HOUR, TIME_MINUTE);
+ private static final LocalDate LOCAL_DATE = LocalDate.of(DATE_YEAR, DATE_MONTH, DATE_DAY);
private static final LocalTime LOCAL_TIME = LocalTime.of(TIME_HOUR, TIME_MINUTE);
private TestRunner runner;
@@ -499,6 +503,141 @@ public class TestPutElasticsearchHttpRecord {
}
@Test
+ public void testPutElasticsearchOnTriggerWithNoAtTimestampPath() throws Exception {
+ PutElasticsearchHttpRecordTestProcessor processor = new PutElasticsearchHttpRecordTestProcessor(false);
+ runner = TestRunners.newTestRunner(processor);
+ generateTestData(1);
+ runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+ runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+
+ runner.removeProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP); // no default
+ runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP_RECORD_PATH, "/none"); // Field does not exist
+ processor.setRecordChecks(record -> assertTimestamp(record, null)); // no @timestamp
+ runner.enqueue(new byte[0]);
+ runner.run(1, true, true);
+
+ runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+ assertNotNull(out);
+ runner.clearTransferState();
+
+ // now add a default @timestamp
+ final String timestamp = "2020-11-27T14:37:00.000Z";
+ runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP, timestamp);
+ processor.setRecordChecks(record -> assertTimestamp(record, timestamp)); // @timestamp defaulted
+ runner.enqueue(new byte[0]);
+ runner.run(1, true, true);
+
+ runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+ final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+ assertNotNull(out2);
+ }
+
+ @Test
+ public void testPutElasticsearchOnTriggerWithAtTimestampFromAttribute() throws IOException {
+ PutElasticsearchHttpRecordTestProcessor processor = new PutElasticsearchHttpRecordTestProcessor(false);
+ runner = TestRunners.newTestRunner(processor);
+ generateTestData(1);
+ runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+ runner.setProperty(PutElasticsearchHttpRecord.INDEX, "${i}");
+ runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP, "${timestamp}");
+
+ final String timestamp = "2020-11-27T15:10:00.000Z";
+ processor.setRecordChecks(record -> assertTimestamp(record, timestamp));
+ runner.enqueue(new byte[0], new HashMap<String, String>() {{
+ put("doc_id", "28039652144");
+ put("i", "doc");
+ put("timestamp", timestamp);
+ }});
+ runner.run(1, true, true);
+
+ runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+ assertNotNull(out);
+ runner.clearTransferState();
+
+ // Now try an empty attribute value, should be no timestamp
+ processor.setRecordChecks(record -> assertTimestamp(record, null));
+ runner.enqueue(new byte[0], new HashMap<String, String>() {{
+ put("doc_id", "28039652144");
+ put("i", "doc");
+ }});
+ runner.run(1, true, true);
+
+ runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+ final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+ assertNotNull(out2);
+ }
+
+ @Test
+ public void testPutElasticsearchOnTriggerWithAtTimstampPath() throws Exception {
+ PutElasticsearchHttpRecordTestProcessor processor = new PutElasticsearchHttpRecordTestProcessor(false);
+ DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern(RecordFieldType.TIME.getDefaultFormat());
+ DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(RecordFieldType.TIMESTAMP.getDefaultFormat());
+ DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern(RecordFieldType.DATE.getDefaultFormat());
+ runner = TestRunners.newTestRunner(processor);
+ generateTestData(1);
+ runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+ runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+
+ runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP_RECORD_PATH, "/ts"); // TIMESTAMP
+ processor.setRecordChecks(record -> assertTimestamp(record, LOCAL_DATE_TIME.format(dateTimeFormatter)));
+ runner.enqueue(new byte[0]);
+ runner.run(1, true, true);
+
+ runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+ assertNotNull(runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0));
+ runner.clearTransferState();
+
+ runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP_RECORD_PATH, "/date"); // DATE;
+ processor.setRecordChecks(record -> assertTimestamp(record, LOCAL_DATE.format(dateFormatter)));
+ runner.enqueue(new byte[0]);
+ runner.run(1, true, true);
+
+ runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+ assertNotNull(runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0));
+ runner.clearTransferState();
+
+ runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP_RECORD_PATH, "/time"); // TIME
+ processor.setRecordChecks(record -> assertTimestamp(record, LOCAL_TIME.format(timeFormatter)));
+ runner.enqueue(new byte[0]);
+ runner.run(1, true, true);
+
+ runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+ assertNotNull(runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0));
+ runner.clearTransferState();
+
+ // these INT/STRING values might not make sense from an Elasticsearch point of view,
+ // but we want to prove we can handle them being selected from the Record
+ runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP_RECORD_PATH, "/code"); // INT
+ processor.setRecordChecks(record -> assertTimestamp(record, 101));
+ runner.enqueue(new byte[0]);
+ runner.run(1, true, true);
+
+ runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+ assertNotNull(runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0));
+ runner.clearTransferState();
+
+ runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP_RECORD_PATH, "/name"); // STRING
+ processor.setRecordChecks(record -> assertTimestamp(record, "reç1"));
+ runner.enqueue(new byte[0]);
+ runner.run(1, true, true);
+
+ runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+ assertNotNull(runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0));
+ runner.clearTransferState();
+
+ runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP_RECORD_PATH, "/coerce"); // STRING coerced to LONG
+ processor.setRecordChecks(record -> assertTimestamp(record, 1000));
+ runner.enqueue(new byte[0]);
+ runner.run(1, true, true);
+
+ runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+ assertNotNull(runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0));
+ runner.clearTransferState();
+ }
+
+ @Test
public void testPutElasticSearchOnTriggerQueryParameter() throws IOException {
PutElasticsearchHttpRecordTestProcessor p = new PutElasticsearchHttpRecordTestProcessor(false); // no failures
p.setExpectedUrl("http://127.0.0.1:9200/_bulk?pipeline=my-pipeline");
@@ -600,7 +739,7 @@ public class TestPutElasticsearchHttpRecord {
int statusCode = 200;
String statusMessage = "OK";
String expectedUrl = null;
- Consumer<Map<?, ?>>[] recordChecks;
+ Consumer<Map<String, Object>>[] recordChecks;
PutElasticsearchHttpRecordTestProcessor(boolean responseHasFailures) {
this.numResponseFailures = responseHasFailures ? 1 : 0;
@@ -620,10 +759,11 @@ public class TestPutElasticsearchHttpRecord {
}
@SafeVarargs
- final void setRecordChecks(Consumer<Map<?, ?>>... checks) {
+ final void setRecordChecks(Consumer<Map<String, Object>>... checks) {
recordChecks = checks;
}
+ @SuppressWarnings("unchecked")
@Override
protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
client = mock(OkHttpClient.class);
@@ -636,7 +776,7 @@ public class TestPutElasticsearchHttpRecord {
if (recordChecks != null) {
final ObjectMapper mapper = new ObjectMapper();
Buffer sink = new Buffer();
- realRequest.body().writeTo(sink);
+ Objects.requireNonNull(realRequest.body()).writeTo(sink);
String line;
int recordIndex = 0;
boolean content = false;
@@ -798,13 +938,13 @@ public class TestPutElasticsearchHttpRecord {
parser.addSchemaField("time", RecordFieldType.TIME);
parser.addSchemaField("ts", RecordFieldType.TIMESTAMP);
parser.addSchemaField("amount", RecordFieldType.DECIMAL);
+ parser.addSchemaField("coerce", RecordFieldType.STRING);
final Date date = Date.valueOf(ISO_DATE);
final Timestamp timestamp = Timestamp.valueOf(LOCAL_DATE_TIME);
final Time time = Time.valueOf(LOCAL_TIME);
for(int i=1; i<=numRecords; i++) {
-
- parser.addRecord(i, "reç" + i, 100 + i, date, time, timestamp, new BigDecimal(Double.MAX_VALUE).multiply(BigDecimal.TEN));
+ parser.addRecord(i, "reç" + i, 100 + i, date, time, timestamp, new BigDecimal(Double.MAX_VALUE).multiply(BigDecimal.TEN), "1000");
}
}
@@ -818,4 +958,12 @@ public class TestPutElasticsearchHttpRecord {
runner.enableControllerService(writer);
runner.setProperty(PutElasticsearchHttpRecord.RECORD_WRITER, "writer");
}
+
+ private void assertTimestamp(final Map<String, Object> record, final Object timestamp) {
+ if (timestamp == null) {
+ assertFalse(record.containsKey("@timestamp"));
+ } else {
+ assertEquals(timestamp, record.get("@timestamp"));
+ }
+ }
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
index fa874fb..5389d0f 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
@@ -20,12 +20,6 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-elasticsearch-restapi-processors</artifactId>
<packaging>jar</packaging>
- <properties>
- <slf4jversion>2.7</slf4jversion>
- <es.version>5.6.6</es.version>
- <lucene.version>6.2.1</lucene.version>
- </properties>
-
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
@@ -113,6 +107,12 @@ language governing permissions and limitations under the License. -->
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-standard-record-utils</artifactId>
+ <version>1.15.0-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock-record-utils</artifactId>
<version>1.15.0-SNAPSHOT</version>
<scope>test</scope>
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index 49166ff..96c0c21 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -50,8 +50,11 @@ import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleDateFormatValidator;
+import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import java.io.InputStream;
@@ -89,23 +92,33 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
.build();
static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
- .name("put-es-record-index-op")
- .displayName("Index Operation")
- .description("The type of the operation used to index (create, delete, index, update, upsert)")
- .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .defaultValue(IndexOperationRequest.Operation.Index.getValue())
- .required(true)
- .build();
+ .name("put-es-record-index-op")
+ .displayName("Index Operation")
+ .description("The type of the operation used to index (create, delete, index, update, upsert)")
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .defaultValue(IndexOperationRequest.Operation.Index.getValue())
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor AT_TIMESTAMP = new PropertyDescriptor.Builder()
+ .name("put-es-record-at-timestamp")
+ .displayName("@timestamp Value")
+ .description("The value to use as the @timestamp field (required for Elasticsearch Data Streams)")
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .required(false)
+ .build();
static final PropertyDescriptor INDEX_OP_RECORD_PATH = new PropertyDescriptor.Builder()
- .name("put-es-record-index-op-path")
- .displayName("Index Operation Record Path")
- .description("A record path expression to retrieve the Index Operation field for use with Elasticsearch. If left blank " +
- "the Index Operation will be determined using the main Index Operation property.")
- .addValidator(new RecordPathValidator())
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
+ .name("put-es-record-index-op-path")
+ .displayName("Index Operation Record Path")
+ .description("A record path expression to retrieve the Index Operation field for use with Elasticsearch. If left blank " +
+ "the Index Operation will be determined using the main Index Operation property.")
+ .addValidator(new RecordPathValidator())
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
static final PropertyDescriptor ID_RECORD_PATH = new PropertyDescriptor.Builder()
.name("put-es-record-id-path")
@@ -113,6 +126,19 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
.description("A record path expression to retrieve the ID field for use with Elasticsearch. If left blank " +
"the ID will be automatically generated by Elasticsearch.")
.addValidator(new RecordPathValidator())
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ static final PropertyDescriptor RETAIN_ID_FIELD = new PropertyDescriptor.Builder()
+ .name("put-es-record-retain-id-field")
+ .displayName("Retain ID (Record Path)")
+ .description("Whether to retain the existing field used as the ID Record Path.")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(false)
+ .dependsOn(ID_RECORD_PATH)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
@@ -136,6 +162,27 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
+ static final PropertyDescriptor AT_TIMESTAMP_RECORD_PATH = new PropertyDescriptor.Builder()
+ .name("put-es-record-at-timestamp-path")
+ .displayName("@timestamp Record Path")
+ .description("A RecordPath pointing to a field in the record(s) that contains the @timestamp for the document. " +
+ "If left blank the @timestamp will be determined using the main @timestamp property")
+ .addValidator(new RecordPathValidator())
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ static final PropertyDescriptor RETAIN_AT_TIMESTAMP_FIELD = new PropertyDescriptor.Builder()
+ .name("put-es-record-retain-at-timestamp-field")
+ .displayName("Retain @timestamp (Record Path)")
+ .description("Whether to retain the existing field used as the @timestamp Record Path.")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(false)
+ .dependsOn(AT_TIMESTAMP_RECORD_PATH)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
static final PropertyDescriptor ERROR_RECORD_WRITER = new PropertyDescriptor.Builder()
.name("put-es-record-error-writer")
.displayName("Error Record Writer")
@@ -147,9 +194,51 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
.required(false)
.build();
+ static final PropertyDescriptor AT_TIMESTAMP_DATE_FORMAT = new PropertyDescriptor.Builder()
+ .name("put-es-record-at-timestamp-date-format")
+ .displayName("@Timestamp Record Path Date Format")
+ .description("Specifies the format to use when writing Date field for @timestamp. "
+ + "If not specified, the default format '" + RecordFieldType.DATE.getDefaultFormat() + "' is used. "
+ + "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy for a two-digit month, followed by "
+ + "a two-digit day, followed by a four-digit year, all separated by '/' characters, as in 01/25/2017).")
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(new SimpleDateFormatValidator())
+ .required(false)
+ .dependsOn(AT_TIMESTAMP_RECORD_PATH)
+ .build();
+
+ static final PropertyDescriptor AT_TIMESTAMP_TIME_FORMAT = new PropertyDescriptor.Builder()
+ .name("put-es-record-at-timestamp-time-format")
+ .displayName("@Timestamp Record Path Time Format")
+ .description("Specifies the format to use when writing Time field for @timestamp. "
+ + "If not specified, the default format '" + RecordFieldType.TIME.getDefaultFormat() + "' is used. "
+ + "If specified, the value must match the Java Simple Date Format (for example, HH:mm:ss for a two-digit hour in 24-hour format, followed by "
+ + "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 18:04:15).")
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(new SimpleDateFormatValidator())
+ .required(false)
+ .dependsOn(AT_TIMESTAMP_RECORD_PATH)
+ .build();
+
+ static final PropertyDescriptor AT_TIMESTAMP_TIMESTAMP_FORMAT = new PropertyDescriptor.Builder()
+ .name("put-es-record-at-timestamp-timestamp-format")
+ .displayName("@Timestamp Record Path Timestamp Format")
+ .description("Specifies the format to use when writing Timestamp field for @timestamp. "
+ + "If not specified, the default format '" + RecordFieldType.TIMESTAMP.getDefaultFormat() + "' is used. "
+ + "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy HH:mm:ss for a two-digit month, followed by "
+ + "a two-digit day, followed by a four-digit year, all separated by '/' characters; and then followed by a two-digit hour in 24-hour format, followed by "
+ + "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 01/25/2017 18:04:15).")
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(new SimpleDateFormatValidator())
+ .required(false)
+ .dependsOn(AT_TIMESTAMP_RECORD_PATH)
+ .build();
+
static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
- INDEX_OP, INDEX, TYPE, CLIENT_SERVICE, RECORD_READER, BATCH_SIZE, ID_RECORD_PATH, INDEX_OP_RECORD_PATH,
- INDEX_RECORD_PATH, TYPE_RECORD_PATH, LOG_ERROR_RESPONSES, ERROR_RECORD_WRITER
+ INDEX_OP, INDEX, TYPE, AT_TIMESTAMP, CLIENT_SERVICE, RECORD_READER, BATCH_SIZE, ID_RECORD_PATH, RETAIN_ID_FIELD,
+ INDEX_OP_RECORD_PATH, INDEX_RECORD_PATH, TYPE_RECORD_PATH, AT_TIMESTAMP_RECORD_PATH, RETAIN_AT_TIMESTAMP_FIELD,
+ AT_TIMESTAMP_DATE_FORMAT, AT_TIMESTAMP_TIME_FORMAT, AT_TIMESTAMP_TIMESTAMP_FORMAT, LOG_ERROR_RESPONSES,
+ ERROR_RECORD_WRITER
));
static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS
@@ -170,6 +259,9 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
private ElasticSearchClientService clientService;
private RecordSetWriterFactory writerFactory;
private boolean logErrors;
+ private volatile String dateFormat;
+ private volatile String timeFormat;
+ private volatile String timestampFormat;
@OnScheduled
public void onScheduled(ProcessContext context) {
@@ -178,6 +270,19 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
this.recordPathCache = new RecordPathCache(16);
this.writerFactory = context.getProperty(ERROR_RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
this.logErrors = context.getProperty(LOG_ERROR_RESPONSES).asBoolean();
+
+ this.dateFormat = context.getProperty(AT_TIMESTAMP_DATE_FORMAT).evaluateAttributeExpressions().getValue();
+ if (this.dateFormat == null) {
+ this.dateFormat = RecordFieldType.DATE.getDefaultFormat();
+ }
+ this.timeFormat = context.getProperty(AT_TIMESTAMP_TIME_FORMAT).evaluateAttributeExpressions().getValue();
+ if (this.timeFormat == null) {
+ this.timeFormat = RecordFieldType.TIME.getDefaultFormat();
+ }
+ this.timestampFormat = context.getProperty(AT_TIMESTAMP_TIMESTAMP_FORMAT).evaluateAttributeExpressions().getValue();
+ if (this.timestampFormat == null) {
+ this.timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
+ }
}
static final List<String> ALLOWED_INDEX_OPERATIONS = Collections.unmodifiableList(Arrays.asList(
@@ -214,7 +319,7 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
}
@Override
- public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ public void onTrigger(ProcessContext context, ProcessSession session) {
FlowFile input = session.get();
if (input == null) {
return;
@@ -223,24 +328,22 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
final String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(input).getValue();
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
final String type = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
+ final String atTimestamp = context.getProperty(AT_TIMESTAMP).evaluateAttributeExpressions(input).getValue();
- final String indexOpPath = context.getProperty(INDEX_OP_RECORD_PATH).isSet()
- ? context.getProperty(INDEX_OP_RECORD_PATH).evaluateAttributeExpressions(input).getValue()
- : null;
- final String idPath = context.getProperty(ID_RECORD_PATH).isSet()
- ? context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(input).getValue()
- : null;
- final String indexPath = context.getProperty(INDEX_RECORD_PATH).isSet()
- ? context.getProperty(INDEX_RECORD_PATH).evaluateAttributeExpressions(input).getValue()
- : null;
- final String typePath = context.getProperty(TYPE_RECORD_PATH).isSet()
- ? context.getProperty(TYPE_RECORD_PATH).evaluateAttributeExpressions(input).getValue()
- : null;
+ final String indexOpPath = context.getProperty(INDEX_OP_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+ final String idPath = context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+ final String indexPath = context.getProperty(INDEX_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+ final String typePath = context.getProperty(TYPE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+ final String atTimestampPath = context.getProperty(AT_TIMESTAMP_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
RecordPath ioPath = indexOpPath != null ? recordPathCache.getCompiled(indexOpPath) : null;
RecordPath path = idPath != null ? recordPathCache.getCompiled(idPath) : null;
RecordPath iPath = indexPath != null ? recordPathCache.getCompiled(indexPath) : null;
RecordPath tPath = typePath != null ? recordPathCache.getCompiled(typePath) : null;
+ RecordPath atPath = atTimestampPath != null ? recordPathCache.getCompiled(atTimestampPath) : null;
+
+ boolean retainId = context.getProperty(RETAIN_ID_FIELD).evaluateAttributeExpressions(input).asBoolean();
+ boolean retainTimestamp = context.getProperty(RETAIN_AT_TIMESTAMP_FIELD).evaluateAttributeExpressions(input).asBoolean();
int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger();
List<FlowFile> badRecords = new ArrayList<>();
@@ -252,13 +355,19 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
List<Record> originals = new ArrayList<>();
while ((record = reader.nextRecord()) != null) {
- final String idx = getFromRecordPath(record, iPath, index);
- final String t = getFromRecordPath(record, tPath, type);
- final IndexOperationRequest.Operation o = IndexOperationRequest.Operation.forValue(getFromRecordPath(record, ioPath, indexOp));
- final String id = path != null ? getFromRecordPath(record, path, null) : null;
+ final String idx = getFromRecordPath(record, iPath, index, false);
+ final String t = getFromRecordPath(record, tPath, type, false);
+ final IndexOperationRequest.Operation o = IndexOperationRequest.Operation.forValue(getFromRecordPath(record, ioPath, indexOp, false));
+ final String id = getFromRecordPath(record, path, null, retainId);
+ final Object timestamp = getTimestampFromRecordPath(record, atPath, atTimestamp, retainTimestamp);
@SuppressWarnings("unchecked")
- Map<String, Object> contentMap = (Map<String, Object>) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+ final Map<String, Object> contentMap = (Map<String, Object>) DataTypeUtils
+ .convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+ // only add @timestamp when a value was resolved, otherwise every document would carry "@timestamp": null
+ if (timestamp != null) {
+ contentMap.putIfAbsent("@timestamp", timestamp);
+ }
operationList.add(new IndexOperationRequest(idx, t, id, contentMap, o));
originals.add(record);
@@ -275,7 +381,7 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
}
}
- if (operationList.size() > 0) {
+ if (!operationList.isEmpty()) {
BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema());
FlowFile bad = indexDocuments(bundle, session, input);
if (bad != null) {
@@ -315,7 +421,7 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
List<Map<String, Object>> errors = response.getItems();
ObjectMapper mapper = new ObjectMapper();
mapper.enable(SerializationFeature.INDENT_OUTPUT);
- String output = String.format("An error was encountered while processing bulk operations. Server response below:\n\n%s", mapper.writeValueAsString(errors));
+ String output = String.format("An error was encountered while processing bulk operations. Server response below:%n%n%s", mapper.writeValueAsString(errors));
if (logErrors) {
getLogger().error(output);
@@ -326,26 +432,26 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
if (writerFactory != null) {
FlowFile errorFF = session.create(input);
- try (OutputStream os = session.write(errorFF);
- RecordSetWriter writer = writerFactory.createWriter(getLogger(), bundle.getSchema(), os, errorFF )) {
-
+ try {
int added = 0;
- writer.beginRecordSet();
- for (int index = 0; index < response.getItems().size(); index++) {
- Map<String, Object> current = response.getItems().get(index);
- if (!current.isEmpty()) {
- String key = current.keySet().iterator().next();
- @SuppressWarnings("unchecked")
- Map<String, Object> inner = (Map<String, Object>) current.get(key);
- if (inner.containsKey("error")) {
- writer.write(bundle.getOriginalRecords().get(index));
- added++;
+ try (OutputStream os = session.write(errorFF);
+ RecordSetWriter writer = writerFactory.createWriter(getLogger(), bundle.getSchema(), os, errorFF )) {
+
+ writer.beginRecordSet();
+ for (int index = 0; index < response.getItems().size(); index++) {
+ Map<String, Object> current = response.getItems().get(index);
+ if (!current.isEmpty()) {
+ String key = current.keySet().stream().findFirst().orElse(null);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> inner = (Map<String, Object>) current.get(key);
+ if (inner != null && inner.containsKey("error")) {
+ writer.write(bundle.getOriginalRecords().get(index));
+ added++;
+ }
}
}
+ writer.finishRecordSet();
}
- writer.finishRecordSet();
- writer.close();
- os.close();
errorFF = session.putAttribute(errorFF, ATTR_RECORD_COUNT, String.valueOf(added));
@@ -362,7 +468,8 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
return null;
}
- private String getFromRecordPath(final Record record, final RecordPath path, final String fallback) {
+ private String getFromRecordPath(final Record record, final RecordPath path, final String fallback,
+ final boolean retain) {
if (path == null) {
return fallback;
}
@@ -377,11 +484,116 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
);
}
- fieldValue.updateValue(null);
+ if (!retain) {
+ fieldValue.updateValue(null);
+ }
return fieldValue.getValue().toString();
} else {
return fallback;
}
}
+
+ /**
+ * Resolves the value to send as the document's @timestamp field.
+ * <p>
+ * DATE, TIME and TIMESTAMP fields are formatted using the configured @timestamp formats; String-like values
+ * (including the fallback and formatted dates) are converted to a Long when they are long-compatible.
+ *
+ * @param record the Record being indexed; the selected field is set to null unless {@code retain} is true
+ * @param path compiled @timestamp RecordPath, or {@code null} if none is configured
+ * @param fallback the @timestamp property value, used when {@code path} is null or selects no non-null value
+ * @param retain whether to keep the selected field's value in the Record
+ * @return the @timestamp value, or {@code null} if none could be resolved
+ * @throws ProcessException if the selected field's type cannot be used as @timestamp
+ */
+ private Object getTimestampFromRecordPath(final Record record, final RecordPath path, final String fallback,
+ final boolean retain) {
+ if (path == null) {
+ return coerceStringToLong("@timestamp", fallback);
+ }
+
+ final RecordPathResult result = path.evaluate(record);
+ final Optional<FieldValue> value = result.getSelectedFields().findFirst();
+ if (value.isPresent() && value.get().getValue() != null) {
+ final FieldValue fieldValue = value.get();
+
+ final DataType dataType = fieldValue.getField().getDataType();
+ final String fieldName = fieldValue.getField().getFieldName();
+ // resolve a CHOICE against the field's actual value, not the Optional wrapping the FieldValue
+ final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE
+ ? DataTypeUtils.chooseDataType(fieldValue.getValue(), (ChoiceDataType) dataType)
+ : dataType;
+ if (chosenDataType == null) {
+ throw new ProcessException(
+ String.format("Cannot determine the data type of %s field referenced by %s for @timestamp.", fieldName, path.getPath())
+ );
+ }
+ final Object coercedValue = DataTypeUtils.convertType(fieldValue.getValue(), chosenDataType, fieldName);
+ if (coercedValue == null) {
+ return null;
+ }
+
+ final Object returnValue;
+ switch (chosenDataType.getFieldType()) {
+ case DATE:
+ case TIME:
+ case TIMESTAMP:
+ final String format = determineDateFormat(chosenDataType.getFieldType());
+ returnValue = coerceStringToLong(
+ fieldName,
+ DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(format))
+ );
+ break;
+ case LONG:
+ returnValue = DataTypeUtils.toLong(coercedValue, fieldName);
+ break;
+ case INT:
+ case BYTE:
+ case SHORT:
+ returnValue = DataTypeUtils.toInteger(coercedValue, fieldName);
+ break;
+ case CHAR:
+ case STRING:
+ returnValue = coerceStringToLong(fieldName, coercedValue.toString());
+ break;
+ case BIGINT:
+ returnValue = coercedValue;
+ break;
+ default:
+ throw new ProcessException(
+ String.format("Cannot use %s field referenced by %s as @timestamp.", chosenDataType, path.getPath())
+ );
+ }
+
+ if (!retain) {
+ fieldValue.updateValue(null);
+ }
+
+ return returnValue;
+ } else {
+ return coerceStringToLong("@timestamp", fallback);
+ }
+ }
+
+ private String determineDateFormat(final RecordFieldType recordFieldType) {
+ final String format;
+ switch (recordFieldType) {
+ case DATE:
+ format = this.dateFormat;
+ break;
+ case TIME:
+ format = this.timeFormat;
+ break;
+ default:
+ format = this.timestampFormat;
+ }
+ return format;
+ }
+
+ private Object coerceStringToLong(final String fieldName, final String stringValue) {
+ return DataTypeUtils.isLongTypeCompatible(stringValue)
+ ? DataTypeUtils.toLong(stringValue, fieldName)
+ : stringValue;
+ }
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
index ccd2d70..12df823 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
@@ -28,6 +28,7 @@ import org.apache.nifi.schema.access.SchemaAccessUtils
import org.apache.nifi.serialization.RecordReaderFactory
import org.apache.nifi.serialization.record.MockRecordParser
import org.apache.nifi.serialization.record.MockSchemaRegistry
+import org.apache.nifi.serialization.record.RecordFieldType
import org.apache.nifi.util.StringUtils
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
@@ -35,10 +36,29 @@ import org.junit.Assert
import org.junit.Before
import org.junit.Test
+import java.sql.Date
+import java.sql.Time
+import java.sql.Timestamp
+import java.time.LocalDate
+import java.time.LocalDateTime
+import java.time.LocalTime
+import java.time.format.DateTimeFormatter
+
import static groovy.json.JsonOutput.prettyPrint
import static groovy.json.JsonOutput.toJson
class PutElasticsearchRecordTest {
+ private static final int DATE_YEAR = 2020
+ private static final int DATE_MONTH = 11
+ private static final int DATE_DAY = 27
+ private static final int TIME_HOUR = 12
+ private static final int TIME_MINUTE = 55
+ private static final int TIME_SECOND = 23
+
+ private static final LocalDateTime LOCAL_DATE_TIME = LocalDateTime.of(DATE_YEAR, DATE_MONTH, DATE_DAY, TIME_HOUR, TIME_MINUTE, TIME_SECOND)
+ private static final LocalDate LOCAL_DATE = LocalDate.of(DATE_YEAR, DATE_MONTH, DATE_DAY)
+ private static final LocalTime LOCAL_TIME = LocalTime.of(TIME_HOUR, TIME_MINUTE, TIME_SECOND)
+
MockBulkLoadClientService clientService
MockSchemaRegistry registry
RecordReaderFactory reader
@@ -73,10 +93,12 @@ class PutElasticsearchRecordTest {
runner.addControllerService("reader", reader)
runner.addControllerService("clientService", clientService)
runner.setProperty(reader, SchemaAccessUtils.SCHEMA_REGISTRY, "registry")
+ runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_NAME_PROPERTY)
runner.setProperty(PutElasticsearchRecord.RECORD_READER, "reader")
- runner.setProperty(PutElasticsearchRecord.INDEX_OP, "index")
+ runner.setProperty(PutElasticsearchRecord.INDEX_OP, IndexOperationRequest.Operation.Index.getValue())
runner.setProperty(PutElasticsearchRecord.INDEX, "test_index")
runner.setProperty(PutElasticsearchRecord.TYPE, "test_type")
+ runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP, "test_timestamp")
runner.setProperty(PutElasticsearchRecord.CLIENT_SERVICE, "clientService")
runner.enableControllerService(registry)
runner.enableControllerService(reader)
@@ -86,6 +108,23 @@ class PutElasticsearchRecordTest {
}
void basicTest(int failure, int retry, int success) {
+ def evalClosure = { List<IndexOperationRequest> items ->
+ int timestampDefaultCount = items.findAll { it.fields.get("@timestamp") == "test_timestamp" }.size()
+ int indexCount = items.findAll { it.index == "test_index" }.size()
+ int typeCount = items.findAll { it.type == "test_type" }.size()
+ int opCount = items.findAll { it.operation == IndexOperationRequest.Operation.Index }.size()
+ Assert.assertEquals(2, timestampDefaultCount)
+ Assert.assertEquals(2, indexCount)
+ Assert.assertEquals(2, typeCount)
+ Assert.assertEquals(2, opCount)
+ }
+
+ basicTest(failure, retry, success, evalClosure)
+ }
+
+ void basicTest(int failure, int retry, int success, Closure evalClosure) {
+ clientService.evalClosure = evalClosure
+
runner.enqueue(flowFileContents, [ "schema.name": "simple" ])
runner.run()
@@ -100,6 +139,17 @@ class PutElasticsearchRecordTest {
}
@Test
+ void simpleTestCoercedDefaultTimestamp() {
+ def evalClosure = { List<IndexOperationRequest> items ->
+ int timestampDefault = items.findAll { it.fields.get("@timestamp") == 100L }.size()
+ Assert.assertEquals(2, timestampDefault)
+ }
+
+ runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP, "100")
+ basicTest(0, 0, 1, evalClosure)
+ }
+
+ @Test
void simpleTestWithMockReader() {
reader = new MockRecordParser()
runner.addControllerService("mockReader", reader)
@@ -127,14 +177,19 @@ class PutElasticsearchRecordTest {
name: "RecordPathTestType",
fields: [
[ name: "id", type: "string" ],
+ [ name: "op", type: "string" ],
[ name: "index", type: "string" ],
[ name: "type", type: "string" ],
- [ name: "msg", type: ["null", "string"] ]
+ [ name: "msg", type: ["null", "string"] ],
+ [ name: "ts", type: [ type: "long", logicalType: "timestamp-millis" ] ],
+ [ name: "date", type: [ type: "int", logicalType: "date" ] ],
+ [ name: "time", type: [ type: "int", logicalType: "time-millis" ] ],
+ [ name: "code", type: "long" ]
]
]))
def flowFileContents = prettyPrint(toJson([
- [ id: "rec-1", op: "index", index: "bulk_a", type: "message", msg: "Hello" ],
+ [ id: "rec-1", op: "index", index: "bulk_a", type: "message", msg: "Hello", ts: Timestamp.valueOf(LOCAL_DATE_TIME).toInstant().toEpochMilli() ],
[ id: "rec-2", op: "index", index: "bulk_b", type: "message", msg: "Hello" ],
[ id: "rec-3", op: "index", index: "bulk_a", type: "message", msg: "Hello" ],
[ id: "rec-4", op: "index", index: "bulk_b", type: "message", msg: "Hello" ],
@@ -143,13 +198,19 @@ class PutElasticsearchRecordTest {
]))
def evalClosure = { List<IndexOperationRequest> items ->
- def a = items.findAll { it.index == "bulk_a" }.size()
- def b = items.findAll { it.index == "bulk_b" }.size()
+ int a = items.findAll { it.index == "bulk_a" }.size()
+ int b = items.findAll { it.index == "bulk_b" }.size()
int index = items.findAll { it.operation == IndexOperationRequest.Operation.Index }.size()
int create = items.findAll { it.operation == IndexOperationRequest.Operation.Create }.size()
int msg = items.findAll { ("Hello" == it.fields.get("msg")) }.size()
int empties = items.findAll { ("" == it.fields.get("msg")) }.size()
int nulls = items.findAll { (null == it.fields.get("msg")) }.size()
+ int timestamp = items.findAll { it.fields.get("@timestamp") ==
+ LOCAL_DATE_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIMESTAMP.getDefaultFormat()))
+ }.size()
+ int timestampDefault = items.findAll { it.fields.get("@timestamp") == "test_timestamp" }.size()
+ int ts = items.findAll { it.fields.get("ts") != null }.size()
+ int id = items.findAll { it.fields.get("id") != null }.size()
items.each {
Assert.assertNotNull(it.id)
Assert.assertTrue(it.id.startsWith("rec-"))
@@ -162,6 +223,10 @@ class PutElasticsearchRecordTest {
Assert.assertEquals(4, msg)
Assert.assertEquals(1, empties)
Assert.assertEquals(1, nulls)
+ Assert.assertEquals(1, timestamp)
+ Assert.assertEquals(5, timestampDefault)
+ Assert.assertEquals(0, ts)
+ Assert.assertEquals(0, id)
}
clientService.evalClosure = evalClosure
@@ -173,6 +238,7 @@ class PutElasticsearchRecordTest {
runner.setProperty(PutElasticsearchRecord.ID_RECORD_PATH, "/id")
runner.setProperty(PutElasticsearchRecord.INDEX_RECORD_PATH, "/index")
runner.setProperty(PutElasticsearchRecord.TYPE_RECORD_PATH, "/type")
+ runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, "/ts")
runner.enqueue(flowFileContents, [
"schema.name": "recordPathTest"
])
@@ -185,12 +251,12 @@ class PutElasticsearchRecordTest {
runner.clearTransferState()
flowFileContents = prettyPrint(toJson([
- [ id: "rec-1", op: null, index: null, type: null, msg: "Hello" ],
- [ id: "rec-2", op: null, index: null, type: null, msg: "Hello" ],
- [ id: "rec-3", op: null, index: null, type: null, msg: "Hello" ],
- [ id: "rec-4", op: null, index: null, type: null, msg: "Hello" ],
- [ id: "rec-5", op: "update", index: null, type: null, msg: "Hello" ],
- [ id: "rec-6", op: null, index: "bulk_b", type: "message", msg: "Hello" ]
+ [ id: "rec-1", op: null, index: null, type: null, msg: "Hello", date: Date.valueOf(LOCAL_DATE).getTime() ],
+ [ id: "rec-2", op: null, index: null, type: null, msg: "Hello" ],
+ [ id: "rec-3", op: null, index: null, type: null, msg: "Hello" ],
+ [ id: "rec-4", op: null, index: null, type: null, msg: "Hello" ],
+ [ id: "rec-5", op: "update", index: null, type: null, msg: "Hello" ],
+ [ id: "rec-6", op: null, index: "bulk_b", type: "message", msg: "Hello" ]
]))
evalClosure = { List<IndexOperationRequest> items ->
@@ -200,17 +266,32 @@ class PutElasticsearchRecordTest {
def bulkIndexCount = items.findAll { it.index.startsWith("bulk_") }.size()
def indexOperationCount = items.findAll { it.operation == IndexOperationRequest.Operation.Index }.size()
def updateOperationCount = items.findAll { it.operation == IndexOperationRequest.Operation.Update }.size()
+ def timestampCount = items.findAll { it.fields.get("@timestamp") ==
+ LOCAL_DATE.format(DateTimeFormatter.ofPattern("dd/MM/yyyy"))
+ }.size()
+ int dateCount = items.findAll { it.fields.get("date") != null }.size()
+ def idCount = items.findAll { it.fields.get("id") != null }.size()
+ def defaultCoercedTimestampCount = items.findAll { it.fields.get("@timestamp") == 100L }.size()
Assert.assertEquals(5, testTypeCount)
Assert.assertEquals(1, messageTypeCount)
Assert.assertEquals(5, testIndexCount)
Assert.assertEquals(1, bulkIndexCount)
Assert.assertEquals(5, indexOperationCount)
Assert.assertEquals(1, updateOperationCount)
+ Assert.assertEquals(1, timestampCount)
+ Assert.assertEquals(5, defaultCoercedTimestampCount)
+ Assert.assertEquals(1, dateCount)
+ Assert.assertEquals(6, idCount)
}
clientService.evalClosure = evalClosure
runner.setProperty(PutElasticsearchRecord.INDEX_OP, "\${operation}")
+ runner.setProperty(PutElasticsearchRecord.RETAIN_ID_FIELD, "true")
+ runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP, "100")
+ runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, "/date")
+ runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_DATE_FORMAT, "dd/MM/yyyy")
+ runner.setProperty(PutElasticsearchRecord.RETAIN_AT_TIMESTAMP_FIELD, "true")
runner.enqueue(flowFileContents, [
"schema.name": "recordPathTest",
"operation": "index"
@@ -228,7 +309,7 @@ class PutElasticsearchRecordTest {
[ id: "rec-3", msg: "Hello" ],
[ id: "rec-4", msg: "Hello" ],
[ id: "rec-5", msg: "Hello" ],
- [ id: "rec-6", type: "message", msg: "Hello" ]
+ [ id: "rec-6", type: "message", msg: "Hello", time: Time.valueOf(LOCAL_TIME).getTime() ]
]))
evalClosure = { List<IndexOperationRequest> items ->
@@ -236,15 +317,21 @@ class PutElasticsearchRecordTest {
def messageTypeCount = items.findAll { it.type == "message" }.size()
def nullIdCount = items.findAll { it.id == null }.size()
def recIdCount = items.findAll { StringUtils.startsWith(it.id, "rec-") }.size()
+ def timestampCount = items.findAll { it.fields.get("@timestamp") ==
+ LOCAL_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIME.getDefaultFormat()))
+ }.size()
Assert.assertEquals("null type", 5, nullTypeCount)
Assert.assertEquals("message type", 1, messageTypeCount)
Assert.assertEquals("null id", 2, nullIdCount)
Assert.assertEquals("rec- id", 4, recIdCount)
+ Assert.assertEquals("@timestamp", 1, timestampCount)
}
clientService.evalClosure = evalClosure
runner.setProperty(PutElasticsearchRecord.INDEX_OP, "index")
+ runner.removeProperty(PutElasticsearchRecord.AT_TIMESTAMP)
+ runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, "/time")
runner.removeProperty(PutElasticsearchRecord.TYPE)
runner.enqueue(flowFileContents, [
"schema.name": "recordPathTest"
@@ -262,7 +349,7 @@ class PutElasticsearchRecordTest {
[ id: "rec-3", op: "update", index: "bulk_a", type: "message", msg: "Hello" ],
[ id: "rec-4", op: "upsert", index: "bulk_b", type: "message", msg: "Hello" ],
[ id: "rec-5", op: "create", index: "bulk_a", type: "message", msg: "Hello" ],
- [ id: "rec-6", op: "delete", index: "bulk_b", type: "message", msg: "Hello" ]
+ [ id: "rec-6", op: "delete", index: "bulk_b", type: "message", msg: "Hello", code: 101L ]
]))
clientService.evalClosure = { List<IndexOperationRequest> items ->
@@ -271,13 +358,18 @@ class PutElasticsearchRecordTest {
int update = items.findAll { it.operation == IndexOperationRequest.Operation.Update }.size()
int upsert = items.findAll { it.operation == IndexOperationRequest.Operation.Upsert }.size()
int delete = items.findAll { it.operation == IndexOperationRequest.Operation.Delete }.size()
+ def timestampCount = items.findAll { it.fields.get("@timestamp") == 101L }.size()
+ def noTimestampCount = items.findAll { it.fields.get("@timestamp") == null }.size()
Assert.assertEquals(1, index)
Assert.assertEquals(2, create)
Assert.assertEquals(1, update)
Assert.assertEquals(1, upsert)
Assert.assertEquals(1, delete)
+ Assert.assertEquals(1, timestampCount)
+ Assert.assertEquals(5, noTimestampCount)
}
+ runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, "/code")
runner.enqueue(flowFileContents, [
"schema.name": "recordPathTest"
])
@@ -285,6 +377,26 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+
+ runner.clearTransferState()
+
+ flowFileContents = prettyPrint(toJson([
+ [ id: "rec-1", op: "index", index: "bulk_a", type: "message", msg: "Hello" ]
+ ]))
+
+ clientService.evalClosure = { List<IndexOperationRequest> items ->
+ def timestampCount = items.findAll { it.fields.get("@timestamp") == "Hello" }.size()
+ Assert.assertEquals(1, timestampCount)
+ }
+
+ runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, "/msg")
+ runner.enqueue(flowFileContents, [
+ "schema.name": "recordPathTest"
+ ])
+ runner.run()
+ runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
}
@Test
@@ -354,4 +466,4 @@ class PutElasticsearchRecordTest {
def errorFF = runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0]
assert errorFF.getAttribute(PutElasticsearchRecord.ATTR_RECORD_COUNT) == "1"
}
-}
\ No newline at end of file
+}