You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nifi.apache.org by "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)" <ch...@hpe.com> on 2016/04/13 20:54:18 UTC

NPE in PutKafka processors

Hi!

I’m running a 0.7.0 snapshot version and am running into an NPE with the PutKafka processor.

I’m thinking Oleg would be interested in this.

java.lang.NullPointerException: null
at java.lang.String.<init>(String.java:503) ~[na:1.8.0_45]
at org.apache.nifi.processors.kafka.PutKafka.buildFailedFlowFileAttributes(PutKafka.java:396) ~[na:na]
at org.apache.nifi.processors.kafka.PutKafka.onTrigger(PutKafka.java:308) ~[na:na]
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059) ~[nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]

Thanks,

Chris

Re: NPE in PutKafka processors

Posted by Joe Witt <jo...@gmail.com>.
Chris,

That line is 'attributes.put(ATTR_DELIMITER, new
String(messageContext.getDelimiterBytes(), StandardCharsets.UTF_8));'

Therefore it appears to assume there is a delimiter which is not
correct and should be guarded.  Here is the JIRA for it.  In your use
case you might be able to get away with using a bogus delimiter of say
\n to work around this.  Here is a JIRA for it
https://issues.apache.org/jira/browse/NIFI-1764

Thanks
JOe

On Wed, Apr 13, 2016 at 2:54 PM, McDermott, Chris Kevin (MSDU -
STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
> Hi!
>
> I’m running a 0.7.0 snapshot version and am running into an NPE with the PutKafka processor.
>
> I’m thinking Oleg would be interested in this.
>
> java.lang.NullPointerException: null
> at java.lang.String.<init>(String.java:503) ~[na:1.8.0_45]
> at org.apache.nifi.processors.kafka.PutKafka.buildFailedFlowFileAttributes(PutKafka.java:396) ~[na:na]
> at org.apache.nifi.processors.kafka.PutKafka.onTrigger(PutKafka.java:308) ~[na:na]
> at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
> at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059) ~[nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
> at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
> at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
> at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
>
> Thanks,
>
> Chris

Re: NPE in PutKafka processors

Posted by "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)" <ch...@hpe.com>.
Here is a probable patch. I’ve tested it and it seems to do the trick.

diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index 3b5eb4f..19e06ea 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -1,4 +1,5 @@
 /*
+
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -393,7 +394,7 @@ public class PutKafka extends AbstractProcessor {
         attributes.put(ATTR_FAILED_SEGMENTS, new String(failedSegments.toByteArray(), StandardCharsets.UTF_8));
         attributes.put(ATTR_TOPIC, messageContext.getTopicName());
         attributes.put(ATTR_KEY, messageContext.getKeyBytesAsString());
-        attributes.put(ATTR_DELIMITER, new String(messageContext.getDelimiterBytes(), StandardCharsets.UTF_8));
+        attributes.put(ATTR_DELIMITER, messageContext.getDelimeterBytesAsString());
         return attributes;
     }

diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java
index d5f1c0b..4d63b7b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors.kafka;

+import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.BitSet;

@@ -108,6 +109,13 @@ final class SplittableMessageContext {
     }

     /**
+     * Returns the delimiter bytes as String
+     */
+    String getDelimeterBytesAsString() {
+        return this.delimiterBytes != null ? new String(this.delimiterBytes, StandardCharsets.UTF_8) : null;
+    }
+
+    /**
      * Returns the key bytes as String
      */
     String getKeyBytesAsString() {







On 4/13/16, 2:54 PM, "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)" <ch...@hpe.com> wrote:

>Hi!
>
>I’m running a 0.7.0 snapshot version and am running into an NPE with the PutKafka processor.
>
>I’m thinking Oleg would be interested in this.
>
>java.lang.NullPointerException: null
>at java.lang.String.<init>(String.java:503) ~[na:1.8.0_45]
>at org.apache.nifi.processors.kafka.PutKafka.buildFailedFlowFileAttributes(PutKafka.java:396) ~[na:na]
>at org.apache.nifi.processors.kafka.PutKafka.onTrigger(PutKafka.java:308) ~[na:na]
>at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059) ~[nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
>at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
>at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
>at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
>
>Thanks,
>
>Chris