You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2019/01/10 02:57:00 UTC

[GitHub] bdesert closed pull request #3227: NIFI-5909 PutElasticsearchHttpRecord doesn't allow to customize the timestamp format

bdesert closed pull request #3227: NIFI-5909 PutElasticsearchHttpRecord doesn't allow to customize the timestamp format
URL: https://github.com/apache/nifi/pull/3227
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
index d4536cd037..4db59f6078 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
@@ -122,6 +122,11 @@ language governing permissions and limitations under the License. -->
             <artifactId>jackson-databind</artifactId>
             <version>${jackson.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-record-utils</artifactId>
+            <version>1.9.0-SNAPSHOT</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index 52de42442a..d431960d3d 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -55,6 +55,7 @@
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleDateFormatValidator;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
@@ -178,6 +179,38 @@
             .required(true)
             .build();
 
+    static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder()
+            .name("Date Format")
+            .description("Specifies the format to use when reading/writing Date fields. "
+                    + "If not specified, the default format '" + RecordFieldType.DATE.getDefaultFormat() + "' is used. "
+                    + "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy for a two-digit month, followed by "
+                    + "a two-digit day, followed by a four-digit year, all separated by '/' characters, as in 01/01/2017).")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(new SimpleDateFormatValidator())
+            .required(false)
+            .build();
+    static final PropertyDescriptor TIME_FORMAT = new PropertyDescriptor.Builder()
+            .name("Time Format")
+            .description("Specifies the format to use when reading/writing Time fields. "
+                    + "If not specified, the default format '" + RecordFieldType.TIME.getDefaultFormat() + "' is used. "
+                    + "If specified, the value must match the Java Simple Date Format (for example, HH:mm:ss for a two-digit hour in 24-hour format, followed by "
+                    + "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 18:04:15).")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(new SimpleDateFormatValidator())
+            .required(false)
+            .build();
+    static final PropertyDescriptor TIMESTAMP_FORMAT = new PropertyDescriptor.Builder()
+            .name("Timestamp Format")
+            .description("Specifies the format to use when reading/writing Timestamp fields. "
+                    + "If not specified, the default format '" + RecordFieldType.TIMESTAMP.getDefaultFormat() + "' is used. "
+                    + "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy HH:mm:ss for a two-digit month, followed by "
+                    + "a two-digit day, followed by a four-digit year, all separated by '/' characters; and then followed by a two-digit hour in 24-hour format, followed by "
+                    + "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 01/01/2017 18:04:15).")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(new SimpleDateFormatValidator())
+            .required(false)
+            .build();
+
     private static final Set<Relationship> relationships;
     private static final List<PropertyDescriptor> propertyDescriptors;
 
@@ -186,6 +219,9 @@
     private final JsonFactory factory = new JsonFactory();
 
     private volatile String nullSuppression;
+    private volatile String dateFormat;
+    private volatile String timeFormat;
+    private volatile String timestampFormat;
 
     static {
         final Set<Relationship> _rels = new HashSet<>();
@@ -202,6 +238,9 @@
         descriptors.add(CHARSET);
         descriptors.add(INDEX_OP);
         descriptors.add(SUPPRESS_NULLS);
+        descriptors.add(DATE_FORMAT);
+        descriptors.add(TIME_FORMAT);
+        descriptors.add(TIMESTAMP_FORMAT);
 
         propertyDescriptors = Collections.unmodifiableList(descriptors);
     }
@@ -248,6 +287,18 @@
     public void setup(ProcessContext context) {
         super.setup(context);
         recordPathCache = new RecordPathCache(10);
+        this.dateFormat = context.getProperty(DATE_FORMAT).evaluateAttributeExpressions().getValue();
+        if (this.dateFormat == null) {
+            this.dateFormat = RecordFieldType.DATE.getDefaultFormat();
+        }
+        this.timeFormat = context.getProperty(TIME_FORMAT).evaluateAttributeExpressions().getValue();
+        if (this.timeFormat == null) {
+            this.timeFormat = RecordFieldType.TIME.getDefaultFormat();
+        }
+        this.timestampFormat = context.getProperty(TIMESTAMP_FORMAT).evaluateAttributeExpressions().getValue();
+        if (this.timestampFormat == null) {
+            this.timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
+        }
     }
 
     @Override
@@ -486,7 +537,7 @@ private void writeValue(final JsonGenerator generator, final Object value, final
 
         switch (chosenDataType.getFieldType()) {
             case DATE: {
-                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()));
+                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(this.dateFormat));
                 if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
                     generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
                 } else {
@@ -495,7 +546,7 @@ private void writeValue(final JsonGenerator generator, final Object value, final
                 break;
             }
             case TIME: {
-                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIME.getDefaultFormat()));
+                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(this.timeFormat));
                 if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
                     generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
                 } else {
@@ -504,7 +555,7 @@ private void writeValue(final JsonGenerator generator, final Object value, final
                 break;
             }
             case TIMESTAMP: {
-                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()));
+                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(this.timestampFormat));
                 if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
                     generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
                 } else {
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
index 2cc16c1aba..992e6159bc 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
@@ -41,6 +41,9 @@
 
 import java.io.IOException;
 import java.net.ConnectException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -68,18 +71,30 @@ public void testPutElasticSearchOnTriggerIndex() throws IOException {
             assertEquals(1, record.get("id"));
             assertEquals("reç1", record.get("name"));
             assertEquals(101, record.get("code"));
+            assertEquals("20/12/2018", record.get("date"));
+            assertEquals("6:55 PM", record.get("time"));
+            assertEquals("20/12/2018 6:55 PM", record.get("ts"));
         }, record -> {
             assertEquals(2, record.get("id"));
             assertEquals("ræc2", record.get("name"));
             assertEquals(102, record.get("code"));
+            assertEquals("20/12/2018", record.get("date"));
+            assertEquals("6:55 PM", record.get("time"));
+            assertEquals("20/12/2018 6:55 PM", record.get("ts"));
         }, record -> {
             assertEquals(3, record.get("id"));
             assertEquals("rèc3", record.get("name"));
             assertEquals(103, record.get("code"));
+            assertEquals("20/12/2018", record.get("date"));
+            assertEquals("6:55 PM", record.get("time"));
+            assertEquals("20/12/2018 6:55 PM", record.get("ts"));
         }, record -> {
             assertEquals(4, record.get("id"));
             assertEquals("rëc4", record.get("name"));
             assertEquals(104, record.get("code"));
+            assertEquals("20/12/2018", record.get("date"));
+            assertEquals("6:55 PM", record.get("time"));
+            assertEquals("20/12/2018 6:55 PM", record.get("ts"));
         });
         runner = TestRunners.newTestRunner(processor); // no failures
         generateTestData();
@@ -88,6 +103,9 @@ public void testPutElasticSearchOnTriggerIndex() throws IOException {
         runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
         runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
         runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+        runner.setProperty(PutElasticsearchHttpRecord.DATE_FORMAT, "d/M/yyyy");
+        runner.setProperty(PutElasticsearchHttpRecord.TIME_FORMAT, "h:m a");
+        runner.setProperty(PutElasticsearchHttpRecord.TIMESTAMP_FORMAT, "d/M/yyyy h:m a");
 
         runner.enqueue(new byte[0], new HashMap<String, String>() {{
             put("doc_id", "28039652140");
@@ -564,10 +582,13 @@ private void generateTestData() throws IOException {
         parser.addSchemaField("id", RecordFieldType.INT);
         parser.addSchemaField("name", RecordFieldType.STRING);
         parser.addSchemaField("code", RecordFieldType.INT);
-
-        parser.addRecord(1, "reç1", 101);
-        parser.addRecord(2, "ræc2", 102);
-        parser.addRecord(3, "rèc3", 103);
-        parser.addRecord(4, "rëc4", 104);
+        parser.addSchemaField("date", RecordFieldType.DATE);
+        parser.addSchemaField("time", RecordFieldType.TIME);
+        parser.addSchemaField("ts", RecordFieldType.TIMESTAMP);
+
+        parser.addRecord(1, "reç1", 101, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L));
+        parser.addRecord(2, "ræc2", 102, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L));
+        parser.addRecord(3, "rèc3", 103, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L));
+        parser.addRecord(4, "rëc4", 104, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L));
     }
 }
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
index 81916c65fd..affbe11e28 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
@@ -17,11 +17,9 @@
 package org.apache.nifi.processors.hive;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hive.common.util.ShutdownHookManager;
 import org.apache.hive.streaming.ConnectionError;
 import org.apache.hive.streaming.HiveStreamingConnection;
 import org.apache.hive.streaming.InvalidTable;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services