Posted to issues@nifi.apache.org by "bejancsaba (via GitHub)" <gi...@apache.org> on 2023/04/09 20:42:18 UTC

[GitHub] [nifi] bejancsaba commented on a diff in pull request #7140: NIFI-11402 - PutBigQuery fix for case sensitivity and error handling

bejancsaba commented on code in PR #7140:
URL: https://github.com/apache/nifi/pull/7140#discussion_r1161337789


##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java:
##########
@@ -303,6 +301,7 @@ private void finishProcessing(ProcessSession session, FlowFile flowFile, StreamW
             flowFile = session.putAttribute(flowFile, BigQueryAttributes.JOB_NB_RECORDS_ATTR, isBatch() ? "0" : String.valueOf(appendSuccessCount.get() * recordBatchCount));
             session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
+            error.set(null); // set error to null for next execution

Review Comment:
   Thanks for catching this. What do you think about adding a unit test covering this scenario (one that fails before the change and passes after it)? For example:
   ```
       @Test
       void testNextFlowFileProcessedWhenIntermittentErrorResolved() {
           when(writeClient.createWriteStream(isA(CreateWriteStreamRequest.class))).thenReturn(writeStream);
           TableSchema myTableSchema = mockTableSchema(FIELD_1_NAME, TableFieldSchema.Type.STRING, FIELD_2_NAME, TableFieldSchema.Type.STRING);
           when(writeStream.getTableSchema()).thenReturn(myTableSchema);
           // The first append fails with an INTERNAL status, the second one succeeds.
           when(streamWriter.append(isA(ProtoRows.class), isA(Long.class)))
               .thenReturn(ApiFutures.immediateFailedFuture(new StatusRuntimeException(Status.INTERNAL)))
               .thenReturn(ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().setAppendResult(mock(AppendRowsResponse.AppendResult.class)).build()));
   
           // Disable retries so the failed append is not attempted again for the same FlowFile.
           runner.setProperty(PutBigQuery.RETRY_COUNT, "0");
   
           runner.enqueue(csvContentWithLines(1));
           runner.enqueue(csvContentWithLines(1));
           runner.run(2);
   
           verify(streamWriter, times(2)).append(any(ProtoRows.class), anyLong());
   
           runner.assertQueueEmpty();
           runner.assertTransferCount(PutBigQuery.REL_FAILURE, 1);
           runner.assertTransferCount(PutBigQuery.REL_SUCCESS, 1);
       }
   ```
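
To make the failure mode behind the `error.set(null)` change concrete, here is a minimal, self-contained sketch. It is not the PutBigQuery code: `ErrorResetSketch`, `process`, and the `resetAfterFailure` flag are hypothetical names used only for illustration. It assumes the error is held in a field that outlives a single invocation, which is what the "for next execution" comment in the diff suggests.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a component that keeps error state in a field
// shared across invocations. None of these names come from PutBigQuery.
public class ErrorResetSketch {
    private final AtomicReference<Exception> error = new AtomicReference<>();
    private final boolean resetAfterFailure;

    public ErrorResetSketch(boolean resetAfterFailure) {
        this.resetAfterFailure = resetAfterFailure;
    }

    // Handles one unit of work and returns "success" or "failure".
    public String process(boolean appendFails) {
        if (appendFails) {
            error.set(new IllegalStateException("simulated append failure"));
        }
        if (error.get() != null) {
            if (resetAfterFailure) {
                error.set(null); // clear the state so the next call starts clean
            }
            return "failure";
        }
        return "success";
    }

    public static void main(String[] args) {
        // Without the reset, the stale error also fails the second, healthy call.
        ErrorResetSketch withoutReset = new ErrorResetSketch(false);
        System.out.println(withoutReset.process(true));
        System.out.println(withoutReset.process(false));

        // With the reset, only the call that actually failed is reported as failed.
        ErrorResetSketch withReset = new ErrorResetSketch(true);
        System.out.println(withReset.process(true));
        System.out.println(withReset.process(false));
    }
}
```

The suggested test above checks the same property at the processor level: one FlowFile routed to failure, the next one to success.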



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java:
##########
@@ -434,6 +433,9 @@ private static Map<String, Object> convertMapRecord(Map<String, Object> map) {
         Map<String, Object> result = new HashMap<>();
         for (String key : map.keySet()) {
             Object obj = map.get(key);
+            // BigQuery is not case sensitive on the column names but the protobuf message
+            // expect all column names to be lower case
+            key = key.toLowerCase();

Review Comment:
   This is good to know. Thanks.
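
For readers who want to see the key normalization in isolation, here is a rough sketch of lower-casing map keys recursively. It is not the `convertMapRecord` implementation: `LowerCaseKeysSketch`, `lowerCaseKeys`, and `convert` are hypothetical names, and handling nested maps and lists this way is an assumption. The diff calls `toLowerCase()` without arguments; the sketch passes `Locale.ROOT` so the result does not depend on the JVM's default locale.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of NiFi: lower-cases every key of a record-like map.
public class LowerCaseKeysSketch {

    public static Map<String, Object> lowerCaseKeys(Map<String, Object> map) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            // Locale.ROOT avoids locale-specific mappings such as the Turkish dotless i.
            String key = entry.getKey().toLowerCase(Locale.ROOT);
            result.put(key, convert(entry.getValue()));
        }
        return result;
    }

    // Recurses into nested maps and lists; other values are returned unchanged.
    @SuppressWarnings("unchecked")
    private static Object convert(Object value) {
        if (value instanceof Map) {
            return lowerCaseKeys((Map<String, Object>) value);
        }
        if (value instanceof List) {
            return ((List<Object>) value).stream()
                    .map(LowerCaseKeysSketch::convert)
                    .collect(Collectors.toList());
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, Object> nested = new HashMap<>();
        nested.put("Zip_Code", "1011");

        Map<String, Object> record = new HashMap<>();
        record.put("User_Name", "alice");
        record.put("ADDRESS", nested);

        System.out.println(lowerCaseKeys(record));
    }
}
```

One caveat with this approach: two keys that differ only in case (for example `Id` and `ID`) collapse into a single entry, and which value survives depends on iteration order.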


