Posted to issues@nifi.apache.org by "exceptionfactory (via GitHub)" <gi...@apache.org> on 2023/03/08 16:17:42 UTC

[GitHub] [nifi] exceptionfactory commented on a diff in pull request #4901: NIFI-8326: Send records as individual messages in Kafka RecordSinks

exceptionfactory commented on code in PR #4901:
URL: https://github.com/apache/nifi/pull/4901#discussion_r1129694093


##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/record/sink/kafka/KafkaRecordSink_2_6.java:
##########
@@ -226,65 +232,80 @@ public void onEnabled(final ConfigurationContext context) throws InitializationE
     public WriteResult sendData(final RecordSet recordSet, final Map<String, String> attributes, final boolean sendZeroResults) throws IOException {
 
         try {
-            WriteResult writeResult;
             final RecordSchema writeSchema = getWriterFactory().getSchema(null, recordSet.getSchema());
             final ByteArrayOutputStream baos = new ByteArrayOutputStream();
             final ByteCountingOutputStream out = new ByteCountingOutputStream(baos);
             int recordCount = 0;
             try (final RecordSetWriter writer = getWriterFactory().createWriter(getLogger(), writeSchema, out, attributes)) {
-                writer.beginRecordSet();
                 Record record;
                 while ((record = recordSet.next()) != null) {
+                    baos.reset();
+                    out.reset();
                     writer.write(record);
+                    writer.flush();
                     recordCount++;
                     if (out.getBytesWritten() > maxMessageSize) {
-                        throw new TokenTooLargeException("The query's result set size exceeds the maximum allowed message size of " + maxMessageSize + " bytes.");
+                        throw new TokenTooLargeException("A record's size exceeds the maximum allowed message size of " + maxMessageSize + " bytes.");
                     }
+                    sendMessage(topic, baos.toByteArray());
                 }
-                writeResult = writer.finishRecordSet();
                 if (out.getBytesWritten() > maxMessageSize) {
-                    throw new TokenTooLargeException("The query's result set size exceeds the maximum allowed message size of " + maxMessageSize + " bytes.");
+                    throw new TokenTooLargeException("A record's size exceeds the maximum allowed message size of " + maxMessageSize + " bytes.");
                 }
-                recordCount = writeResult.getRecordCount();
 
                 attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                 attributes.put("record.count", Integer.toString(recordCount));
-                attributes.putAll(writeResult.getAttributes());
             }
 
-            if (recordCount > 0 || sendZeroResults) {
-                final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, null, baos.toByteArray());
-                try {
-                    producer.send(record, (metadata, exception) -> {
-                        if (exception != null) {
-                            throw new KafkaSendException(exception);
-                        }
-                    }).get(maxAckWaitMillis, TimeUnit.MILLISECONDS);
-                } catch (KafkaSendException kse) {
-                    Throwable t = kse.getCause();
-                    if (t instanceof IOException) {
-                        throw (IOException) t;
-                    } else {
-                        throw new IOException(t);
-                    }
-                } catch (final InterruptedException e) {
-                    getLogger().warn("Interrupted while waiting for an acknowledgement from Kafka");
-                    Thread.currentThread().interrupt();
-                } catch (final TimeoutException e) {
-                    getLogger().warn("Timed out while waiting for an acknowledgement from Kafka");
+            if (recordCount == 0) {
+                if (sendZeroResults) {
+                    sendMessage(topic, new byte[0]);
+                } else {
+                    return WriteResult.EMPTY;
                 }
-            } else {
-                writeResult = WriteResult.EMPTY;
             }
 
-            return writeResult;
+            acknowledgeTransmission();
+
+            return WriteResult.of(recordCount, attributes);
         } catch (IOException ioe) {
             throw ioe;
         } catch (Exception e) {
             throw new IOException("Failed to write metrics using record writer: " + e.getMessage(), e);
         }
     }
 
+    public void sendMessage(String topic, byte[] payload) throws IOException, ExecutionException {

Review Comment:
   This method should probably be `private`, or at least `protected`, since it is not part of the `RecordSink` interface. Is there a reason it needs to be `public`?
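
   As a rough sketch of the suggestion, using simplified stand-in types rather than the actual NiFi classes: `SimpleSink`, `InMemorySink`, and `VisibilitySketch` are hypothetical names, and the in-memory list only stands in for the Kafka producer. The point is that only the interface method is `public`, while the per-record helper stays `private`.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the sink contract; NOT the NiFi RecordSink interface.
interface SimpleSink {
    int sendData(List<String> records);
}

// Hypothetical implementation: callers only see sendData().
class InMemorySink implements SimpleSink {
    // Stands in for the Kafka producer; collects payloads in memory.
    private final List<byte[]> sent = new ArrayList<>();

    @Override
    public int sendData(final List<String> records) {
        for (final String record : records) {
            // One message per record, mirroring the loop in the diff.
            sendMessage("example-topic", record.getBytes(StandardCharsets.UTF_8));
        }
        return sent.size();
    }

    // Implementation detail, so it is private rather than public.
    // The topic is ignored here because nothing is actually transmitted.
    private void sendMessage(final String topic, final byte[] payload) {
        sent.add(payload);
    }
}

public class VisibilitySketch {
    // Reports whether the helper is declared private, using reflection.
    static boolean isHelperPrivate() throws NoSuchMethodException {
        final Method helper = InMemorySink.class.getDeclaredMethod("sendMessage", String.class, byte[].class);
        return Modifier.isPrivate(helper.getModifiers());
    }

    public static void main(final String[] args) throws Exception {
        final SimpleSink sink = new InMemorySink();
        System.out.println(sink.sendData(Arrays.asList("a", "b"))); // prints 2
        System.out.println(isHelperPrivate());                      // prints true
    }
}
```

   If a subclass or a test in another package needs to override or call the helper, `protected` would be the fallback; otherwise `private` keeps the class's public surface limited to the interface.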


