Posted to dev@storm.apache.org by krichter722 <gi...@git.apache.org> on 2018/03/19 00:35:13 UTC

[GitHub] storm pull request #2596: storm-elasticsearch-examples: fixed all checkstyle...

GitHub user krichter722 opened a pull request:

    https://github.com/apache/storm/pull/2596

    storm-elasticsearch-examples: fixed all checkstyle warnings

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/krichter722/storm checkstyle-fix-es-examples

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2596.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2596
    


---

[GitHub] storm pull request #2596: [STORM-3004] storm-elasticsearch-examples: fixed a...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2596#discussion_r176908022
  
    --- Diff: examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java ---
    @@ -34,52 +39,128 @@
     import org.apache.storm.tuple.Fields;
     import org.apache.storm.tuple.Values;
     
    -public class EsIndexTopology {
    +/**
    + * Demonstrates an ElasticSearch Strom topology.
    + * @author unknown
    + */
    +public final class EsIndexTopology {
     
    +    /**
    +     * The id of the used spout.
    +     */
         static final String SPOUT_ID = "spout";
    +    /**
    +     * The id of the used bolt.
    +     */
         static final String BOLT_ID = "bolt";
    +    /**
    +     * The name of the used topology.
    +     */
         static final String TOPOLOGY_NAME = "elasticsearch-test-topology1";
    +    /**
    +     * The number of pending tuples triggering logging.
    +     */
    +    private static final int PENDING_COUNT_MAX = 1000;
     
    -    public static void main(String[] args) throws Exception {
    +    /**
    +     * The example's main method.
    +     * @param args the command line arguments
    +     * @throws AlreadyAliveException if the topology is already started
    +     * @throws InvalidTopologyException if the topology is invalid
    +     * @throws AuthorizationException if the topology authorization fails
    +     */
    +    public static void main(final String[] args) throws AlreadyAliveException,
    +            InvalidTopologyException,
    +            AuthorizationException {
             Config config = new Config();
             config.setNumWorkers(1);
             TopologyBuilder builder = new TopologyBuilder();
             UserDataSpout spout = new UserDataSpout();
             builder.setSpout(SPOUT_ID, spout, 1);
             EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
             EsConfig esConfig = new EsConfig("http://localhost:9300");
    -        builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig, tupleMapper), 1).shuffleGrouping(SPOUT_ID);
    +        builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig, tupleMapper), 1)
    +                .shuffleGrouping(SPOUT_ID);
     
             EsTestUtil.startEsNode();
    -        EsTestUtil.waitForSeconds(5);
    -        StormSubmitter.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
    +        EsTestUtil.waitForSeconds(EsConstants.WAIT_DEFAULT);
    +        StormSubmitter.submitTopology(TOPOLOGY_NAME,
    +                config,
    +                builder.createTopology());
         }
     
    +    /**
    +     * The user data spout.
    +     */
         public static class UserDataSpout extends BaseRichSpout {
    +        private static final long serialVersionUID = 1L;
    +        /**
    +         * The pending values.
    +         */
             private ConcurrentHashMap<UUID, Values> pending;
    +        /**
    +         * The collector passed in
    +         * {@link #open(java.util.Map, org.apache.storm.task.TopologyContext,
    +         * org.apache.storm.spout.SpoutOutputCollector) }.
    +         */
             private SpoutOutputCollector collector;
    +        /**
    +         * The sources.
    +         */
             private String[] sources = {
    -                "{\"user\":\"user1\"}",
    -                "{\"user\":\"user2\"}",
    -                "{\"user\":\"user3\"}",
    -                "{\"user\":\"user4\"}"
    +            "{\"user\":\"user1\"}",
    +            "{\"user\":\"user2\"}",
    +            "{\"user\":\"user3\"}",
    +            "{\"user\":\"user4\"}"
             };
    +        /**
    +         * The current index.
    +         */
             private int index = 0;
    +        /**
    +         * The current count.
    +         */
             private int count = 0;
    +        /**
    +         * The total.
    +         */
             private long total = 0L;
    +        /**
    +         * The index name.
    +         */
             private String indexName = "index1";
    +        /**
    +         * The type name.
    +         */
             private String typeName = "type1";
     
    -        public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +        /**
    +         * Declares {@code source}, {@code index}, {@code type} and {@code id}.
    +         * @param declarer the declarer to pass to
    +         */
    +        @Override
    +        public void declareOutputFields(final OutputFieldsDeclarer declarer) {
                 declarer.declare(new Fields("source", "index", "type", "id"));
             }
     
    -        public void open(Map<String, Object> config, TopologyContext context,
    -                         SpoutOutputCollector collector) {
    -            this.collector = collector;
    -            this.pending = new ConcurrentHashMap<UUID, Values>();
    +        /**
    +         * Acquires {@code collector} and initializes {@code pending}.
    +         * @param config unused
    +         * @param context unused
    +         * @param collectorArg the collector to acquire
    +         */
    +        @Override
    +        public void open(final Map<String, Object> config,
    +                final TopologyContext context,
    +                final SpoutOutputCollector collectorArg) {
    +            this.collector = collectorArg;
    +            this.pending = new ConcurrentHashMap<>();
             }
     
    +        /**
    +         * Processes the next tuple.
    --- End diff --
    
    Nit: nextTuple makes the spout emit the next tuple, if any. 
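    For illustration, the Javadoc the nit points at could be reworded along these lines (only a sketch of the suggested wording, not the text that ended up in the PR):
    
        /**
         * Emits the next tuple, if any is available.
         */
        @Override
        public void nextTuple() {
            // ...
        }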


---

[GitHub] storm issue #2596: [STORM-3004] storm-elasticsearch-examples: fixed all chec...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2596
  
    @krichter722 A kind reminder.


---

[GitHub] storm pull request #2596: [STORM-3004] storm-elasticsearch-examples: fixed a...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2596#discussion_r176908219
  
    --- Diff: examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java ---
    @@ -53,79 +78,197 @@ public static void main(String[] args) throws Exception {
             Fields esFields = new Fields("index", "type", "source");
             EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
             StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
    -        TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
    +        TridentState state = stream.partitionPersist(factory,
    +                esFields,
    +                new EsUpdater(),
    +                new Fields());
     
             EsTestUtil.startEsNode();
    -        EsTestUtil.waitForSeconds(5);
    +        EsTestUtil.waitForSeconds(EsConstants.WAIT_DEFAULT);
     
    -        StormSubmitter.submitTopology(TOPOLOGY_NAME, new Config(), topology.build());
    +        StormSubmitter.submitTopology(TOPOLOGY_NAME,
    +                new Config(),
    +                topology.build());
         }
     
    +    /**
    +     * A fixed batch spout.
    +     */
         public static class FixedBatchSpout implements IBatchSpout {
    -        int maxBatchSize;
    -        HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
    +        private static final long serialVersionUID = 1L;
    +        /**
    +         * The maximum batch size.
    +         */
    +        private int maxBatchSize;
    +        /**
    +         * The passed batches.
    +         */
    +        private HashMap<Long, List<List<Object>>> batches = new HashMap<>();
    +        /**
    +         * The output values.
    +         */
             private Values[] outputs = {
    -                new Values("{\"user\":\"user1\"}", "index1", "type1", UUID.randomUUID().toString()),
    -                new Values("{\"user\":\"user2\"}", "index1", "type2", UUID.randomUUID().toString()),
    -                new Values("{\"user\":\"user3\"}", "index2", "type1", UUID.randomUUID().toString()),
    -                new Values("{\"user\":\"user4\"}", "index2", "type2", UUID.randomUUID().toString())
    +            new Values("{\"user\":\"user1\"}",
    +                    "index1",
    +                    "type1",
    +                    UUID.randomUUID().toString()),
    +            new Values("{\"user\":\"user2\"}",
    +                    "index1",
    +                    "type2",
    +                    UUID.randomUUID().toString()),
    +            new Values("{\"user\":\"user3\"}",
    +                    "index2",
    +                    "type1",
    +                    UUID.randomUUID().toString()),
    +            new Values("{\"user\":\"user4\"}",
    +                    "index2",
    +                    "type2",
    +                    UUID.randomUUID().toString())
             };
    +        /**
    +         * The current index.
    +         */
             private int index = 0;
    -        boolean cycle = false;
    +        /**
    +         * A flag indicating whether cycling ought to be performed.
    +         */
    +        private boolean cycle = false;
     
    -        public FixedBatchSpout(int maxBatchSize) {
    -            this.maxBatchSize = maxBatchSize;
    +        /**
    +         * Creates a new fixed batch spout.
    +         * @param maxBatchSizeArg the maximum batch size to set
    +         */
    +        public FixedBatchSpout(final int maxBatchSizeArg) {
    +            this.maxBatchSize = maxBatchSizeArg;
             }
     
    -        public void setCycle(boolean cycle) {
    -            this.cycle = cycle;
    +        /**
    +         * Sets the cycle flag.
    +         * @param cycleArg the cycle flag value
    +         */
    +        public void setCycle(final boolean cycleArg) {
    +            this.cycle = cycleArg;
             }
     
    +        /**
    +         * Gets the output fields.
    +         * @return the output fields.
    +         */
             @Override
             public Fields getOutputFields() {
                 return new Fields("source", "index", "type", "id");
             }
     
    +        /**
    +         * Open the topology.
    +         * @param conf the configuration to use for opening
    +         * @param context the context to use for opening
    +         */
             @Override
    -        public void open(Map<String, Object> conf, TopologyContext context) {
    +        public void open(final Map<String, Object> conf,
    +                final TopologyContext context) {
                 index = 0;
             }
     
    +        /**
    +         * Emits a batch.
    +         * @param batchId the batch id to use
    +         * @param collector the collector to emit to
    +         */
             @Override
    -        public void emitBatch(long batchId, TridentCollector collector) {
    -            List<List<Object>> batch = this.batches.get(batchId);
    +        public void emitBatch(final long batchId,
    +                final TridentCollector collector) {
    +            List<List<Object>> batch = this.getBatches().get(batchId);
                 if (batch == null) {
                     batch = new ArrayList<List<Object>>();
    -                if (index >= outputs.length && cycle) {
    +                if (index >= outputs.length && isCycle()) {
    --- End diff --
    
    Nit: Doesn't seem like there's any reason to use the getter rather than referring to the field?


---

[GitHub] storm pull request #2596: [STORM-3004] storm-elasticsearch-examples: fixed a...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2596#discussion_r176908034
  
    --- Diff: examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java ---
    @@ -92,20 +173,37 @@ public void nextTuple() {
                 }
                 count++;
                 total++;
    -            if (count > 1000) {
    +            if (count > PENDING_COUNT_MAX) {
                     count = 0;
    -                System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
    +                System.out.println("Pending count: " + this.pending.size()
    +                        + ", total: " + this.total);
                 }
                 Thread.yield();
             }
     
    -        public void ack(Object msgId) {
    +        /**
    +         * Acknoledges the message with id {@code msgId}.
    +         * @param msgId the message id
    +         */
    +        @Override
    +        public void ack(final Object msgId) {
                 this.pending.remove(msgId);
             }
     
    -        public void fail(Object msgId) {
    +        /**
    +         * Markes the message with id {@code msgId} as failed.
    --- End diff --
    
    Marks


---

[GitHub] storm pull request #2596: [STORM-3004] storm-elasticsearch-examples: fixed a...

Posted by krichter722 <gi...@git.apache.org>.
Github user krichter722 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2596#discussion_r177086700
  
    --- Diff: examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java ---
    @@ -92,20 +173,37 @@ public void nextTuple() {
                 }
                 count++;
                 total++;
    -            if (count > 1000) {
    +            if (count > PENDING_COUNT_MAX) {
                     count = 0;
    -                System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
    +                System.out.println("Pending count: " + this.pending.size()
    +                        + ", total: " + this.total);
                 }
                 Thread.yield();
             }
     
    -        public void ack(Object msgId) {
    +        /**
    +         * Acknoledges the message with id {@code msgId}.
    +         * @param msgId the message id
    +         */
    +        @Override
    +        public void ack(final Object msgId) {
                 this.pending.remove(msgId);
             }
     
    -        public void fail(Object msgId) {
    +        /**
    +         * Markes the message with id {@code msgId} as failed.
    +         * @param msgId the message id
    +         */
    +        @Override
    +        public void fail(final Object msgId) {
                 System.out.println("**** RESENDING FAILED TUPLE");
                 this.collector.emit(this.pending.get(msgId), msgId);
             }
         }
    +
    +    /**
    +     * Utility constructor.
    --- End diff --
    
    https://stackoverflow.com/questions/31487912/netbeans-warning-utility-class-without-constructor


---

[GitHub] storm pull request #2596: [STORM-3004] storm-elasticsearch-examples: fixed a...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2596#discussion_r176907879
  
    --- Diff: examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java ---
    @@ -34,52 +39,128 @@
     import org.apache.storm.tuple.Fields;
     import org.apache.storm.tuple.Values;
     
    -public class EsIndexTopology {
    +/**
    + * Demonstrates an ElasticSearch Strom topology.
    --- End diff --
    
    Strom -> Storm


---

[GitHub] storm pull request #2596: [STORM-3004] storm-elasticsearch-examples: fixed a...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2596#discussion_r177155194
  
    --- Diff: external/storm-opentsdb/pom.xml ---
    @@ -1,3 +1,4 @@
    +
    --- End diff --
    
    This change seems unnecessary.


---

[GitHub] storm pull request #2596: [STORM-3004] storm-elasticsearch-examples: fixed a...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2596#discussion_r176907913
  
    --- Diff: examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java ---
    @@ -34,52 +39,128 @@
     import org.apache.storm.tuple.Fields;
     import org.apache.storm.tuple.Values;
     
    -public class EsIndexTopology {
    +/**
    + * Demonstrates an ElasticSearch Strom topology.
    + * @author unknown
    --- End diff --
    
    We generally don't use `@author` in the javadoc; it's just noise, since most files are worked on by more than one person. Could you remove it here and elsewhere?


---

[GitHub] storm pull request #2596: [STORM-3004] storm-elasticsearch-examples: fixed a...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2596#discussion_r176944098
  
    --- Diff: examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java ---
    @@ -34,52 +39,128 @@
     import org.apache.storm.tuple.Fields;
     import org.apache.storm.tuple.Values;
     
    -public class EsIndexTopology {
    +/**
    + * Demonstrates an ElasticSearch Strom topology.
    + * @author unknown
    + */
    +public final class EsIndexTopology {
     
    +    /**
    +     * The id of the used spout.
    +     */
         static final String SPOUT_ID = "spout";
    +    /**
    +     * The id of the used bolt.
    +     */
         static final String BOLT_ID = "bolt";
    +    /**
    +     * The name of the used topology.
    +     */
         static final String TOPOLOGY_NAME = "elasticsearch-test-topology1";
    +    /**
    +     * The number of pending tuples triggering logging.
    +     */
    +    private static final int PENDING_COUNT_MAX = 1000;
     
    -    public static void main(String[] args) throws Exception {
    +    /**
    +     * The example's main method.
    +     * @param args the command line arguments
    +     * @throws AlreadyAliveException if the topology is already started
    +     * @throws InvalidTopologyException if the topology is invalid
    +     * @throws AuthorizationException if the topology authorization fails
    +     */
    +    public static void main(final String[] args) throws AlreadyAliveException,
    +            InvalidTopologyException,
    +            AuthorizationException {
             Config config = new Config();
             config.setNumWorkers(1);
             TopologyBuilder builder = new TopologyBuilder();
             UserDataSpout spout = new UserDataSpout();
             builder.setSpout(SPOUT_ID, spout, 1);
             EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
             EsConfig esConfig = new EsConfig("http://localhost:9300");
    -        builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig, tupleMapper), 1).shuffleGrouping(SPOUT_ID);
    +        builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig, tupleMapper), 1)
    +                .shuffleGrouping(SPOUT_ID);
     
             EsTestUtil.startEsNode();
    -        EsTestUtil.waitForSeconds(5);
    -        StormSubmitter.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
    +        EsTestUtil.waitForSeconds(EsConstants.WAIT_DEFAULT);
    +        StormSubmitter.submitTopology(TOPOLOGY_NAME,
    +                config,
    +                builder.createTopology());
         }
     
    +    /**
    +     * The user data spout.
    +     */
         public static class UserDataSpout extends BaseRichSpout {
    +        private static final long serialVersionUID = 1L;
    +        /**
    +         * The pending values.
    +         */
             private ConcurrentHashMap<UUID, Values> pending;
    +        /**
    +         * The collector passed in
    +         * {@link #open(java.util.Map, org.apache.storm.task.TopologyContext,
    +         * org.apache.storm.spout.SpoutOutputCollector) }.
    +         */
             private SpoutOutputCollector collector;
    +        /**
    +         * The sources.
    +         */
             private String[] sources = {
    -                "{\"user\":\"user1\"}",
    -                "{\"user\":\"user2\"}",
    -                "{\"user\":\"user3\"}",
    -                "{\"user\":\"user4\"}"
    +            "{\"user\":\"user1\"}",
    +            "{\"user\":\"user2\"}",
    +            "{\"user\":\"user3\"}",
    +            "{\"user\":\"user4\"}"
             };
    +        /**
    --- End diff --
    
    I checked, and checkstyle passes fine for me without these comments. As far as I can tell, comments are required on public classes, public methods that don't override supertype methods, and public constructors/variables.
    
    For example, this file works for me with only comments on the EsIndexTopology class and the main method. All the others can be removed without violating the checkstyle check.
    
    Does this not work for you?
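    For illustration, a minimal sketch of that reduced version, assuming the Javadoc checks are indeed scoped to public, non-overriding members as described above (class name and values taken from the diff, everything else trimmed):
    
        /**
         * Demonstrates an ElasticSearch Storm topology.
         */
        public final class EsIndexTopology {
    
            // Private members need no Javadoc under a public-only scope.
            private static final String SPOUT_ID = "spout";
            private static final String BOLT_ID = "bolt";
            private static final String TOPOLOGY_NAME = "elasticsearch-test-topology1";
            private static final int PENDING_COUNT_MAX = 1000;
    
            private EsIndexTopology() {
            }
    
            /**
             * The example's main method.
             * @param args the command line arguments
             */
            public static void main(final String[] args) {
                // topology setup as in the diff above
            }
        }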


---

[GitHub] storm issue #2596: [STORM-3004] storm-elasticsearch-examples: fixed all chec...

Posted by krichter722 <gi...@git.apache.org>.
Github user krichter722 commented on the issue:

    https://github.com/apache/storm/pull/2596
  
    > Thanks for the fixes. I'll review this as soon as I can. In the meantime, could you raise issues on https://issues.apache.org/jira to track your changes?
    
    Nice. Done.


---

[GitHub] storm pull request #2596: [STORM-3004] storm-elasticsearch-examples: fixed a...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2596#discussion_r177155543
  
    --- Diff: examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java ---
    @@ -92,20 +173,37 @@ public void nextTuple() {
                 }
                 count++;
                 total++;
    -            if (count > 1000) {
    +            if (count > PENDING_COUNT_MAX) {
                     count = 0;
    -                System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
    +                System.out.println("Pending count: " + this.pending.size()
    +                        + ", total: " + this.total);
                 }
                 Thread.yield();
             }
     
    -        public void ack(Object msgId) {
    +        /**
    +         * Acknoledges the message with id {@code msgId}.
    +         * @param msgId the message id
    +         */
    +        @Override
    +        public void ack(final Object msgId) {
                 this.pending.remove(msgId);
             }
     
    -        public void fail(Object msgId) {
    +        /**
    +         * Markes the message with id {@code msgId} as failed.
    +         * @param msgId the message id
    +         */
    +        @Override
    +        public void fail(final Object msgId) {
                 System.out.println("**** RESENDING FAILED TUPLE");
                 this.collector.emit(this.pending.get(msgId), msgId);
             }
         }
    +
    +    /**
    +     * Utility constructor.
    --- End diff --
    
    Thanks for the explanation; that makes sense. Could you change the comment to e.g. "Prevent instantiation" or something similar, so people who run across this later will know why it's there?
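    As a concrete sketch, the constructor plus the suggested comment might end up looking like this (the rule behind it is presumably Checkstyle's HideUtilityClassConstructor or something similar):
    
        /**
         * Prevents instantiation of this utility class.
         */
        private EsIndexTopology() {
        }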


---

[GitHub] storm pull request #2596: [STORM-3004] storm-elasticsearch-examples: fixed a...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2596#discussion_r176908208
  
    --- Diff: examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java ---
    @@ -53,79 +78,197 @@ public static void main(String[] args) throws Exception {
             Fields esFields = new Fields("index", "type", "source");
             EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
             StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
    -        TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
    +        TridentState state = stream.partitionPersist(factory,
    +                esFields,
    +                new EsUpdater(),
    +                new Fields());
     
             EsTestUtil.startEsNode();
    -        EsTestUtil.waitForSeconds(5);
    +        EsTestUtil.waitForSeconds(EsConstants.WAIT_DEFAULT);
     
    -        StormSubmitter.submitTopology(TOPOLOGY_NAME, new Config(), topology.build());
    +        StormSubmitter.submitTopology(TOPOLOGY_NAME,
    +                new Config(),
    +                topology.build());
         }
     
    +    /**
    +     * A fixed batch spout.
    +     */
         public static class FixedBatchSpout implements IBatchSpout {
    -        int maxBatchSize;
    -        HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
    +        private static final long serialVersionUID = 1L;
    +        /**
    +         * The maximum batch size.
    +         */
    +        private int maxBatchSize;
    +        /**
    +         * The passed batches.
    +         */
    +        private HashMap<Long, List<List<Object>>> batches = new HashMap<>();
    +        /**
    +         * The output values.
    +         */
             private Values[] outputs = {
    -                new Values("{\"user\":\"user1\"}", "index1", "type1", UUID.randomUUID().toString()),
    -                new Values("{\"user\":\"user2\"}", "index1", "type2", UUID.randomUUID().toString()),
    -                new Values("{\"user\":\"user3\"}", "index2", "type1", UUID.randomUUID().toString()),
    -                new Values("{\"user\":\"user4\"}", "index2", "type2", UUID.randomUUID().toString())
    +            new Values("{\"user\":\"user1\"}",
    +                    "index1",
    +                    "type1",
    +                    UUID.randomUUID().toString()),
    +            new Values("{\"user\":\"user2\"}",
    +                    "index1",
    +                    "type2",
    +                    UUID.randomUUID().toString()),
    +            new Values("{\"user\":\"user3\"}",
    +                    "index2",
    +                    "type1",
    +                    UUID.randomUUID().toString()),
    +            new Values("{\"user\":\"user4\"}",
    +                    "index2",
    +                    "type2",
    +                    UUID.randomUUID().toString())
             };
    +        /**
    +         * The current index.
    +         */
             private int index = 0;
    -        boolean cycle = false;
    +        /**
    +         * A flag indicating whether cycling ought to be performed.
    +         */
    +        private boolean cycle = false;
     
    -        public FixedBatchSpout(int maxBatchSize) {
    -            this.maxBatchSize = maxBatchSize;
    +        /**
    +         * Creates a new fixed batch spout.
    +         * @param maxBatchSizeArg the maximum batch size to set
    +         */
    +        public FixedBatchSpout(final int maxBatchSizeArg) {
    +            this.maxBatchSize = maxBatchSizeArg;
             }
     
    -        public void setCycle(boolean cycle) {
    -            this.cycle = cycle;
    +        /**
    +         * Sets the cycle flag.
    +         * @param cycleArg the cycle flag value
    +         */
    +        public void setCycle(final boolean cycleArg) {
    +            this.cycle = cycleArg;
             }
     
    +        /**
    +         * Gets the output fields.
    +         * @return the output fields.
    +         */
             @Override
             public Fields getOutputFields() {
                 return new Fields("source", "index", "type", "id");
             }
     
    +        /**
    +         * Open the topology.
    +         * @param conf the configuration to use for opening
    +         * @param context the context to use for opening
    +         */
             @Override
    -        public void open(Map<String, Object> conf, TopologyContext context) {
    +        public void open(final Map<String, Object> conf,
    +                final TopologyContext context) {
                 index = 0;
             }
     
    +        /**
    +         * Emits a batch.
    +         * @param batchId the batch id to use
    +         * @param collector the collector to emit to
    +         */
             @Override
    -        public void emitBatch(long batchId, TridentCollector collector) {
    -            List<List<Object>> batch = this.batches.get(batchId);
    +        public void emitBatch(final long batchId,
    +                final TridentCollector collector) {
    +            List<List<Object>> batch = this.getBatches().get(batchId);
                 if (batch == null) {
                     batch = new ArrayList<List<Object>>();
    -                if (index >= outputs.length && cycle) {
    +                if (index >= outputs.length && isCycle()) {
                         index = 0;
                     }
    -                for (int i = 0; i < maxBatchSize; index++, i++) {
    +                for (int i = 0; i < getMaxBatchSize(); index++, i++) {
                         if (index == outputs.length) {
                             index = 0;
                         }
                         batch.add(outputs[index]);
                     }
    -                this.batches.put(batchId, batch);
    +                this.getBatches().put(batchId, batch);
                 }
                 for (List<Object> list : batch) {
                     collector.emit(list);
                 }
             }
     
    +        /**
    +         * Acknoledges the message with id {@code msgId}.
    --- End diff --
    
    Acknowledges


---

[GitHub] storm issue #2596: [STORM-3004] storm-elasticsearch-examples: fixed all chec...

Posted by krichter722 <gi...@git.apache.org>.
Github user krichter722 commented on the issue:

    https://github.com/apache/storm/pull/2596
  
    I agree with all your comments and removed the `TridentEsTopology.get/setMaxBatchSize`. The noisy comments are required (see comment above). Thanks for the review.


---

[GitHub] storm pull request #2596: [STORM-3004] storm-elasticsearch-examples: fixed a...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2596#discussion_r176908038
  
    --- Diff: examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java ---
    @@ -92,20 +173,37 @@ public void nextTuple() {
                 }
                 count++;
                 total++;
    -            if (count > 1000) {
    +            if (count > PENDING_COUNT_MAX) {
                     count = 0;
    -                System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
    +                System.out.println("Pending count: " + this.pending.size()
    +                        + ", total: " + this.total);
                 }
                 Thread.yield();
             }
     
    -        public void ack(Object msgId) {
    +        /**
    +         * Acknoledges the message with id {@code msgId}.
    +         * @param msgId the message id
    +         */
    +        @Override
    +        public void ack(final Object msgId) {
                 this.pending.remove(msgId);
             }
     
    -        public void fail(Object msgId) {
    +        /**
    +         * Markes the message with id {@code msgId} as failed.
    +         * @param msgId the message id
    +         */
    +        @Override
    +        public void fail(final Object msgId) {
                 System.out.println("**** RESENDING FAILED TUPLE");
                 this.collector.emit(this.pending.get(msgId), msgId);
             }
         }
    +
    +    /**
    +     * Utility constructor.
    --- End diff --
    
    What does this mean/what is this for?


---

[GitHub] storm pull request #2596: [STORM-3004] storm-elasticsearch-examples: fixed a...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2596#discussion_r176907925
  
    --- Diff: examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java ---
    @@ -34,52 +39,128 @@
     import org.apache.storm.tuple.Fields;
     import org.apache.storm.tuple.Values;
     
    -public class EsIndexTopology {
    +/**
    + * Demonstrates an ElasticSearch Strom topology.
    + * @author unknown
    + */
    +public final class EsIndexTopology {
     
    +    /**
    +     * The id of the used spout.
    +     */
         static final String SPOUT_ID = "spout";
    --- End diff --
    
    I think these should be private; that way checkstyle hopefully won't complain about missing comments.


---

[GitHub] storm pull request #2596: [STORM-3004] storm-elasticsearch-examples: fixed a...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2596#discussion_r176908172
  
    --- Diff: examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java ---
    @@ -38,12 +43,32 @@
     import org.apache.storm.tuple.Fields;
     import org.apache.storm.tuple.Values;
     
    -public class TridentEsTopology {
    -
    -    static final String TOPOLOGY_NAME = "elasticsearch-test-topology2";
    +/**
    + * A Trident topology example.
    + * @author unknown
    + */
    +public final class TridentEsTopology {
    +    /**
    +     * The default batch size. Necessary in order to avoid magic number
    --- End diff --
    
    Nit: I don't think you need to mention why the field is here. Won't checkstyle complain if someone replaces the field with a magic number again?


---

[GitHub] storm pull request #2596: [STORM-3004] storm-elasticsearch-examples: fixed a...

Posted by krichter722 <gi...@git.apache.org>.
Github user krichter722 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2596#discussion_r176943570
  
    --- Diff: examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java ---
    @@ -34,52 +39,128 @@
     import org.apache.storm.tuple.Fields;
     import org.apache.storm.tuple.Values;
     
    -public class EsIndexTopology {
    +/**
    + * Demonstrates an ElasticSearch Strom topology.
    + * @author unknown
    + */
    +public final class EsIndexTopology {
     
    +    /**
    +     * The id of the used spout.
    +     */
         static final String SPOUT_ID = "spout";
    +    /**
    +     * The id of the used bolt.
    +     */
         static final String BOLT_ID = "bolt";
    +    /**
    +     * The name of the used topology.
    +     */
         static final String TOPOLOGY_NAME = "elasticsearch-test-topology1";
    +    /**
    +     * The number of pending tuples triggering logging.
    +     */
    +    private static final int PENDING_COUNT_MAX = 1000;
     
    -    public static void main(String[] args) throws Exception {
    +    /**
    +     * The example's main method.
    +     * @param args the command line arguments
    +     * @throws AlreadyAliveException if the topology is already started
    +     * @throws InvalidTopologyException if the topology is invalid
    +     * @throws AuthorizationException if the topology authorization fails
    +     */
    +    public static void main(final String[] args) throws AlreadyAliveException,
    +            InvalidTopologyException,
    +            AuthorizationException {
             Config config = new Config();
             config.setNumWorkers(1);
             TopologyBuilder builder = new TopologyBuilder();
             UserDataSpout spout = new UserDataSpout();
             builder.setSpout(SPOUT_ID, spout, 1);
             EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
             EsConfig esConfig = new EsConfig("http://localhost:9300");
    -        builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig, tupleMapper), 1).shuffleGrouping(SPOUT_ID);
    +        builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig, tupleMapper), 1)
    +                .shuffleGrouping(SPOUT_ID);
     
             EsTestUtil.startEsNode();
    -        EsTestUtil.waitForSeconds(5);
    -        StormSubmitter.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
    +        EsTestUtil.waitForSeconds(EsConstants.WAIT_DEFAULT);
    +        StormSubmitter.submitTopology(TOPOLOGY_NAME,
    +                config,
    +                builder.createTopology());
         }
     
    +    /**
    +     * The user data spout.
    +     */
         public static class UserDataSpout extends BaseRichSpout {
    +        private static final long serialVersionUID = 1L;
    +        /**
    +         * The pending values.
    +         */
             private ConcurrentHashMap<UUID, Values> pending;
    +        /**
    +         * The collector passed in
    +         * {@link #open(java.util.Map, org.apache.storm.task.TopologyContext,
    +         * org.apache.storm.spout.SpoutOutputCollector) }.
    +         */
             private SpoutOutputCollector collector;
    +        /**
    +         * The sources.
    +         */
             private String[] sources = {
    -                "{\"user\":\"user1\"}",
    -                "{\"user\":\"user2\"}",
    -                "{\"user\":\"user3\"}",
    -                "{\"user\":\"user4\"}"
    +            "{\"user\":\"user1\"}",
    +            "{\"user\":\"user2\"}",
    +            "{\"user\":\"user3\"}",
    +            "{\"user\":\"user4\"}"
             };
    +        /**
    --- End diff --
    
    Yes, the current setup requires comments on almost everything. I agree that they are of almost no use. Therefore this PR is a good usability check for the current, extremely strict checkstyle configuration.


---

[GitHub] storm issue #2596: [STORM-3004] storm-elasticsearch-examples: fixed all chec...

Posted by krichter722 <gi...@git.apache.org>.
Github user krichter722 commented on the issue:

    https://github.com/apache/storm/pull/2596
  
    Last comments incorporated.


---

[GitHub] storm pull request #2596: [STORM-3004] storm-elasticsearch-examples: fixed a...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2596#discussion_r176908105
  
    --- Diff: examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsConstants.java ---
    @@ -15,8 +15,28 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    +
     package org.apache.storm.elasticsearch.common;
     
    -public class EsConstants {
    -    public static String clusterName = "test-cluster";
    +/**
    + * Constants used in ElasticSearch examples.
    + * @author richter
    + */
    +public final class EsConstants {
    +
    +    /**
    +     * The cluster name.
    +     */
    +    public static final String CLUSTER_NAME = "test-cluster";
    +    /**
    +     * The default wait value in seconds. Necessary in order to avoid magic
    +     * number anti-pattern.
    +     */
    +    public static final int WAIT_DEFAULT = 5;
    --- End diff --
    
    Nit: Putting this in a field is a good change. Could you rename the field so it contains the unit (e.g. WAIT_DEFAULT_SECS)?
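    A sketch of the renamed constant and one call site, assuming the value itself stays at 5 seconds:
    
        /**
         * The default wait time in seconds.
         */
        public static final int WAIT_DEFAULT_SECS = 5;
    
        // call site in the example topologies
        EsTestUtil.waitForSeconds(EsConstants.WAIT_DEFAULT_SECS);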


---

[GitHub] storm issue #2596: [STORM-3004] storm-elasticsearch-examples: fixed all chec...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2596
  
    +1, thanks for being patient. @krichter722, could you add the issue number to the commit message? I'll merge after that. I'd add it myself, but then the PR won't show as merged.


---

[GitHub] storm issue #2596: [STORM-3004] storm-elasticsearch-examples: fixed all chec...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2596
  
    @krichter722 Thanks, merged to master. I also added you to the contributors list in JIRA, so you should be able to assign tasks to yourself now.


---

[GitHub] storm pull request #2596: [STORM-3004] storm-elasticsearch-examples: fixed a...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2596#discussion_r176907985
  
    --- Diff: examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java ---
    @@ -34,52 +39,128 @@
     import org.apache.storm.tuple.Fields;
     import org.apache.storm.tuple.Values;
     
    -public class EsIndexTopology {
    +/**
    + * Demonstrates an ElasticSearch Strom topology.
    + * @author unknown
    + */
    +public final class EsIndexTopology {
     
    +    /**
    +     * The id of the used spout.
    +     */
         static final String SPOUT_ID = "spout";
    +    /**
    +     * The id of the used bolt.
    +     */
         static final String BOLT_ID = "bolt";
    +    /**
    +     * The name of the used topology.
    +     */
         static final String TOPOLOGY_NAME = "elasticsearch-test-topology1";
    +    /**
    +     * The number of pending tuples triggering logging.
    +     */
    +    private static final int PENDING_COUNT_MAX = 1000;
     
    -    public static void main(String[] args) throws Exception {
    +    /**
    +     * The example's main method.
    +     * @param args the command line arguments
    +     * @throws AlreadyAliveException if the topology is already started
    +     * @throws InvalidTopologyException if the topology is invalid
    +     * @throws AuthorizationException if the topology authorization fails
    +     */
    +    public static void main(final String[] args) throws AlreadyAliveException,
    +            InvalidTopologyException,
    +            AuthorizationException {
             Config config = new Config();
             config.setNumWorkers(1);
             TopologyBuilder builder = new TopologyBuilder();
             UserDataSpout spout = new UserDataSpout();
             builder.setSpout(SPOUT_ID, spout, 1);
             EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
             EsConfig esConfig = new EsConfig("http://localhost:9300");
    -        builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig, tupleMapper), 1).shuffleGrouping(SPOUT_ID);
    +        builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig, tupleMapper), 1)
    +                .shuffleGrouping(SPOUT_ID);
     
             EsTestUtil.startEsNode();
    -        EsTestUtil.waitForSeconds(5);
    -        StormSubmitter.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
    +        EsTestUtil.waitForSeconds(EsConstants.WAIT_DEFAULT);
    +        StormSubmitter.submitTopology(TOPOLOGY_NAME,
    +                config,
    +                builder.createTopology());
         }
     
    +    /**
    +     * The user data spout.
    +     */
         public static class UserDataSpout extends BaseRichSpout {
    +        private static final long serialVersionUID = 1L;
    +        /**
    +         * The pending values.
    +         */
             private ConcurrentHashMap<UUID, Values> pending;
    +        /**
    +         * The collector passed in
    +         * {@link #open(java.util.Map, org.apache.storm.task.TopologyContext,
    +         * org.apache.storm.spout.SpoutOutputCollector) }.
    +         */
             private SpoutOutputCollector collector;
    +        /**
    +         * The sources.
    +         */
             private String[] sources = {
    -                "{\"user\":\"user1\"}",
    -                "{\"user\":\"user2\"}",
    -                "{\"user\":\"user3\"}",
    -                "{\"user\":\"user4\"}"
    +            "{\"user\":\"user1\"}",
    +            "{\"user\":\"user2\"}",
    +            "{\"user\":\"user3\"}",
    +            "{\"user\":\"user4\"}"
             };
    +        /**
    --- End diff --
    
    Is checkstyle requiring these comments? If not, I'd rather we didn't have them. Comments that are just "The $variable_name" are noise.


---

[GitHub] storm pull request #2596: [STORM-3004] storm-elasticsearch-examples: fixed a...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2596


---

[GitHub] storm issue #2596: storm-elasticsearch-examples: fixed all checkstyle warnin...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2596
  
    @krichter722 Thanks for the fixes. I'll review this as soon as I can. In the meantime, could you raise issues on https://issues.apache.org/jira to track your changes? It makes it easier for us to track which branches fixes have been applied to, and we use the JIRA issues to generate release notes. Please also rename the PR and commit to contain the JIRA issue number. Thanks :)


---

[GitHub] storm pull request #2596: [STORM-3004] storm-elasticsearch-examples: fixed a...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2596#discussion_r176908031
  
    --- Diff: examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java ---
    @@ -92,20 +173,37 @@ public void nextTuple() {
                 }
                 count++;
                 total++;
    -            if (count > 1000) {
    +            if (count > PENDING_COUNT_MAX) {
                     count = 0;
    -                System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
    +                System.out.println("Pending count: " + this.pending.size()
    +                        + ", total: " + this.total);
                 }
                 Thread.yield();
             }
     
    -        public void ack(Object msgId) {
    +        /**
    +         * Acknoledges the message with id {@code msgId}.
    --- End diff --
    
    Acknowledges


---

[GitHub] storm pull request #2596: [STORM-3004] storm-elasticsearch-examples: fixed a...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2596#discussion_r176908239
  
    --- Diff: examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java ---
    @@ -53,79 +78,197 @@ public static void main(String[] args) throws Exception {
             Fields esFields = new Fields("index", "type", "source");
             EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
             StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
    -        TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
    +        TridentState state = stream.partitionPersist(factory,
    +                esFields,
    +                new EsUpdater(),
    +                new Fields());
     
             EsTestUtil.startEsNode();
    -        EsTestUtil.waitForSeconds(5);
    +        EsTestUtil.waitForSeconds(EsConstants.WAIT_DEFAULT);
     
    -        StormSubmitter.submitTopology(TOPOLOGY_NAME, new Config(), topology.build());
    +        StormSubmitter.submitTopology(TOPOLOGY_NAME,
    +                new Config(),
    +                topology.build());
         }
     
    +    /**
    +     * A fixed batch spout.
    +     */
         public static class FixedBatchSpout implements IBatchSpout {
    -        int maxBatchSize;
    -        HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
    +        private static final long serialVersionUID = 1L;
    +        /**
    +         * The maximum batch size.
    +         */
    +        private int maxBatchSize;
    +        /**
    +         * The passed batches.
    +         */
    +        private HashMap<Long, List<List<Object>>> batches = new HashMap<>();
    +        /**
    +         * The output values.
    +         */
             private Values[] outputs = {
    -                new Values("{\"user\":\"user1\"}", "index1", "type1", UUID.randomUUID().toString()),
    -                new Values("{\"user\":\"user2\"}", "index1", "type2", UUID.randomUUID().toString()),
    -                new Values("{\"user\":\"user3\"}", "index2", "type1", UUID.randomUUID().toString()),
    -                new Values("{\"user\":\"user4\"}", "index2", "type2", UUID.randomUUID().toString())
    +            new Values("{\"user\":\"user1\"}",
    +                    "index1",
    +                    "type1",
    +                    UUID.randomUUID().toString()),
    +            new Values("{\"user\":\"user2\"}",
    +                    "index1",
    +                    "type2",
    +                    UUID.randomUUID().toString()),
    +            new Values("{\"user\":\"user3\"}",
    +                    "index2",
    +                    "type1",
    +                    UUID.randomUUID().toString()),
    +            new Values("{\"user\":\"user4\"}",
    +                    "index2",
    +                    "type2",
    +                    UUID.randomUUID().toString())
             };
    +        /**
    +         * The current index.
    +         */
             private int index = 0;
    -        boolean cycle = false;
    +        /**
    +         * A flag indicating whether cycling ought to be performed.
    +         */
    +        private boolean cycle = false;
     
    -        public FixedBatchSpout(int maxBatchSize) {
    -            this.maxBatchSize = maxBatchSize;
    +        /**
    +         * Creates a new fixed batch spout.
    +         * @param maxBatchSizeArg the maximum batch size to set
    +         */
    +        public FixedBatchSpout(final int maxBatchSizeArg) {
    +            this.maxBatchSize = maxBatchSizeArg;
             }
     
    -        public void setCycle(boolean cycle) {
    -            this.cycle = cycle;
    +        /**
    +         * Sets the cycle flag.
    +         * @param cycleArg the cycle flag value
    +         */
    +        public void setCycle(final boolean cycleArg) {
    +            this.cycle = cycleArg;
             }
     
    +        /**
    +         * Gets the output fields.
    +         * @return the output fields.
    +         */
             @Override
             public Fields getOutputFields() {
                 return new Fields("source", "index", "type", "id");
             }
     
    +        /**
    +         * Open the topology.
    +         * @param conf the configuration to use for opening
    +         * @param context the context to use for opening
    +         */
             @Override
    -        public void open(Map<String, Object> conf, TopologyContext context) {
    +        public void open(final Map<String, Object> conf,
    +                final TopologyContext context) {
                 index = 0;
             }
     
    +        /**
    +         * Emits a batch.
    +         * @param batchId the batch id to use
    +         * @param collector the collector to emit to
    +         */
             @Override
    -        public void emitBatch(long batchId, TridentCollector collector) {
    -            List<List<Object>> batch = this.batches.get(batchId);
    +        public void emitBatch(final long batchId,
    +                final TridentCollector collector) {
    +            List<List<Object>> batch = this.getBatches().get(batchId);
                 if (batch == null) {
                     batch = new ArrayList<List<Object>>();
    -                if (index >= outputs.length && cycle) {
    +                if (index >= outputs.length && isCycle()) {
                         index = 0;
                     }
    -                for (int i = 0; i < maxBatchSize; index++, i++) {
    +                for (int i = 0; i < getMaxBatchSize(); index++, i++) {
                         if (index == outputs.length) {
                             index = 0;
                         }
                         batch.add(outputs[index]);
                     }
    -                this.batches.put(batchId, batch);
    +                this.getBatches().put(batchId, batch);
                 }
                 for (List<Object> list : batch) {
                     collector.emit(list);
                 }
             }
     
    +        /**
    +         * Acknoledges the message with id {@code msgId}.
    +         * @param batchId the message id
    +         */
             @Override
    -        public void ack(long batchId) {
    -            this.batches.remove(batchId);
    +        public void ack(final long batchId) {
    +            this.getBatches().remove(batchId);
             }
     
    +        /**
    +         * Close the spout.
    +         */
             @Override
             public void close() {
             }
     
    +        /**
    +         * Get the component configuration.
    +         * @return the component configuration
    +         */
             @Override
             public Map<String, Object> getComponentConfiguration() {
                 Config conf = new Config();
                 conf.setMaxTaskParallelism(1);
                 return conf;
             }
    +
    +        /**
    +         * Get the maximum batch size.
    +         * @return the maxBatchSize
    +         */
    +        public int getMaxBatchSize() {
    --- End diff --
    
    A little unsure why we need these getters and setters?
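    For comparison, emitBatch with the fields referenced directly (the same logic as in the diff above, just without the getters):
    
        @Override
        public void emitBatch(final long batchId,
                final TridentCollector collector) {
            List<List<Object>> batch = this.batches.get(batchId);
            if (batch == null) {
                batch = new ArrayList<>();
                if (index >= outputs.length && cycle) {
                    index = 0;
                }
                for (int i = 0; i < maxBatchSize; index++, i++) {
                    if (index == outputs.length) {
                        index = 0;
                    }
                    batch.add(outputs[index]);
                }
                this.batches.put(batchId, batch);
            }
            for (List<Object> list : batch) {
                collector.emit(list);
            }
        }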


---

[GitHub] storm pull request #2596: [STORM-3004] storm-elasticsearch-examples: fixed a...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2596#discussion_r176908124
  
    --- Diff: examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java ---
    @@ -33,43 +34,94 @@
     import org.elasticsearch.node.Node;
     import org.elasticsearch.node.NodeBuilder;
     
    -public class EsTestUtil {
    -    public static Tuple generateTestTuple(String source, String index, String type, String id) {
    +/**
    + * ElasticSearch example utilities.
    + * @author richter
    + */
    +public final class EsTestUtil {
    +    /**
    +     * The number of milliseconds in a second (necessary in order to avoid magic
    +     * number anti-pattern.
    +     */
    +    private static final int MILLIS_IN_SECOND = 1000;
    +
    +    /**
    +     * Generates a test tuple.
    +     * @param source the source of the tuple
    +     * @param index the index of the tuple
    +     * @param type the type of the tuple
    +     * @param id the id of the tuple
    +     * @return the generated tuple
    +     */
    +    public static Tuple generateTestTuple(final String source,
    +            final String index,
    +            final String type,
    +            final String id) {
             TopologyBuilder builder = new TopologyBuilder();
    -        GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
    -                new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
    +        GeneralTopologyContext topologyContext = new GeneralTopologyContext(
    +                builder.createTopology(),
    +                new Config(),
    +                new HashMap<>(),
    +                new HashMap<>(),
    +                new HashMap<>(),
    +                "") {
                 @Override
    -            public Fields getComponentOutputFields(String componentId, String streamId) {
    +            public Fields getComponentOutputFields(final String componentId,
    +                    final String streamId) {
                     return new Fields("source", "index", "type", "id");
                 }
             };
    -        return new TupleImpl(topologyContext, new Values(source, index, type, id), source, 1, "");
    +        return new TupleImpl(topologyContext,
    +                new Values(source, index, type, id),
    +                source,
    +                1,
    +                "");
         }
     
    +    /**
    +     * Generates a new tuple mapper.
    +     * @return the generated mapper
    +     */
         public static EsTupleMapper generateDefaultTupleMapper() {
             return new DefaultEsTupleMapper();
         }
     
    -    public static Node startEsNode(){
    +    /**
    +     * Starts an ElasticSearch node.
    +     * @return the started node.
    +     */
    +    public static Node startEsNode() {
             Node node = NodeBuilder.nodeBuilder().data(true).settings(
                     Settings.settingsBuilder()
    -                        .put(ClusterName.SETTING, EsConstants.clusterName)
    +                        .put(ClusterName.SETTING, EsConstants.CLUSTER_NAME)
                             .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
                             .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
                             .put(EsExecutors.PROCESSORS, 1)
                             .put("http.enabled", true)
    -                        .put("index.percolator.map_unmapped_fields_as_string", true)
    +                        .put("index.percolator.map_unmapped_fields_as_string",
    +                                true)
                             .put("index.store.type", "mmapfs")
                             .put("path.home", "./data")
             ).build();
             node.start();
             return node;
         }
     
    -    public static void waitForSeconds(int seconds) {
    +    /**
    +     * Waits for the given number of seconds, ignoring {@link InterruptedException}.
    +     * @param seconds the seconds to wait
    +     */
    +    public static void waitForSeconds(final int seconds) {
             try {
    -            Thread.sleep(seconds * 1000);
    -        } catch (InterruptedException e) {
    +            Thread.sleep(seconds * MILLIS_IN_SECOND);
    --- End diff --
    
    Nit: You could use https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/TimeUnit.html if you'd like to get rid of the millis constant.
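
    A sketch of how that could look (the class name WaitExample is made up for the example; in the PR the method lives in EsTestUtil):

        import java.util.concurrent.TimeUnit;

        public final class WaitExample {

            private WaitExample() {
            }

            /**
             * Waits for the given number of seconds without needing a
             * millis-per-second constant.
             */
            public static void waitForSeconds(final int seconds) {
                try {
                    TimeUnit.SECONDS.sleep(seconds);
                } catch (InterruptedException e) {
                    // The PR ignores the exception; restoring the interrupt
                    // flag is a common alternative.
                    Thread.currentThread().interrupt();
                }
            }
        }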


---

[GitHub] storm pull request #2596: [STORM-3004] storm-elasticsearch-examples: fixed a...

Posted by krichter722 <gi...@git.apache.org>.
Github user krichter722 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2596#discussion_r177086661
  
    --- Diff: examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java ---
    @@ -34,52 +39,128 @@
     import org.apache.storm.tuple.Fields;
     import org.apache.storm.tuple.Values;
     
    -public class EsIndexTopology {
    +/**
    + * Demonstrates an ElasticSearch Storm topology.
    + * @author unknown
    + */
    +public final class EsIndexTopology {
     
    +    /**
    +     * The id of the used spout.
    +     */
         static final String SPOUT_ID = "spout";
    +    /**
    +     * The id of the used bolt.
    +     */
         static final String BOLT_ID = "bolt";
    +    /**
    +     * The name of the used topology.
    +     */
         static final String TOPOLOGY_NAME = "elasticsearch-test-topology1";
    +    /**
    +     * The number of pending tuples triggering logging.
    +     */
    +    private static final int PENDING_COUNT_MAX = 1000;
     
    -    public static void main(String[] args) throws Exception {
    +    /**
    +     * The example's main method.
    +     * @param args the command line arguments
    +     * @throws AlreadyAliveException if the topology is already started
    +     * @throws InvalidTopologyException if the topology is invalid
    +     * @throws AuthorizationException if the topology authorization fails
    +     */
    +    public static void main(final String[] args) throws AlreadyAliveException,
    +            InvalidTopologyException,
    +            AuthorizationException {
             Config config = new Config();
             config.setNumWorkers(1);
             TopologyBuilder builder = new TopologyBuilder();
             UserDataSpout spout = new UserDataSpout();
             builder.setSpout(SPOUT_ID, spout, 1);
             EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
             EsConfig esConfig = new EsConfig("http://localhost:9300");
    -        builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig, tupleMapper), 1).shuffleGrouping(SPOUT_ID);
    +        builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig, tupleMapper), 1)
    +                .shuffleGrouping(SPOUT_ID);
     
             EsTestUtil.startEsNode();
    -        EsTestUtil.waitForSeconds(5);
    -        StormSubmitter.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
    +        EsTestUtil.waitForSeconds(EsConstants.WAIT_DEFAULT);
    +        StormSubmitter.submitTopology(TOPOLOGY_NAME,
    +                config,
    +                builder.createTopology());
         }
     
    +    /**
    +     * The user data spout.
    +     */
         public static class UserDataSpout extends BaseRichSpout {
    +        private static final long serialVersionUID = 1L;
    +        /**
    +         * The pending values.
    +         */
             private ConcurrentHashMap<UUID, Values> pending;
    +        /**
    +         * The collector passed in
    +         * {@link #open(java.util.Map, org.apache.storm.task.TopologyContext,
    +         * org.apache.storm.spout.SpoutOutputCollector) }.
    +         */
             private SpoutOutputCollector collector;
    +        /**
    +         * The sources.
    +         */
             private String[] sources = {
    -                "{\"user\":\"user1\"}",
    -                "{\"user\":\"user2\"}",
    -                "{\"user\":\"user3\"}",
    -                "{\"user\":\"user4\"}"
    +            "{\"user\":\"user1\"}",
    +            "{\"user\":\"user2\"}",
    +            "{\"user\":\"user3\"}",
    +            "{\"user\":\"user4\"}"
             };
    +        /**
    --- End diff --
    
    Ah, you're right, I was using the `checkstyle:check` goal, which apparently uses a much stricter set of rules that might even differ from the one configured to run in the `validate` phase. I'll remove the noisy comments on private fields and methods.


---

[GitHub] storm pull request #2596: [STORM-3004] storm-elasticsearch-examples: fixed a...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2596#discussion_r176908195
  
    --- Diff: examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java ---
    @@ -53,79 +78,197 @@ public static void main(String[] args) throws Exception {
             Fields esFields = new Fields("index", "type", "source");
             EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
             StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
    -        TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
    +        TridentState state = stream.partitionPersist(factory,
    +                esFields,
    +                new EsUpdater(),
    +                new Fields());
     
             EsTestUtil.startEsNode();
    -        EsTestUtil.waitForSeconds(5);
    +        EsTestUtil.waitForSeconds(EsConstants.WAIT_DEFAULT);
     
    -        StormSubmitter.submitTopology(TOPOLOGY_NAME, new Config(), topology.build());
    +        StormSubmitter.submitTopology(TOPOLOGY_NAME,
    +                new Config(),
    +                topology.build());
         }
     
    +    /**
    +     * A fixed batch spout.
    +     */
         public static class FixedBatchSpout implements IBatchSpout {
    -        int maxBatchSize;
    -        HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
    +        private static final long serialVersionUID = 1L;
    +        /**
    +         * The maximum batch size.
    +         */
    +        private int maxBatchSize;
    +        /**
    +         * The passed batches.
    +         */
    +        private HashMap<Long, List<List<Object>>> batches = new HashMap<>();
    +        /**
    +         * The output values.
    +         */
             private Values[] outputs = {
    -                new Values("{\"user\":\"user1\"}", "index1", "type1", UUID.randomUUID().toString()),
    -                new Values("{\"user\":\"user2\"}", "index1", "type2", UUID.randomUUID().toString()),
    -                new Values("{\"user\":\"user3\"}", "index2", "type1", UUID.randomUUID().toString()),
    -                new Values("{\"user\":\"user4\"}", "index2", "type2", UUID.randomUUID().toString())
    +            new Values("{\"user\":\"user1\"}",
    +                    "index1",
    +                    "type1",
    +                    UUID.randomUUID().toString()),
    +            new Values("{\"user\":\"user2\"}",
    +                    "index1",
    +                    "type2",
    +                    UUID.randomUUID().toString()),
    +            new Values("{\"user\":\"user3\"}",
    +                    "index2",
    +                    "type1",
    +                    UUID.randomUUID().toString()),
    +            new Values("{\"user\":\"user4\"}",
    +                    "index2",
    +                    "type2",
    +                    UUID.randomUUID().toString())
             };
    +        /**
    +         * The current index.
    +         */
             private int index = 0;
    -        boolean cycle = false;
    +        /**
    +         * A flag indicating whether cycling ought to be performed.
    +         */
    +        private boolean cycle = false;
     
    -        public FixedBatchSpout(int maxBatchSize) {
    -            this.maxBatchSize = maxBatchSize;
    +        /**
    +         * Creates a new fixed batch spout.
    +         * @param maxBatchSizeArg the maximum batch size to set
    +         */
    +        public FixedBatchSpout(final int maxBatchSizeArg) {
    +            this.maxBatchSize = maxBatchSizeArg;
             }
     
    -        public void setCycle(boolean cycle) {
    -            this.cycle = cycle;
    +        /**
    +         * Sets the cycle flag.
    +         * @param cycleArg the cycle flag value
    +         */
    +        public void setCycle(final boolean cycleArg) {
    +            this.cycle = cycleArg;
             }
     
    +        /**
    +         * Gets the output fields.
    +         * @return the output fields.
    +         */
             @Override
             public Fields getOutputFields() {
                 return new Fields("source", "index", "type", "id");
             }
     
    +        /**
    +         * Open the topology.
    --- End diff --
    
    Nit: Opens the spout, not the topology


---