Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/02/13 15:27:41 UTC

[GitHub] [nifi] granthenke opened a new pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor

granthenke opened a new pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor
URL: https://github.com/apache/nifi/pull/4053
 
 
   Adds a boolean property to the PutKudu processor to optionally
   enable automatic schema drift handling.
   
   If set to true, when fields with names that are not in the target
   Kudu table are encountered, the Kudu table will be altered to
   include new columns for those fields.
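
For illustration only, here is a minimal sketch of the "which columns are missing" step described above. It is not the PR's code: it uses plain Java collections in place of the NiFi record schema and the Kudu table schema, and the class and method names (`SchemaDriftSketch`, `missingColumns`) are hypothetical. It also lowercases with `Locale.ROOT`, which is a choice made for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Hypothetical stand-in for the drift check: record field names are compared
// against the column names the target table already has.
final class SchemaDriftSketch {

    // Returns the column names that would need to be added, in first-seen
    // order and without duplicates.
    static List<String> missingColumns(List<String> recordFields,
                                       Set<String> tableColumns,
                                       boolean lowercaseFields) {
        Set<String> missing = new LinkedHashSet<>();
        for (String field : recordFields) {
            // Apply the same name normalization that the write path would use.
            String columnName = lowercaseFields ? field.toLowerCase(Locale.ROOT) : field;
            if (!tableColumns.contains(columnName)) {
                missing.add(columnName);
            }
        }
        return new ArrayList<>(missing);
    }

    public static void main(String[] args) {
        Set<String> table = new LinkedHashSet<>(Arrays.asList("id", "name"));
        List<String> fields = Arrays.asList("ID", "Name", "Email");
        System.out.println(missingColumns(fields, table, true));   // prints [email]
        System.out.println(missingColumns(fields, table, false));  // prints [ID, Name, Email]
    }
}
```

The second call shows why the lowercase setting has to be applied consistently: without it, every field looks like a new column.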

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi] granthenke commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor

Posted by GitBox <gi...@apache.org>.
granthenke commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor
URL: https://github.com/apache/nifi/pull/4053#discussion_r379713014
 
 

 ##########
 File path: nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 ##########
 @@ -251,16 +267,50 @@ private void trigger(final ProcessContext context, final ProcessSession session,
         OperationType prevOperationType = OperationType.INSERT;
         final List<RowError> pendingRowErrors = new ArrayList<>();
         for (FlowFile flowFile : flowFiles) {
-            final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
-            final OperationType operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).evaluateAttributeExpressions(flowFile).getValue());
-            final Boolean ignoreNull = Boolean.valueOf(context.getProperty(IGNORE_NULL).evaluateAttributeExpressions(flowFile).getValue());
-            final Boolean lowercaseFields = Boolean.valueOf(context.getProperty(LOWERCASE_FIELD_NAMES).evaluateAttributeExpressions(flowFile).getValue());
-
             try (final InputStream in = session.read(flowFile);
                 final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
+
+                final String tableName = getEvaluatedProperty(TABLE_NAME, context, flowFile);
+                final OperationType operationType = OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, flowFile));
+                final Boolean ignoreNull = Boolean.valueOf(getEvaluatedProperty(IGNORE_NULL, context, flowFile));
+                final Boolean lowercaseFields = Boolean.valueOf(getEvaluatedProperty(LOWERCASE_FIELD_NAMES, context, flowFile));
+                final Boolean handleSchemaDrift = Boolean.valueOf(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, flowFile));
+
                 final RecordSet recordSet = recordReader.createRecordSet();
                 final List<String> fieldNames = recordReader.getSchema().getFieldNames();
-                final KuduTable kuduTable = kuduClient.openTable(tableName);
+                KuduTable kuduTable = kuduClient.openTable(tableName);
+
+                // If handleSchemaDrift is true, check for any missing columns and alter the Kudu table to add them.
+                if (handleSchemaDrift) {
+                    final Schema schema = kuduTable.getSchema();
+                    Stream<RecordField> fields = recordReader.getSchema().getFields().stream();
+                    List<RecordField> missing = fields.filter(field -> !schema.hasColumn(
+                            lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName()))
+                            .collect(Collectors.toList());
+                    if (!missing.isEmpty()) {
+                        getLogger().info("adding {} columns to table '{}' to handle schema drift",
+                                new Object[]{missing.size(), tableName});
+                        AlterTableOptions alter = new AlterTableOptions();
+                        for (RecordField field : missing) {
+                            String columnName = lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName();
+                            alter.addNullableColumn(columnName, toKuduType(field.getDataType()));
+                        }
+                        try {
+                            kuduClient.alterTable(tableName, alter);
+                        } catch (KuduException e) {
 
 Review comment:
   That's a good edge case. I think FF2 would fail in that case. The only way around that without server-side changes to Kudu is to submit the alter requests one column at a time. It would be a bit slower when there are multiple new columns, but I can adjust this to do that.
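
As a rough sketch of the one-column-at-a-time idea (not the code that was merged), the example below uses a toy in-memory table in place of a Kudu table. `ToyTable`, `AlreadyPresentException`, and `addColumnsIndividually` are hypothetical names; the toy exception only stands in for an "already present" status. The scenario is a table with columns {a, b} where one writer adds c, and a second writer with a stale view of the schema then tries to add c and d.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class OneColumnAtATimeSketch {

    // Hypothetical stand-in for an "already present" error from the storage layer.
    static final class AlreadyPresentException extends Exception {
        AlreadyPresentException(String column) {
            super("column already exists: " + column);
        }
    }

    // Toy in-memory table: adding an existing column fails, as it would when
    // a concurrent writer has already added it.
    static final class ToyTable {
        private final Set<String> columns = new LinkedHashSet<>();

        ToyTable(List<String> initialColumns) {
            columns.addAll(initialColumns);
        }

        synchronized void addColumn(String name) throws AlreadyPresentException {
            if (!columns.add(name)) {
                throw new AlreadyPresentException(name);
            }
        }

        synchronized Set<String> columns() {
            return new LinkedHashSet<>(columns);
        }
    }

    // Issues one alter per missing column and returns the columns this caller
    // actually added. A column that already exists is skipped, not treated as fatal.
    static List<String> addColumnsIndividually(ToyTable table, List<String> missing) {
        List<String> added = new ArrayList<>();
        for (String column : missing) {
            try {
                table.addColumn(column);
                added.add(column);
            } catch (AlreadyPresentException e) {
                // Another thread or application added it first; nothing to do.
            }
        }
        return added;
    }

    public static void main(String[] args) {
        ToyTable table = new ToyTable(Arrays.asList("a", "b"));
        addColumnsIndividually(table, Arrays.asList("c"));              // first writer
        List<String> addedBySecond =
                addColumnsIndividually(table, Arrays.asList("c", "d")); // stale second writer
        System.out.println(addedBySecond + " " + table.columns());      // prints [d] [a, b, c, d]
    }
}
```

Because each column is its own request, the failure on c does not prevent d from being added, at the cost of one round trip per new column.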


[GitHub] [nifi] pvillard31 commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor
URL: https://github.com/apache/nifi/pull/4053#discussion_r379702363
 
 

 ##########
 File path: nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 ##########
 @@ -251,16 +267,50 @@ private void trigger(final ProcessContext context, final ProcessSession session,
         OperationType prevOperationType = OperationType.INSERT;
         final List<RowError> pendingRowErrors = new ArrayList<>();
         for (FlowFile flowFile : flowFiles) {
-            final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
-            final OperationType operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).evaluateAttributeExpressions(flowFile).getValue());
-            final Boolean ignoreNull = Boolean.valueOf(context.getProperty(IGNORE_NULL).evaluateAttributeExpressions(flowFile).getValue());
-            final Boolean lowercaseFields = Boolean.valueOf(context.getProperty(LOWERCASE_FIELD_NAMES).evaluateAttributeExpressions(flowFile).getValue());
-
             try (final InputStream in = session.read(flowFile);
                 final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
+
+                final String tableName = getEvaluatedProperty(TABLE_NAME, context, flowFile);
+                final OperationType operationType = OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, flowFile));
+                final Boolean ignoreNull = Boolean.valueOf(getEvaluatedProperty(IGNORE_NULL, context, flowFile));
+                final Boolean lowercaseFields = Boolean.valueOf(getEvaluatedProperty(LOWERCASE_FIELD_NAMES, context, flowFile));
+                final Boolean handleSchemaDrift = Boolean.valueOf(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, flowFile));
+
                 final RecordSet recordSet = recordReader.createRecordSet();
                 final List<String> fieldNames = recordReader.getSchema().getFieldNames();
-                final KuduTable kuduTable = kuduClient.openTable(tableName);
+                KuduTable kuduTable = kuduClient.openTable(tableName);
+
+                // If handleSchemaDrift is true, check for any missing columns and alter the Kudu table to add them.
+                if (handleSchemaDrift) {
+                    final Schema schema = kuduTable.getSchema();
+                    Stream<RecordField> fields = recordReader.getSchema().getFields().stream();
+                    List<RecordField> missing = fields.filter(field -> !schema.hasColumn(
+                            lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName()))
+                            .collect(Collectors.toList());
+                    if (!missing.isEmpty()) {
+                        getLogger().info("adding {} columns to table '{}' to handle schema drift",
+                                new Object[]{missing.size(), tableName});
+                        AlterTableOptions alter = new AlterTableOptions();
+                        for (RecordField field : missing) {
+                            String columnName = lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName();
+                            alter.addNullableColumn(columnName, toKuduType(field.getDataType()));
+                        }
+                        try {
+                            kuduClient.alterTable(tableName, alter);
+                        } catch (KuduException e) {
 
 Review comment:
   What happens if the table has columns {a, b},
   then flow file FF1 arrives with {a, b, c},
   another flow file FF2 arrives with {a, b, c, d},
   and processing is multi-threaded?
   FF1 is processed and adds column c to the table, but FF2 started being processed when the schema was only {a, b}, so its ``alter`` object contains the addition of both c and d. When the exception is thrown because c already exists, was column d added or not?


[GitHub] [nifi] pvillard31 commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor
URL: https://github.com/apache/nifi/pull/4053#discussion_r379231085
 
 

 ##########
 File path: nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java
 ##########
 @@ -139,6 +139,12 @@ public void testWriteKudu() throws IOException, InitializationException {
         testRunner.setProperty(PutKudu.BATCH_SIZE, "10");
         testRunner.setProperty(PutKudu.FLOWFILE_BATCH_SIZE, "2");
 
+        // Enable lowercase handling.
+        testRunner.setProperty(PutKudu.LOWERCASE_FIELD_NAMES, "true");
+
+        // Enable schema drift handling.
+        testRunner.setProperty(PutKudu.HANDLE_SCHEMA_DRIFT, "true");
 
 Review comment:
   It could be worth running the test with flow file attributes, just to check Expression Language (EL) evaluation.


[GitHub] [nifi] granthenke commented on issue #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor

Posted by GitBox <gi...@apache.org>.
granthenke commented on issue #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor
URL: https://github.com/apache/nifi/pull/4053#issuecomment-587554099
 
 
   I rebased onto the latest master changes.


[GitHub] [nifi] pvillard31 commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor
URL: https://github.com/apache/nifi/pull/4053#discussion_r379230777
 
 

 ##########
 File path: nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 ##########
 @@ -348,6 +397,14 @@ private void trigger(final ProcessContext context, final ProcessSession session,
         session.adjustCounter("Records Inserted", totalCount, false);
     }
 
+    private String getEvaluatedProperty(PropertyDescriptor property, ProcessContext context, FlowFile flowFile) {
 
 Review comment:
   I believe that with this approach, a flow file that lacks the expected attribute will be sent back to the input relationship, and the processor will keep picking up the same flow file and failing. It should probably be handled separately and routed to the failure relationship, because this "bad" flow file will always fail.


[GitHub] [nifi] pvillard31 commented on issue #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on issue #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor
URL: https://github.com/apache/nifi/pull/4053#issuecomment-587802184
 
 
   Merged to master, thanks @granthenke 


[GitHub] [nifi] granthenke commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor

Posted by GitBox <gi...@apache.org>.
granthenke commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor
URL: https://github.com/apache/nifi/pull/4053#discussion_r379440456
 
 

 ##########
 File path: nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 ##########
 @@ -348,6 +397,14 @@ private void trigger(final ProcessContext context, final ProcessSession session,
         session.adjustCounter("Records Inserted", totalCount, false);
     }
 
+    private String getEvaluatedProperty(PropertyDescriptor property, ProcessContext context, FlowFile flowFile) {
 
 Review comment:
   This method is called in a try/catch that marks the flowfile as failed and then transfers it to the failure relationship. 
   
   ```
   ...
   } catch (Exception ex) {
      flowFileFailures.put(flowFile, ex);
   }             
   ...
   if (flowFileFailures.containsKey(flowFile)) {
      getLogger().error("Failed to write due to {}", new Object[]{flowFileFailures.get(flowFile)});
      session.transfer(flowFile, REL_FAILURE);
   ...
   ```
   
   Does that result in the correct behavior?
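
To make the pattern in the snippet above concrete, here is a self-contained toy model of it; it is a sketch, not the processor's code. Flow files are plain strings, the property lookup is a function argument, and `Routed`, `process`, and the two result lists are hypothetical. It assumes property evaluation throws when the flow file lacks the expected attribute.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class FailureRoutingSketch {

    // Hypothetical result holder: which flow files went to which relationship.
    static final class Routed {
        final List<String> success = new ArrayList<>();
        final List<String> failure = new ArrayList<>();
    }

    // Evaluates a property per flow file inside try/catch, records failures in
    // a map, then routes each flow file based on whether it is in that map.
    static Routed process(List<String> flowFiles, Function<String, String> evaluateProperty) {
        Map<String, Exception> flowFileFailures = new HashMap<>();
        for (String flowFile : flowFiles) {
            try {
                String value = evaluateProperty.apply(flowFile);
                if (value == null || value.isEmpty()) {
                    // Assumption: a missing attribute surfaces as an exception.
                    throw new IllegalStateException("property evaluated to nothing for " + flowFile);
                }
            } catch (Exception ex) {
                flowFileFailures.put(flowFile, ex);
            }
        }

        Routed routed = new Routed();
        for (String flowFile : flowFiles) {
            if (flowFileFailures.containsKey(flowFile)) {
                routed.failure.add(flowFile);   // never returned to the input queue
            } else {
                routed.success.add(flowFile);
            }
        }
        return routed;
    }

    public static void main(String[] args) {
        Map<String, String> tableAttribute = new HashMap<>();
        tableAttribute.put("ff-1", "events");   // ff-2 has no table attribute
        Routed routed = process(Arrays.asList("ff-1", "ff-2"), tableAttribute::get);
        System.out.println("success=" + routed.success + " failure=" + routed.failure);
    }
}
```

In this model the bad flow file is routed to failure once and is not retried, which is the behavior the review comment asks for. Whether the real processor behaves the same way depends on the evaluation happening inside the try block.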


[GitHub] [nifi] granthenke commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor

Posted by GitBox <gi...@apache.org>.
granthenke commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor
URL: https://github.com/apache/nifi/pull/4053#discussion_r379712766
 
 

 ##########
 File path: nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 ##########
 @@ -251,16 +267,50 @@ private void trigger(final ProcessContext context, final ProcessSession session,
         OperationType prevOperationType = OperationType.INSERT;
         final List<RowError> pendingRowErrors = new ArrayList<>();
         for (FlowFile flowFile : flowFiles) {
-            final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
-            final OperationType operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).evaluateAttributeExpressions(flowFile).getValue());
-            final Boolean ignoreNull = Boolean.valueOf(context.getProperty(IGNORE_NULL).evaluateAttributeExpressions(flowFile).getValue());
-            final Boolean lowercaseFields = Boolean.valueOf(context.getProperty(LOWERCASE_FIELD_NAMES).evaluateAttributeExpressions(flowFile).getValue());
-
             try (final InputStream in = session.read(flowFile);
                 final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
+
+                final String tableName = getEvaluatedProperty(TABLE_NAME, context, flowFile);
+                final OperationType operationType = OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, flowFile));
+                final Boolean ignoreNull = Boolean.valueOf(getEvaluatedProperty(IGNORE_NULL, context, flowFile));
+                final Boolean lowercaseFields = Boolean.valueOf(getEvaluatedProperty(LOWERCASE_FIELD_NAMES, context, flowFile));
+                final Boolean handleSchemaDrift = Boolean.valueOf(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, flowFile));
+
                 final RecordSet recordSet = recordReader.createRecordSet();
                 final List<String> fieldNames = recordReader.getSchema().getFieldNames();
-                final KuduTable kuduTable = kuduClient.openTable(tableName);
+                KuduTable kuduTable = kuduClient.openTable(tableName);
+
+                // If handleSchemaDrift is true, check for any missing columns and alter the Kudu table to add them.
+                if (handleSchemaDrift) {
+                    final Schema schema = kuduTable.getSchema();
+                    Stream<RecordField> fields = recordReader.getSchema().getFields().stream();
+                    List<RecordField> missing = fields.filter(field -> !schema.hasColumn(
+                            lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName()))
+                            .collect(Collectors.toList());
+                    if (!missing.isEmpty()) {
+                        getLogger().info("adding {} columns to table '{}' to handle schema drift",
+                                new Object[]{missing.size(), tableName});
+                        AlterTableOptions alter = new AlterTableOptions();
+                        for (RecordField field : missing) {
+                            String columnName = lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName();
+                            alter.addNullableColumn(columnName, toKuduType(field.getDataType()));
+                        }
+                        try {
+                            kuduClient.alterTable(tableName, alter);
+                        } catch (KuduException e) {
+                            // Ignore the exception if the column already exists due to concurrent
+                            // threads or applications attempting to handle schema drift.
+                            if (e.getStatus().isAlreadyPresent()) {
+                                getLogger().info("column already exists in table ' {}' while handling schema drift",
 
 Review comment:
   I had considered that, but decided against it in case there were any security concerns with that type of content being logged. This shouldn't be an issue that someone needs to track down, and if it is, they should be able to debug it by cross-referencing the fields and the table schema themselves.


[GitHub] [nifi] pvillard31 commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor
URL: https://github.com/apache/nifi/pull/4053#discussion_r379099330
 
 

 ##########
 File path: nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 ##########
 @@ -108,6 +113,17 @@
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
+    protected static final PropertyDescriptor HANDLE_SCHEMA_DRIFT = new Builder()
+            .name("Handle Schema Drift")
+            .description("If set to true, when fields with names that are not in the target Kudu table " +
+                    "are encountered, the Kudu table will be altered to include new columns for those fields.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
 
 Review comment:
   Unfortunately, it's not possible to support EL evaluation together with allowable values, unless you specify a specific attribute name in the allowable values.
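
A toy model of the constraint described in this comment, offered only as a sketch: it is not NiFi's validation code. It assumes the configured value must literally be one of the allowable values, so an expression string is rejected unless that exact string is itself listed. `AllowableValuesSketch`, `isValid`, and the attribute name `kudu.handle.schema.drift` are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

final class AllowableValuesSketch {

    // Assumed rule for this sketch: the raw configured string must match an
    // allowable value exactly; no expression is evaluated before the check.
    static boolean isValid(String configuredValue, Set<String> allowableValues) {
        return allowableValues.contains(configuredValue);
    }

    public static void main(String[] args) {
        Set<String> allowable = new LinkedHashSet<>(Arrays.asList("true", "false"));
        String expression = "${kudu.handle.schema.drift}";

        System.out.println(isValid("true", allowable));      // prints true
        System.out.println(isValid(expression, allowable));  // prints false

        // Listing the expression itself as an allowable value makes it pass.
        allowable.add(expression);
        System.out.println(isValid(expression, allowable));  // prints true
    }
}
```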


[GitHub] [nifi] pvillard31 commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor
URL: https://github.com/apache/nifi/pull/4053#discussion_r379702397
 
 

 ##########
 File path: nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 ##########
 @@ -251,16 +267,50 @@ private void trigger(final ProcessContext context, final ProcessSession session,
         OperationType prevOperationType = OperationType.INSERT;
         final List<RowError> pendingRowErrors = new ArrayList<>();
         for (FlowFile flowFile : flowFiles) {
-            final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
-            final OperationType operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).evaluateAttributeExpressions(flowFile).getValue());
-            final Boolean ignoreNull = Boolean.valueOf(context.getProperty(IGNORE_NULL).evaluateAttributeExpressions(flowFile).getValue());
-            final Boolean lowercaseFields = Boolean.valueOf(context.getProperty(LOWERCASE_FIELD_NAMES).evaluateAttributeExpressions(flowFile).getValue());
-
             try (final InputStream in = session.read(flowFile);
                 final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
+
+                final String tableName = getEvaluatedProperty(TABLE_NAME, context, flowFile);
+                final OperationType operationType = OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, flowFile));
+                final Boolean ignoreNull = Boolean.valueOf(getEvaluatedProperty(IGNORE_NULL, context, flowFile));
+                final Boolean lowercaseFields = Boolean.valueOf(getEvaluatedProperty(LOWERCASE_FIELD_NAMES, context, flowFile));
+                final Boolean handleSchemaDrift = Boolean.valueOf(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, flowFile));
+
                 final RecordSet recordSet = recordReader.createRecordSet();
                 final List<String> fieldNames = recordReader.getSchema().getFieldNames();
-                final KuduTable kuduTable = kuduClient.openTable(tableName);
+                KuduTable kuduTable = kuduClient.openTable(tableName);
+
+                // If handleSchemaDrift is true, check for any missing columns and alter the Kudu table to add them.
+                if (handleSchemaDrift) {
+                    final Schema schema = kuduTable.getSchema();
+                    Stream<RecordField> fields = recordReader.getSchema().getFields().stream();
+                    List<RecordField> missing = fields.filter(field -> !schema.hasColumn(
+                            lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName()))
+                            .collect(Collectors.toList());
+                    if (!missing.isEmpty()) {
+                        getLogger().info("adding {} columns to table '{}' to handle schema drift",
+                                new Object[]{missing.size(), tableName});
+                        AlterTableOptions alter = new AlterTableOptions();
+                        for (RecordField field : missing) {
+                            String columnName = lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName();
+                            alter.addNullableColumn(columnName, toKuduType(field.getDataType()));
+                        }
+                        try {
+                            kuduClient.alterTable(tableName, alter);
+                        } catch (KuduException e) {
+                            // Ignore the exception if the column already exists due to concurrent
+                            // threads or applications attempting to handle schema drift.
+                            if (e.getStatus().isAlreadyPresent()) {
+                                getLogger().info("column already exists in table ' {}' while handling schema drift",
 
 Review comment:
   ```suggestion
                                   getLogger().info("column already exists in table '{}' while handling schema drift",
   ```


[GitHub] [nifi] pvillard31 commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor
URL: https://github.com/apache/nifi/pull/4053#discussion_r379715615
 
 

 ##########
 File path: nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 ##########
 @@ -251,16 +267,50 @@ private void trigger(final ProcessContext context, final ProcessSession session,
         OperationType prevOperationType = OperationType.INSERT;
         final List<RowError> pendingRowErrors = new ArrayList<>();
         for (FlowFile flowFile : flowFiles) {
-            final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
-            final OperationType operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).evaluateAttributeExpressions(flowFile).getValue());
-            final Boolean ignoreNull = Boolean.valueOf(context.getProperty(IGNORE_NULL).evaluateAttributeExpressions(flowFile).getValue());
-            final Boolean lowercaseFields = Boolean.valueOf(context.getProperty(LOWERCASE_FIELD_NAMES).evaluateAttributeExpressions(flowFile).getValue());
-
             try (final InputStream in = session.read(flowFile);
                 final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
+
+                final String tableName = getEvaluatedProperty(TABLE_NAME, context, flowFile);
+                final OperationType operationType = OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, flowFile));
+                final Boolean ignoreNull = Boolean.valueOf(getEvaluatedProperty(IGNORE_NULL, context, flowFile));
+                final Boolean lowercaseFields = Boolean.valueOf(getEvaluatedProperty(LOWERCASE_FIELD_NAMES, context, flowFile));
+                final Boolean handleSchemaDrift = Boolean.valueOf(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, flowFile));
+
                 final RecordSet recordSet = recordReader.createRecordSet();
                 final List<String> fieldNames = recordReader.getSchema().getFieldNames();
-                final KuduTable kuduTable = kuduClient.openTable(tableName);
+                KuduTable kuduTable = kuduClient.openTable(tableName);
+
+                // If handleSchemaDrift is true, check for any missing columns and alter the Kudu table to add them.
+                if (handleSchemaDrift) {
+                    final Schema schema = kuduTable.getSchema();
+                    Stream<RecordField> fields = recordReader.getSchema().getFields().stream();
+                    List<RecordField> missing = fields.filter(field -> !schema.hasColumn(
+                            lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName()))
+                            .collect(Collectors.toList());
+                    if (!missing.isEmpty()) {
+                        getLogger().info("adding {} columns to table '{}' to handle schema drift",
+                                new Object[]{missing.size(), tableName});
+                        AlterTableOptions alter = new AlterTableOptions();
+                        for (RecordField field : missing) {
+                            String columnName = lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName();
+                            alter.addNullableColumn(columnName, toKuduType(field.getDataType()));
+                        }
+                        try {
+                            kuduClient.alterTable(tableName, alter);
+                        } catch (KuduException e) {
+                            // Ignore the exception if the column already exists due to concurrent
+                            // threads or applications attempting to handle schema drift.
+                            if (e.getStatus().isAlreadyPresent()) {
+                                getLogger().info("column already exists in table ' {}' while handling schema drift",
 
 Review comment:
   You can still commit the change suggested above (removing the stray whitespace) if that's a typo.


[GitHub] [nifi] pvillard31 commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor
URL: https://github.com/apache/nifi/pull/4053#discussion_r379702535
 
 

 ##########
 File path: nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 ##########
 @@ -251,16 +267,50 @@ private void trigger(final ProcessContext context, final ProcessSession session,
         OperationType prevOperationType = OperationType.INSERT;
         final List<RowError> pendingRowErrors = new ArrayList<>();
         for (FlowFile flowFile : flowFiles) {
-            final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
-            final OperationType operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).evaluateAttributeExpressions(flowFile).getValue());
-            final Boolean ignoreNull = Boolean.valueOf(context.getProperty(IGNORE_NULL).evaluateAttributeExpressions(flowFile).getValue());
-            final Boolean lowercaseFields = Boolean.valueOf(context.getProperty(LOWERCASE_FIELD_NAMES).evaluateAttributeExpressions(flowFile).getValue());
-
             try (final InputStream in = session.read(flowFile);
                 final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
+
+                final String tableName = getEvaluatedProperty(TABLE_NAME, context, flowFile);
+                final OperationType operationType = OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, flowFile));
+                final Boolean ignoreNull = Boolean.valueOf(getEvaluatedProperty(IGNORE_NULL, context, flowFile));
+                final Boolean lowercaseFields = Boolean.valueOf(getEvaluatedProperty(LOWERCASE_FIELD_NAMES, context, flowFile));
+                final Boolean handleSchemaDrift = Boolean.valueOf(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, flowFile));
+
                 final RecordSet recordSet = recordReader.createRecordSet();
                 final List<String> fieldNames = recordReader.getSchema().getFieldNames();
-                final KuduTable kuduTable = kuduClient.openTable(tableName);
+                KuduTable kuduTable = kuduClient.openTable(tableName);
+
+                // If handleSchemaDrift is true, check for any missing columns and alter the Kudu table to add them.
+                if (handleSchemaDrift) {
+                    final Schema schema = kuduTable.getSchema();
+                    Stream<RecordField> fields = recordReader.getSchema().getFields().stream();
+                    List<RecordField> missing = fields.filter(field -> !schema.hasColumn(
+                            lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName()))
+                            .collect(Collectors.toList());
+                    if (!missing.isEmpty()) {
+                        getLogger().info("adding {} columns to table '{}' to handle schema drift",
+                                new Object[]{missing.size(), tableName});
+                        AlterTableOptions alter = new AlterTableOptions();
+                        for (RecordField field : missing) {
+                            String columnName = lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName();
+                            alter.addNullableColumn(columnName, toKuduType(field.getDataType()));
+                        }
+                        try {
+                            kuduClient.alterTable(tableName, alter);
+                        } catch (KuduException e) {
+                            // Ignore the exception if the column already exists due to concurrent
+                            // threads or applications attempting to handle schema drift.
+                            if (e.getStatus().isAlreadyPresent()) {
+                                getLogger().info("column already exists in table ' {}' while handling schema drift",
 
 Review comment:
   Is there a way to add which column(s) already exist(s) in the log message? Would it make sense?


[GitHub] [nifi] pvillard31 commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor
URL: https://github.com/apache/nifi/pull/4053#discussion_r379715527
 
 

 ##########
 File path: nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 ##########
 @@ -251,16 +267,50 @@ private void trigger(final ProcessContext context, final ProcessSession session,
         OperationType prevOperationType = OperationType.INSERT;
         final List<RowError> pendingRowErrors = new ArrayList<>();
         for (FlowFile flowFile : flowFiles) {
-            final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
-            final OperationType operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).evaluateAttributeExpressions(flowFile).getValue());
-            final Boolean ignoreNull = Boolean.valueOf(context.getProperty(IGNORE_NULL).evaluateAttributeExpressions(flowFile).getValue());
-            final Boolean lowercaseFields = Boolean.valueOf(context.getProperty(LOWERCASE_FIELD_NAMES).evaluateAttributeExpressions(flowFile).getValue());
-
             try (final InputStream in = session.read(flowFile);
                 final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
+
+                final String tableName = getEvaluatedProperty(TABLE_NAME, context, flowFile);
+                final OperationType operationType = OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, flowFile));
+                final Boolean ignoreNull = Boolean.valueOf(getEvaluatedProperty(IGNORE_NULL, context, flowFile));
+                final Boolean lowercaseFields = Boolean.valueOf(getEvaluatedProperty(LOWERCASE_FIELD_NAMES, context, flowFile));
+                final Boolean handleSchemaDrift = Boolean.valueOf(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, flowFile));
+
                 final RecordSet recordSet = recordReader.createRecordSet();
                 final List<String> fieldNames = recordReader.getSchema().getFieldNames();
-                final KuduTable kuduTable = kuduClient.openTable(tableName);
+                KuduTable kuduTable = kuduClient.openTable(tableName);
+
+                // If handleSchemaDrift is true, check for any missing columns and alter the Kudu table to add them.
+                if (handleSchemaDrift) {
+                    final Schema schema = kuduTable.getSchema();
+                    Stream<RecordField> fields = recordReader.getSchema().getFields().stream();
+                    List<RecordField> missing = fields.filter(field -> !schema.hasColumn(
+                            lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName()))
+                            .collect(Collectors.toList());
+                    if (!missing.isEmpty()) {
+                        getLogger().info("adding {} columns to table '{}' to handle schema drift",
+                                new Object[]{missing.size(), tableName});
+                        AlterTableOptions alter = new AlterTableOptions();
+                        for (RecordField field : missing) {
+                            String columnName = lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName();
+                            alter.addNullableColumn(columnName, toKuduType(field.getDataType()));
+                        }
+                        try {
+                            kuduClient.alterTable(tableName, alter);
+                        } catch (KuduException e) {
 
 Review comment:
   Adding new columns should be a very rare event, right? Adding one column at a time should be OK. And in case the column already exists, we just move on to the next one in the list without failing.
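
   The one-column-at-a-time idea can be sketched roughly as below. This is only an illustration under stated assumptions: `FakeTable`, `AlreadyPresentException` and `addEach` are hypothetical stand-ins invented for this sketch, not Kudu or NiFi classes, and the real processor would issue the alter through the Kudu client instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Sketch of adding missing columns one at a time; every type here is a hypothetical stand-in. */
final class PerColumnAlterSketch {

    /** Hypothetical stand-in for an "already present" error reported by the table service. */
    static final class AlreadyPresentException extends Exception {
        AlreadyPresentException(String column) {
            super("column already exists: " + column);
        }
    }

    /** Hypothetical in-memory table; it only tracks column names. */
    static final class FakeTable {
        private final Set<String> columns = new LinkedHashSet<>();

        FakeTable(List<String> initial) {
            columns.addAll(initial);
        }

        synchronized void addColumn(String name) throws AlreadyPresentException {
            // Set.add returns false when the name is already present.
            if (!columns.add(name)) {
                throw new AlreadyPresentException(name);
            }
        }

        synchronized Set<String> columns() {
            return new LinkedHashSet<>(columns);
        }
    }

    /**
     * Issues one alter per missing column. A column that some other writer added
     * first is recorded in {@code skipped}, and the loop moves on to the next one.
     */
    static List<String> addEach(FakeTable table, List<String> missing, List<String> skipped) {
        List<String> added = new ArrayList<>();
        for (String column : missing) {
            try {
                table.addColumn(column);
                added.add(column);
            } catch (AlreadyPresentException e) {
                skipped.add(column);
            }
        }
        return added;
    }

    public static void main(String[] args) {
        FakeTable table = new FakeTable(Arrays.asList("id", "name"));
        List<String> skipped = new ArrayList<>();
        // "name" simulates a column added concurrently after the missing list was computed.
        List<String> added = addEach(table, Arrays.asList("name", "age", "city"), skipped);
        System.out.println("added=" + added + " skipped=" + skipped);
    }
}
```

   With one alter per column, the handler knows exactly which column was already present, so the log message could name it. The trade-off is one request per new column rather than a single batched alter.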


[GitHub] [nifi] pvillard31 commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor
URL: https://github.com/apache/nifi/pull/4053#discussion_r379715476
 
 

 ##########
 File path: nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 ##########
 @@ -251,16 +267,50 @@ private void trigger(final ProcessContext context, final ProcessSession session,
         OperationType prevOperationType = OperationType.INSERT;
         final List<RowError> pendingRowErrors = new ArrayList<>();
         for (FlowFile flowFile : flowFiles) {
-            final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
-            final OperationType operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).evaluateAttributeExpressions(flowFile).getValue());
-            final Boolean ignoreNull = Boolean.valueOf(context.getProperty(IGNORE_NULL).evaluateAttributeExpressions(flowFile).getValue());
-            final Boolean lowercaseFields = Boolean.valueOf(context.getProperty(LOWERCASE_FIELD_NAMES).evaluateAttributeExpressions(flowFile).getValue());
-
             try (final InputStream in = session.read(flowFile);
                 final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
+
+                final String tableName = getEvaluatedProperty(TABLE_NAME, context, flowFile);
+                final OperationType operationType = OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, flowFile));
+                final Boolean ignoreNull = Boolean.valueOf(getEvaluatedProperty(IGNORE_NULL, context, flowFile));
+                final Boolean lowercaseFields = Boolean.valueOf(getEvaluatedProperty(LOWERCASE_FIELD_NAMES, context, flowFile));
+                final Boolean handleSchemaDrift = Boolean.valueOf(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, flowFile));
+
                 final RecordSet recordSet = recordReader.createRecordSet();
                 final List<String> fieldNames = recordReader.getSchema().getFieldNames();
-                final KuduTable kuduTable = kuduClient.openTable(tableName);
+                KuduTable kuduTable = kuduClient.openTable(tableName);
+
+                // If handleSchemaDrift is true, check for any missing columns and alter the Kudu table to add them.
+                if (handleSchemaDrift) {
+                    final Schema schema = kuduTable.getSchema();
+                    Stream<RecordField> fields = recordReader.getSchema().getFields().stream();
+                    List<RecordField> missing = fields.filter(field -> !schema.hasColumn(
+                            lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName()))
+                            .collect(Collectors.toList());
+                    if (!missing.isEmpty()) {
+                        getLogger().info("adding {} columns to table '{}' to handle schema drift",
+                                new Object[]{missing.size(), tableName});
+                        AlterTableOptions alter = new AlterTableOptions();
+                        for (RecordField field : missing) {
+                            String columnName = lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName();
+                            alter.addNullableColumn(columnName, toKuduType(field.getDataType()));
+                        }
+                        try {
+                            kuduClient.alterTable(tableName, alter);
+                        } catch (KuduException e) {
+                            // Ignore the exception if the column already exists due to concurrent
+                            // threads or applications attempting to handle schema drift.
+                            if (e.getStatus().isAlreadyPresent()) {
+                                getLogger().info("column already exists in table ' {}' while handling schema drift",
 
 Review comment:
   Yeah, that's a legit concern. Let's keep it like this.


[GitHub] [nifi] pvillard31 commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor
URL: https://github.com/apache/nifi/pull/4053#discussion_r379099968
 
 

 ##########
 File path: nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 ##########
 @@ -255,12 +272,45 @@ private void trigger(final ProcessContext context, final ProcessSession session,
             final OperationType operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).evaluateAttributeExpressions(flowFile).getValue());
             final Boolean ignoreNull = Boolean.valueOf(context.getProperty(IGNORE_NULL).evaluateAttributeExpressions(flowFile).getValue());
             final Boolean lowercaseFields = Boolean.valueOf(context.getProperty(LOWERCASE_FIELD_NAMES).evaluateAttributeExpressions(flowFile).getValue());
+            final Boolean handleSchemaDrift = Boolean.valueOf(context.getProperty(HANDLE_SCHEMA_DRIFT).evaluateAttributeExpressions(flowFile).getValue());
 
 Review comment:
   Same comment applies here. And if you do add a specific attribute to be evaluated against (something like ${handle.schema.drift}), you'll need to check that the attribute exists in the flow file; otherwise you would get an NPE here.
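
   A defensive check of that kind might look like the sketch below. It assumes the flow file attributes are available as a plain `Map<String, String>`; the `flag` helper and the attribute name are hypothetical, and how NiFi itself evaluates an expression against a missing attribute is not shown in this thread.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of reading an optional boolean attribute with an explicit default (hypothetical helper). */
final class AttributeFlagSketch {

    /**
     * Returns the boolean value stored under {@code key}, or {@code defaultValue}
     * when the attribute is absent or blank. Any value other than true/false is rejected.
     */
    static boolean flag(Map<String, String> attributes, String key, boolean defaultValue) {
        String raw = attributes.get(key);
        if (raw == null || raw.trim().isEmpty()) {
            return defaultValue;
        }
        String value = raw.trim();
        if (value.equalsIgnoreCase("true")) {
            return true;
        }
        if (value.equalsIgnoreCase("false")) {
            return false;
        }
        throw new IllegalArgumentException("Expected true or false for '" + key + "' but got '" + raw + "'");
    }

    public static void main(String[] args) {
        Map<String, String> attributes = new HashMap<>();
        // Attribute is missing, so the default applies.
        System.out.println(flag(attributes, "handle.schema.drift", false));
        attributes.put("handle.schema.drift", "TRUE");
        System.out.println(flag(attributes, "handle.schema.drift", false));
    }
}
```

   Rejecting unexpected values, rather than silently treating them as false, makes a misconfigured attribute visible instead of quietly disabling the feature.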


[GitHub] [nifi] granthenke commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor

Posted by GitBox <gi...@apache.org>.
granthenke commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor
URL: https://github.com/apache/nifi/pull/4053#discussion_r379106414
 
 

 ##########
 File path: nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 ##########
 @@ -108,6 +113,17 @@
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
+    protected static final PropertyDescriptor HANDLE_SCHEMA_DRIFT = new Builder()
+            .name("Handle Schema Drift")
+            .description("If set to true, when fields with names that are not in the target Kudu table " +
+                    "are encountered, the Kudu table will be altered to include new columns for those fields.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
 
 Review comment:
   I just learned about this while working on [NIFI-6867](https://issues.apache.org/jira/browse/NIFI-6867). I will remove the expression language support. I don't think it's all that useful on this property.


[GitHub] [nifi] asfgit closed pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor
URL: https://github.com/apache/nifi/pull/4053
 
 
   


[GitHub] [nifi] granthenke commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor

Posted by GitBox <gi...@apache.org>.
granthenke commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor
URL: https://github.com/apache/nifi/pull/4053#discussion_r379657582
 
 

 ##########
 File path: nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 ##########
 @@ -348,6 +397,14 @@ private void trigger(final ProcessContext context, final ProcessSession session,
         session.adjustCounter("Records Inserted", totalCount, false);
     }
 
+    private String getEvaluatedProperty(PropertyDescriptor property, ProcessContext context, FlowFile flowFile) {
 
 Review comment:
   Oh, yeah, not sure how I missed that. I will move them into the try/catch.
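
   Why the placement matters can be sketched as follows. This is a simplified illustration, not the processor's code: the `OperationType` enum here, the `process` method and the use of `IllegalArgumentException` are assumptions standing in for per-flow-file property evaluation and failure routing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch: evaluating per-item values inside the try keeps a bad item from failing the whole batch. */
final class EvaluateInsideTrySketch {

    /** Hypothetical stand-in for an operation type parsed from an evaluated property. */
    enum OperationType { INSERT, UPSERT }

    /**
     * Parses each raw value inside the try block. A value that cannot be parsed
     * is collected in {@code failures}, and the remaining items are still processed.
     */
    static List<String> process(List<String> rawOperations, List<String> failures) {
        List<String> succeeded = new ArrayList<>();
        for (String raw : rawOperations) {
            try {
                // valueOf throws IllegalArgumentException for an unknown constant name.
                OperationType type = OperationType.valueOf(raw);
                succeeded.add(type.name());
            } catch (IllegalArgumentException e) {
                failures.add(raw);
            }
        }
        return succeeded;
    }

    public static void main(String[] args) {
        List<String> failures = new ArrayList<>();
        List<String> succeeded = process(Arrays.asList("INSERT", "bogus", "UPSERT"), failures);
        System.out.println("succeeded=" + succeeded + " failures=" + failures);
    }
}
```

   If the `valueOf` call sat above the `try`, the exception for the bad value would escape the loop and the items after it would never be handled.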


[GitHub] [nifi] pvillard31 commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor
URL: https://github.com/apache/nifi/pull/4053#discussion_r379616008
 
 

 ##########
 File path: nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 ##########
 @@ -348,6 +397,14 @@ private void trigger(final ProcessContext context, final ProcessSession session,
         session.adjustCounter("Records Inserted", totalCount, false);
     }
 
+    private String getEvaluatedProperty(PropertyDescriptor property, ProcessContext context, FlowFile flowFile) {
 
 Review comment:
   Hmm, this method is called on L270-274, which is outside the ``try {} catch (){}`` starting at L276, no?
