Posted to user@storm.apache.org by Sergio Ryan <se...@potatobeans.id> on 2020/08/24 12:11:23 UTC
partitionPersist with HDFS does not trigger updateState, only creates a 0-byte file in HDFS
Hi,
I am trying to create a simple Trident topology to save tuples to HDFS.
Here is my code:
public class Topology {
    public static void main(String[] args) { new Topology().run(args); }
    private void run(String[] args) {
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/test")
                .withPrefix("sk")
                .withExtension(".txt");
        RecordFormat recordFormat = new DelimitedRecordFormat()
                // .withFieldDelimiter("\t")
                .withFields(new Fields("word"));
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);
        HdfsState.Options options = new HdfsState.HdfsFileOptions()
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(recordFormat)
                .withRotationPolicy(rotationPolicy)
                .withFsUrl("hdfs://192.168.0.10:9000");
        HdfsStateFactory stateFactory = new HdfsStateFactory().withOptions(options);
        Config conf = new Config();
        TridentTopology topology = new TridentTopology();
        TestWordSpout testWordSpout = new TestWordSpout();
        Stream stream = topology.newStream("teststream", testWordSpout);
        stream.partitionPersist(stateFactory, new Fields("word"), new HdfsUpdater(), new Fields());
        try {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("test_topology", conf, topology.build());
            //StormSubmitter.submitTopology(args[0], conf, topology.build());
        } catch (Exception e) {
            System.err.println(e.getMessage());
        }
    }
}
I use the TestWordSpout to emit some random words. I can start the
topology with no problem, and the topology seems to connect to HDFS
without any problems: it does create a file in the /test directory, but
it does not write any content to it. There are no error messages in the
log. I created TestHdfsUpdater, which extends HdfsUpdater and basically
just prints some text every time the updateState method is called, but
it seems that the method is never called. Can somebody help me with
this?
Thank you!
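A debugging updater like the TestHdfsUpdater described above might look roughly like the sketch below. It is a reconstruction, not the poster's actual class: it assumes the storm-hdfs HdfsUpdater signature updateState(HdfsState, List<TridentTuple>, TridentCollector), prints a line on each call, then delegates to the normal HDFS write. The log message text is illustrative.

```java
import java.util.List;
import org.apache.storm.hdfs.trident.HdfsState;
import org.apache.storm.hdfs.trident.HdfsUpdater;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;

// Sketch of a debugging variant of HdfsUpdater (hypothetical reconstruction).
public class TestHdfsUpdater extends HdfsUpdater {
    @Override
    public void updateState(HdfsState state, List<TridentTuple> tuples, TridentCollector collector) {
        // Trident hands this method the tuples of a batch for this partition.
        System.out.println("updateState called with " + tuples.size() + " tuple(s)");
        // Delegate to the stock behaviour so records are still written to HDFS.
        super.updateState(state, tuples, collector);
    }
}
```

It would be passed in place of new HdfsUpdater() in the partitionPersist call. If the message never appears, no batch is reaching the state, which points at the spout rather than the HDFS options.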
Re: partitionPersist with HDFS does not trigger updateState, only creates a 0-byte file in HDFS
Posted by Sergio Ryan <se...@potatobeans.id>.
Sorry for the dumb question; the problem was the spout. I have to use
something like FixedBatchSpout, which implements IBatchSpout. Doh.
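A minimal sketch of that change, assuming the rest of the topology stays as posted and Storm 1.x/2.x package names: the stream is built from Storm's FixedBatchSpout (org.apache.storm.trident.testing) instead of TestWordSpout. The class name BatchSpoutExample, the batch size of 3 and the word list are illustrative placeholders, not from the thread.

```java
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Hypothetical helper: builds "teststream" from a Trident batch spout.
public class BatchSpoutExample {
    static Stream newWordStream(TridentTopology topology) {
        // Emits at most 3 tuples per batch from a fixed, placeholder word list.
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("word"), 3,
                new Values("apple"), new Values("banana"), new Values("cherry"),
                new Values("durian"), new Values("elderberry"));
        // Replay the list indefinitely so new batches keep arriving.
        spout.setCycle(true);
        return topology.newStream("teststream", spout);
    }
}
```

The returned stream can be fed to the same partitionPersist(stateFactory, new Fields("word"), new HdfsUpdater(), new Fields()) call. Without setCycle(true) the spout emits its list only once, so only a few batches (and updateState calls) would occur.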