Posted to user@storm.apache.org by 张博 <jy...@gmail.com> on 2017/09/13 06:15:35 UTC

How to use Drools in the topology

Hi!
I want to use Drools in a bolt. It works normally in the LocalCluster, but
when I deploy it to the production cluster, it fails with an error.
The bolt:
public class DealLostBolt extends BaseRichBolt {

  private static final long serialVersionUID = 1L;

  private static final Logger LOGGER = LoggerFactory.getLogger("DEAL_LOST_BOLT");

  private OutputCollector collector;

  private KieSession kieSession;

  private FactHandle factHandle;

  @Override
  public void execute(Tuple input) {
    // Read the incoming data
    String sentence = (String) input.getValue(0);
    LOGGER.info("DealLostBolt received data: " + sentence);

    // Convert the data
    PutDataPoint dataPoint = Json.fromJson(PutDataPoint.class, sentence);

    KieServices ks = KieServices.Factory.get();
    KieContainer kieContainer = ks.getKieClasspathContainer();
    kieSession = kieContainer.newKieSession("all-rule");
    kieSession.getAgenda().getAgendaGroup("deal-lost").setFocus();

    factHandle = kieSession.insert(dataPoint);
    kieSession.fireAllRules();
    kieSession.delete(factHandle);

    collector.emit(new Values(sentence));
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("value"));

  }

  @Override
  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
  }

}
The error:
java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:495) ~[storm-core-1.1.1.jar:1.1.1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:460) ~[storm-core-1.1.1.jar:1.1.1]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) ~[storm-core-1.1.1.jar:1.1.1]
    at org.apache.storm.daemon.executor$fn__5030$fn__5043$fn__5096.invoke(executor.clj:848) ~[storm-core-1.1.1.jar:1.1.1]
    at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.1.jar:1.1.1]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: java.lang.NullPointerException
    at org.kie.internal.io.ResourceFactory.newByteArrayResource(ResourceFactory.java:66) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.AbstractKieModule.getResource(AbstractKieModule.java:299) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:264) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:259) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.AbstractKieProject.buildKnowledgePackages(AbstractKieProject.java:228) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.AbstractKieModule.createKieBase(AbstractKieModule.java:206) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.KieContainerImpl.createKieBase(KieContainerImpl.java:584) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.KieContainerImpl.getKieBase(KieContainerImpl.java:552) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:680) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:648) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at cn.ennwifi.storm.bolt.DealLostBolt.execute(DealLostBolt.java:52) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.apache.storm.daemon.executor$fn__5030$tuple_action_fn__5032.invoke(executor.clj:729) ~[storm-core-1.1.1.jar:1.1.1]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__4951.invoke(executor.clj:461) ~[storm-core-1.1.1.jar:1.1.1]
    at org.apache.storm.disruptor$clojure_handler$reify__4465.onEvent(disruptor.clj:40) ~[storm-core-1.1.1.jar:1.1.1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:482) ~[storm-core-1.1.1.jar:1.1.1]
    ... 6 more

Could somebody help me?

Thanks!

Re: How to use Drools in the topology

Posted by 张博 <jy...@gmail.com>.
I changed the way I package the project, and now it runs OK, but I don't know why.
The configuration that failed is below. It produced the jar
"se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar", which I deployed to production.
<plugin>
  <artifactId>maven-assembly-plugin</artifactId>
  <version>2.4</version>
  <configuration>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
    <archive>
      <manifest>
        <addClasspath>true</addClasspath>
        <mainClass>cn.ennwifi.storm.Application</mainClass>
      </manifest>
    </archive>
  </configuration>
  <executions>
    <execution>
      <id>make-assembly</id>
      <phase>package</phase>
      <goals>
        <goal>single</goal>
      </goals>
    </execution>
  </executions>
</plugin>

Today I changed it to:
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-dependency-plugin</artifactId>
  <version>3.0.0</version>
  <executions>
    <execution>
      <id>copy-dependencies</id>
      <phase>package</phase>
      <goals>
        <goal>copy-dependencies</goal>
      </goals>
      <configuration>
        <outputDirectory>target/libs</outputDirectory>
        <includeScope>runtime</includeScope>
      </configuration>
    </execution>
  </executions>
</plugin>

Then I put se-storm-0.0.1-SNAPSHOT.jar into storm/extlib, together with the
jars from target/libs/*.jar. That solved the problem, but why?

I compared the contents of the two jars, and the *.xml and *.drl files that
Drools uses are in the same locations in both.
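
A comparison like this can also be done programmatically rather than by eye. The following is a minimal sketch using only the JDK. The class name JarRuleEntries and the choice of which entries count as "Drools-related" are illustrative assumptions, not part of Drools or Storm. It does not explain the NPE by itself. It only reports whether META-INF/kmodule.xml, META-INF/kie.conf and the .drl files are present in a given jar.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

// Hypothetical helper: lists jar entries that Drools is likely to read.
class JarRuleEntries {

  /** Returns the sorted names of entries that look Drools-related. */
  static List<String> find(Path jar) throws IOException {
    List<String> names = new ArrayList<>();
    try (JarFile jarFile = new JarFile(jar.toFile())) {
      Enumeration<JarEntry> entries = jarFile.entries();
      while (entries.hasMoreElements()) {
        String name = entries.nextElement().getName();
        // Assumption: these three patterns are the ones worth comparing.
        if (name.endsWith(".drl")
            || name.equals("META-INF/kmodule.xml")
            || name.equals("META-INF/kie.conf")) {
          names.add(name);
        }
      }
    }
    Collections.sort(names);
    return names;
  }

  public static void main(String[] args) throws IOException {
    // Pass one or more jar paths, e.g. the fat jar and the thin jar.
    for (String arg : args) {
      System.out.println(arg + " -> " + find(Paths.get(arg)));
    }
  }
}
```

Running it against both se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar and se-storm-0.0.1-SNAPSHOT.jar gives two lists that can be compared line by line. A merged jar can hold only one file per entry name, so a file that several dependencies ship under the same path may differ between the two layouts even when the entry names match.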


Re: How to use Drools in the topology

Posted by 张博 <jy...@gmail.com>.
The Drools version I depend on is 7.2.0.Final. The dependency in pom.xml is:
<dependency>
  <groupId>org.drools</groupId>
  <artifactId>drools-compiler</artifactId>
  <version>7.2.0.Final</version>
</dependency>

And the bolt is:
public class DealLimitBolt extends BaseRichBolt {

  private OutputCollector collector;

  private KieSession kieSession;

  public void execute(Tuple input) {
    // Read the incoming data
    String sentence = (String) input.getValue(0);

    // Convert the data
    PutDataPoint dataPoint = Json.fromJson(PutDataPoint.class, sentence);

    // Look up the rule parameters
    String key = String.valueOf(dataPoint.tags.get("gatewayId"))
        + String.valueOf(dataPoint.getTags().get("deviceId"))
        + dataPoint.getMetric() + "0";
    RuleLimitParam paramObj = (RuleLimitParam) MapService.getObject(key);
    LOGGER.info(Json.toJson(MapService.getObjects(), JsonFormat.tidy()));

    if (paramObj != null) {

      // The kieSession is used here
      LimitFact fact = new LimitFact();
      fact.setHigh(paramObj.high);
      fact.setLow(paramObj.low);
      fact.setOperate(paramObj.operate);
      fact.setValue(Float.valueOf(dataPoint.value));
      kieSession.insert(fact);
      kieSession.fireAllRules();

    }

    collector.ack(input);
  }


  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
    SpringTools instance = SpringTools.getInstance();
    ApplicationContext applicationContext = instance.getApplicationContext();
    methodService = applicationContext.getBean(MethodService.class);
    DroolsService droolsService = applicationContext.getBean(DroolsService.class);
    droolsService.getRules();

    // This is the Drools initialization
    KieServices ks = KieServices.Factory.get();
    KieContainer kContainer = ks.getKieClasspathContainer();
    kieSession = kContainer.newKieSession("all-rule");
  }

  public void declareOutputFields(OutputFieldsDeclarer declarer) {

  }

}

Drools also uses an XML file at resources/META-INF/kmodule.xml:
<?xml version="1.0" encoding="UTF-8"?>
<kmodule xmlns="http://www.drools.org/xsd/kmodule">
    <kbase name="rules" packages="com.rules">
        <ksession name="all-rule"/>
    </kbase>
</kmodule>

The rules are in resources/com/rules/DealLimit.drl.
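
Because the bolt obtains its container from getKieClasspathContainer(), it can help to log, from inside the worker, where META-INF/kmodule.xml and the rule file actually resolve on the classpath. Below is a minimal sketch that uses only the JDK class loader API. ClasspathProbe is a hypothetical helper name, and calling it from prepare() is a suggestion rather than something Drools or Storm requires.

```java
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

// Hypothetical helper: reports every classpath location of a resource.
class ClasspathProbe {

  /** Returns all URLs under which the named resource is visible. */
  static List<URL> locate(String resourceName) throws IOException {
    // Prefer the thread context class loader, fall back to the system one.
    ClassLoader loader = Thread.currentThread().getContextClassLoader();
    if (loader == null) {
      loader = ClassLoader.getSystemClassLoader();
    }
    List<URL> urls = new ArrayList<>();
    Enumeration<URL> found = loader.getResources(resourceName);
    while (found.hasMoreElements()) {
      urls.add(found.nextElement());
    }
    return urls;
  }

  public static void main(String[] args) throws IOException {
    // Resource names taken from the post above.
    System.out.println(locate("META-INF/kmodule.xml"));
    System.out.println(locate("com/rules/DealLimit.drl"));
  }
}
```

An empty list means the resource is not visible to that class loader at all. More than one URL means several jars provide the same path. Comparing this output between the LocalCluster run and a production worker shows whether the two environments see the same files.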




Re: How to use Drools in the topology

Posted by Stig Rohde Døssing <sr...@apache.org>.
There are a lot of differences between local cluster and production. The
primary difference is that local clusters run all the code in one JVM,
whereas a production cluster runs bolts spread across multiple worker JVMs,
and the topology wiring code in a separate JVM on the Nimbus host.

There is no initialization in this prepare method:
  @Override
  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
  }

I think it's easier to help if you post the code you're using to initialize
Drools, as well as which Drools artifact and version you're depending on.
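
To make the point about separate JVMs concrete: Storm serializes each bolt instance on the submitting side and deserializes it in the worker, so only serializable instance state travels with it. The sketch below imitates that round trip with plain Java serialization and no Storm dependency. DemoBolt, RuleEngine and this prepare() method are stand-ins invented for illustration, not Storm or Drools classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Stand-in for a non-serializable resource such as a rule session.
class RuleEngine {
  String evaluate(String input) {
    return "checked:" + input;
  }
}

// Stand-in for a bolt: configuration is serialized, the engine is not.
class DemoBolt implements Serializable {
  private static final long serialVersionUID = 1L;

  private final String sessionName;      // travels with the bolt
  private transient RuleEngine engine;   // must be rebuilt on the worker

  DemoBolt(String sessionName) {
    this.sessionName = sessionName;
  }

  /** Plays the role of prepare(): builds per-worker state. */
  void prepare() {
    engine = new RuleEngine();
  }

  boolean isPrepared() {
    return engine != null;
  }

  String execute(String input) {
    return sessionName + ":" + engine.evaluate(input);
  }

  /** Serializes and deserializes a bolt, as shipping it to a worker would. */
  static DemoBolt roundTrip(DemoBolt bolt) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(bolt);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (DemoBolt) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    DemoBolt submitted = new DemoBolt("all-rule");
    submitted.prepare();                       // setup done before "submission"
    DemoBolt onWorker = roundTrip(submitted);
    System.out.println(onWorker.isPrepared()); // prints "false"
    onWorker.prepare();                        // setup repeated on the "worker"
    System.out.println(onWorker.execute("tuple")); // prints "all-rule:checked:tuple"
  }
}
```

The engine built before the round trip does not arrive on the other side, which is why per-worker resources are normally created in prepare(). In a LocalCluster everything shares one JVM, so setup done through static state can appear to work there and still be missing in production.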


Re: How to use Drools in the topology

Posted by 张博 <jy...@gmail.com>.
I only use Drools in the bolt, and I initialize it in the prepare method, so
I don't think that is the cause. It also runs fine in the LocalCluster. Do
you know what the difference is between the LocalCluster and the production
cluster?

2017-09-13 23:27 GMT+08:00 Stig Rohde Døssing <sr...@apache.org>:

> I'm not familiar with Drools, so I'm just guessing here, but are you doing
> any kind of setup of the KieContainer before submitting your topology? When
> you run your topology the bolt doesn't run in the same JVM as the topology
> setup code, so any setup done via static variables/methods won't transfer
> from the submitter JVM to the bolt JVM.
>
> If you need to run code before starting a worker, you might want to look
> at
> https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/hooks/BaseWorkerHook.java
> and
> https://storm.apache.org/releases/1.0.3/javadocs/org/apache/storm/topology/TopologyBuilder.html#addWorkerHook-org.apache.storm.hooks.IWorkerHook-
> .

Re: How to use Drools in the topology

Posted by Stig Rohde Døssing <sr...@apache.org>.
I'm not familiar with Drools, so I'm just guessing here, but are you doing
any kind of setup of the KieContainer before submitting your topology? When
you run your topology the bolt doesn't run in the same JVM as the topology
setup code, so any setup done via static variables/methods won't transfer
from the submitter JVM to the bolt JVM.

If you need to run code before starting a worker, you might want to look at
https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/hooks/BaseWorkerHook.java
and
https://storm.apache.org/releases/1.0.3/javadocs/org/apache/storm/topology/TopologyBuilder.html#addWorkerHook-org.apache.storm.hooks.IWorkerHook-
.
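
To make the point about state not transferring more concrete: Storm
serializes each bolt instance in the submitter with Java serialization and
deserializes it in the worker before calling prepare(). Static and transient
fields are not part of the serialized form, so anything held there has to be
rebuilt on the worker. The following is a minimal, Storm-free sketch of that
round trip. DemoBolt, its fields, and roundTrip() are hypothetical stand-ins
made up for illustration, not Storm or Drools classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class PrepareDemo {

  /** Hypothetical stand-in for a bolt; this is not a Storm class. */
  static class DemoBolt implements Serializable {
    private static final long serialVersionUID = 1L;

    // Serialized with the instance, so it survives the trip to the worker.
    private final String sessionName;

    // Not serialized: stands in for a non-serializable engine handle.
    private transient StringBuilder engine;

    DemoBolt(String sessionName) {
      this.sessionName = sessionName;
      // Built in the "submitter"; this value is lost after deserialization.
      this.engine = new StringBuilder("built in submitter");
    }

    // Plays the role of prepare(): rebuilds the transient state on the worker.
    void prepare() {
      engine = new StringBuilder("built in worker for " + sessionName);
    }

    String engineState() {
      return engine == null ? null : engine.toString();
    }
  }

  /** Serializes and deserializes the bolt, imitating submitter -> worker. */
  static DemoBolt roundTrip(DemoBolt bolt) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(bolt);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (DemoBolt) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    DemoBolt onWorker = roundTrip(new DemoBolt("all-rule"));
    System.out.println(onWorker.engineState()); // prints "null"
    onWorker.prepare();
    System.out.println(onWorker.engineState()); // prints "built in worker for all-rule"
  }
}
```

In a LocalCluster everything runs inside one JVM, so static state set up
before submission is still visible to the bolt, which can hide this kind of
problem until the topology runs on a real cluster.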

2017-09-13 15:30 GMT+02:00 zhangwenwei <je...@icloud.com>:

> According to the log, an NPE occurs when
> kieContainer.newKieSession() is called.
>
> Best Regards,
> Jerry Zhang

Re: How to use Drools in the topology

Posted by zhangwenwei <je...@icloud.com>.
According to the log, an NPE occurs when kieContainer.newKieSession() is called.
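
The innermost frames (AbstractKieModule.getResource calling
ResourceFactory.newByteArrayResource) suggest that a resource could not be
read from the jar, although that is only a guess from the trace and is not
confirmed in this thread. One way to compare the LocalCluster and the
production worker is to log where classpath resources resolve from in each
environment. A minimal sketch, using only the JDK; the class name
ClasspathProbe and the method locate() are made up for illustration, and
META-INF/kmodule.xml is the descriptor that the Drools classpath container
looks for:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ClasspathProbe {

  /** Returns every classpath location that provides the named resource. */
  static List<String> locate(String resourceName) {
    ClassLoader loader = Thread.currentThread().getContextClassLoader();
    if (loader == null) {
      loader = ClassLoader.getSystemClassLoader();
    }
    List<String> found = new ArrayList<>();
    try {
      for (URL url : Collections.list(loader.getResources(resourceName))) {
        found.add(url.toString());
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return found;
  }

  public static void main(String[] args) {
    // Resource to look up; defaults to the Drools module descriptor.
    String name = args.length > 0 ? args[0] : "META-INF/kmodule.xml";
    List<String> hits = locate(name);
    System.out.println(name + " -> " + (hits.isEmpty() ? "not found" : hits));
  }
}
```

Calling locate() from prepare() and logging the result in both environments
would show whether the descriptor and the rule files are visible inside the
jar-with-dependencies that the worker actually loads.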

Best Regards,
Jerry Zhang

> On 13 Sep 2017, at 14:15, 张博 <jy...@gmail.com> wrote:
> 
> Hi!
> I want to use Drools in a bolt. It works normally in the LocalCluster, but when I deploy it to the production cluster, it fails with an error.
> The bolt:
> public class DealLostBolt extends BaseRichBolt {
> 
>   private static final long serialVersionUID = 1L;
> 
>   private static final Logger LOGGER = LoggerFactory.getLogger("DEAL_LOST_BOLT");
> 
>   private OutputCollector collector;
> 
>   private KieSession kieSession;
> 
>   private FactHandle factHandle;
> 
>   @Override
>   public void execute(Tuple input) {
>     // Get the data
>     String sentence = (String) input.getValue(0);
>     LOGGER.info("Data received by DealLostBolt: " + sentence);
> 
>     // Convert the data
>     PutDataPoint dataPoint = Json.fromJson(PutDataPoint.class, sentence);
> 
>     KieServices ks = KieServices.Factory.get();
>     KieContainer kieContainer = ks.getKieClasspathContainer();
>     kieSession = kieContainer.newKieSession("all-rule");
>     kieSession.getAgenda().getAgendaGroup("deal-lost").setFocus();
> 
>     factHandle = kieSession.insert(dataPoint);
>     kieSession.fireAllRules();
>     kieSession.delete(factHandle);
> 
>     collector.emit(new Values(sentence));
>   }
> 
>   @Override
>   public void declareOutputFields(OutputFieldsDeclarer declarer) {
>     declarer.declare(new Fields("value"));
> 
>   }
> 
>   @Override
>   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
>     this.collector = collector;
>   }
> 
> }
> The errors:
> java.lang.RuntimeException: java.lang.NullPointerException
> 	at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:495) ~[storm-core-1.1.1.jar:1.1.1]
> 	at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:460) ~[storm-core-1.1.1.jar:1.1.1]
> 	at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) ~[storm-core-1.1.1.jar:1.1.1]
> 	at org.apache.storm.daemon.executor$fn__5030$fn__5043$fn__5096.invoke(executor.clj:848) ~[storm-core-1.1.1.jar:1.1.1]
> 	at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.1.jar:1.1.1]
> 	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> 	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
> Caused by: java.lang.NullPointerException
> 	at org.kie.internal.io.ResourceFactory.newByteArrayResource(ResourceFactory.java:66) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> 	at org.drools.compiler.kie.builder.impl.AbstractKieModule.getResource(AbstractKieModule.java:299) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> 	at org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:264) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> 	at org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:259) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> 	at org.drools.compiler.kie.builder.impl.AbstractKieProject.buildKnowledgePackages(AbstractKieProject.java:228) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> 	at org.drools.compiler.kie.builder.impl.AbstractKieModule.createKieBase(AbstractKieModule.java:206) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> 	at org.drools.compiler.kie.builder.impl.KieContainerImpl.createKieBase(KieContainerImpl.java:584) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> 	at org.drools.compiler.kie.builder.impl.KieContainerImpl.getKieBase(KieContainerImpl.java:552) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> 	at org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:680) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> 	at org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:648) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> 	at cn.ennwifi.storm.bolt.DealLostBolt.execute(DealLostBolt.java:52) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> 	at org.apache.storm.daemon.executor$fn__5030$tuple_action_fn__5032.invoke(executor.clj:729) ~[storm-core-1.1.1.jar:1.1.1]
> 	at org.apache.storm.daemon.executor$mk_task_receiver$fn__4951.invoke(executor.clj:461) ~[storm-core-1.1.1.jar:1.1.1]
> 	at org.apache.storm.disruptor$clojure_handler$reify__4465.onEvent(disruptor.clj:40) ~[storm-core-1.1.1.jar:1.1.1]
> 	at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:482) ~[storm-core-1.1.1.jar:1.1.1]
> 	... 6 more
> 
> Could somebody help me?
> 
> Thanks!