Posted to user@storm.apache.org by Sergio Ryan <se...@potatobeans.id> on 2020/08/24 12:11:23 UTC

partitionPersist with HDFS, does not trigger updateState, only creates 0 bytes file in HDFS

Hi,

I am trying to create a simple Trident topology to save tuples to HDFS. 
Here is my code:

public class Topology {
    public static void main(String[] args) {
        new Topology().run(args);
    }

    private void run(String[] args) {
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/test")
                .withPrefix("sk")
                .withExtension(".txt");

        RecordFormat recordFormat = new DelimitedRecordFormat()
                // .withFieldDelimiter("\t")
                .withFields(new Fields("word"));

        FileRotationPolicy rotationPolicy =
                new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);

        HdfsState.Options options = new HdfsState.HdfsFileOptions()
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(recordFormat)
                .withRotationPolicy(rotationPolicy)
                .withFsUrl("hdfs://192.168.0.10:9000");

        HdfsStateFactory stateFactory = new HdfsStateFactory().withOptions(options);

        Config conf = new Config();
        TridentTopology topology = new TridentTopology();
        TestWordSpout testWordSpout = new TestWordSpout();
        Stream stream = topology.newStream("teststream", testWordSpout);
        stream.partitionPersist(stateFactory, new Fields("word"), new HdfsUpdater(), new Fields());

        try {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("test_topology", conf, topology.build());
            //StormSubmitter.submitTopology(args[0], conf, topology.build());
        } catch (Exception e) {
            System.err.println(e.getMessage());
        }
    }
}

I use TestWordSpout to emit some random words. The topology starts 
without any problem and seems to connect to HDFS fine. It creates a 
file in the /test directory but never writes any content to it, so the 
file stays at 0 bytes. There are no error messages in the log. I also 
created TestHdfsUpdater, which extends HdfsUpdater and just prints some 
text every time the updateState method is called, but it seems that the 
method is never called. Can somebody help me with this?

Thank you!


Re: partitionPersist with HDFS, does not trigger updateState, only creates 0 bytes file in HDFS

Posted by Sergio Ryan <se...@potatobeans.id>.
Sorry for the dumb question; the problem is the spout. I have to use 
something like FixedBatchSpout, which implements IBatchSpout, instead 
of TestWordSpout. Doh.
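
For anyone who finds this thread later, here is a minimal sketch of that change. It assumes Storm 2.x package names (org.apache.storm.*) and the storm-hdfs Trident classes used in the original code. The class name FixedBatchTopology, the batch size of 3, and the sample words are placeholders chosen for illustration, not something from the original topology.

```java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.hdfs.trident.HdfsState;
import org.apache.storm.hdfs.trident.HdfsStateFactory;
import org.apache.storm.hdfs.trident.HdfsUpdater;
import org.apache.storm.hdfs.trident.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.trident.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Hypothetical class name; only the spout differs from the original topology.
public class FixedBatchTopology {
    public static void main(String[] args) throws Exception {
        // FixedBatchSpout implements IBatchSpout, so it emits tuples in batches.
        // Batch size and words below are placeholder values.
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("word"), 3,
                new Values("apple"),
                new Values("banana"),
                new Values("cherry"));
        spout.setCycle(true); // keep replaying the fixed tuples

        // Same HDFS settings as in the original post.
        HdfsState.Options options = new HdfsState.HdfsFileOptions()
                .withFileNameFormat(new DefaultFileNameFormat()
                        .withPath("/test")
                        .withPrefix("sk")
                        .withExtension(".txt"))
                .withRecordFormat(new DelimitedRecordFormat()
                        .withFields(new Fields("word")))
                .withRotationPolicy(
                        new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB))
                .withFsUrl("hdfs://192.168.0.10:9000");

        TridentTopology topology = new TridentTopology();
        topology.newStream("teststream", spout)
                .partitionPersist(new HdfsStateFactory().withOptions(options),
                        new Fields("word"), new HdfsUpdater(), new Fields());

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test_topology", new Config(), topology.build());
    }
}
```

With setCycle(true) the spout keeps re-emitting the same words, which is handy for checking that the file in /test actually grows. Note that with a 5 MB size-based rotation policy the data may not be visible in HDFS right away.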
