You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/01/13 18:22:15 UTC

[2/8] storm git commit: Address dossett's and hmcl's comments

Address dossett's and hmcl's comments

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cea44369
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cea44369
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cea44369

Branch: refs/heads/master
Commit: cea443699522b1dcb3d6953604eb3ca95da6b56d
Parents: db03e4a
Author: Xin Wang <be...@163.com>
Authored: Thu Jan 7 23:12:05 2016 +0800
Committer: Xin Wang <be...@163.com>
Committed: Thu Jan 7 23:12:05 2016 +0800

----------------------------------------------------------------------
 .../apache/storm/solr/bolt/SolrUpdateBolt.java  | 52 ++++++++++++++++----
 1 file changed, 43 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/cea44369/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java b/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
index 814d3b2..adb2b92 100644
--- a/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
@@ -18,12 +18,14 @@
 
 package org.apache.storm.solr.bolt;
 
+import backtype.storm.Config;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.utils.TupleUtils;
+import backtype.storm.utils.Utils;
 
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
@@ -52,6 +54,7 @@ public class SolrUpdateBolt extends BaseRichBolt {
     private SolrClient solrClient;
     private OutputCollector collector;
     private List<Tuple> toCommitTuples;
+    private Integer tickTupleInterval = 0;
 
     public SolrUpdateBolt(SolrConfig solrConfig, SolrMapper solrMapper) {
         this(solrConfig, solrMapper, null);
@@ -66,11 +69,19 @@ public class SolrUpdateBolt extends BaseRichBolt {
                     this.getClass().getSimpleName(), solrConfig, solrMapper, commitStgy);
     }
 
+    @Override
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { // stores the collector, creates the Solr client and the pending-tuple buffer
         this.collector = collector;
         this.solrClient = new CloudSolrClient(solrConfig.getZkHostString()); // client is built from the ZooKeeper host string held by solrConfig
         this.toCommitTuples = new ArrayList<>(capacity()); // tuples collected here until they are acked after a commit
 
+        // If no interval was configured (field still 0) and the topology config carries a message timeout, default the tick interval to half of that timeout.
+        if (stormConf.containsKey("topology.message.timeout.secs") && tickTupleInterval == 0) { // NOTE(review): literal is presumably the same key as Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS - confirm
+            Integer topologyTimeout = Utils.getInt(stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
+            tickTupleInterval = (int)(Math.floor(topologyTimeout / 2)); // topologyTimeout / 2 is integer division, so the value is already truncated before Math.floor sees it
+            LOG.debug("Setting tick tuple interval to [{}] based on topology timeout", tickTupleInterval);
+        }
+        // NOTE(review): getComponentConfiguration() is what publishes tickTupleInterval; confirm it runs after prepare(), otherwise this default never reaches the tick frequency setting
     }
 
     private int capacity() {
@@ -80,10 +91,13 @@ public class SolrUpdateBolt extends BaseRichBolt {
                 defArrListCpcty;
     }
 
+    @Override
     public void execute(Tuple tuple) {
         try {
-            SolrRequest request = solrMapper.toSolrRequest(tuple);
-            solrClient.request(request, solrMapper.getCollection());
+            if (!TupleUtils.isTick(tuple)) {//Don't add tick tuples to the SolrRequest
+                SolrRequest request = solrMapper.toSolrRequest(tuple);
+                solrClient.request(request, solrMapper.getCollection());
+            }
             ack(tuple);
         } catch (Exception e) {
             fail(tuple, e);
@@ -94,19 +108,19 @@ public class SolrUpdateBolt extends BaseRichBolt {
         if (commitStgy == null) {
             collector.ack(tuple);
         } else {
+            boolean forceCommit = false;
             if (TupleUtils.isTick(tuple)) {
                 LOG.debug("TICK! forcing solr client commit");
                 collector.ack(tuple);
-                commitStgy.commit();
-                solrClient.commit(solrMapper.getCollection());
-                ackCommittedTuples();
+                forceCommit = true;
             } else {
                 toCommitTuples.add(tuple);
                 commitStgy.update();
-                if (commitStgy.commit()) {
-                    solrClient.commit(solrMapper.getCollection());
-                    ackCommittedTuples();
-                }
+            }
+
+            if (forceCommit || commitStgy.commit()) {
+                solrClient.commit(solrMapper.getCollection());
+                ackCommittedTuples();
             }
         }
     }
@@ -141,6 +155,26 @@ public class SolrUpdateBolt extends BaseRichBolt {
         return queuedTuples;
     }
 
+    public SolrUpdateBolt withTickIntervalSecs(int tickTupleInterval) { // fluent setter for the tick tuple interval (seconds, per the method name)
+        this.tickTupleInterval = tickTupleInterval; // stored as given; no range check is applied here
+        return this; // returns this bolt so the call can be chained
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() { // returns the inherited component config, adding the tick tuple frequency when an interval is set
+        Map<String, Object> conf = super.getComponentConfiguration();
+        if (conf == null) // guard against a null config from the superclass; fall back to a fresh Config
+            conf = new Config();
+
+        if (tickTupleInterval > 0) { // an interval of 0 (the field's initial value) or less leaves the tick frequency untouched
+            LOG.info("Enabling tick tuple with interval [{}]", tickTupleInterval);
+            conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickTupleInterval);
+        }
+
+        return conf;
+    }
+
+    @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) { } // empty body: no output fields are declared on the declarer
 
 }