You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/01/13 18:22:15 UTC
[2/8] storm git commit: on dossett&hmcl 's comments
on dossett&hmcl 's comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cea44369
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cea44369
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cea44369
Branch: refs/heads/master
Commit: cea443699522b1dcb3d6953604eb3ca95da6b56d
Parents: db03e4a
Author: Xin Wang <be...@163.com>
Authored: Thu Jan 7 23:12:05 2016 +0800
Committer: Xin Wang <be...@163.com>
Committed: Thu Jan 7 23:12:05 2016 +0800
----------------------------------------------------------------------
.../apache/storm/solr/bolt/SolrUpdateBolt.java | 52 ++++++++++++++++----
1 file changed, 43 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/cea44369/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java b/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
index 814d3b2..adb2b92 100644
--- a/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
@@ -18,12 +18,14 @@
package org.apache.storm.solr.bolt;
+import backtype.storm.Config;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.TupleUtils;
+import backtype.storm.utils.Utils;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
@@ -52,6 +54,7 @@ public class SolrUpdateBolt extends BaseRichBolt {
private SolrClient solrClient;
private OutputCollector collector;
private List<Tuple> toCommitTuples;
+ private Integer tickTupleInterval = 0;
public SolrUpdateBolt(SolrConfig solrConfig, SolrMapper solrMapper) {
this(solrConfig, solrMapper, null);
@@ -66,11 +69,19 @@ public class SolrUpdateBolt extends BaseRichBolt {
this.getClass().getSimpleName(), solrConfig, solrMapper, commitStgy);
}
+ @Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
this.solrClient = new CloudSolrClient(solrConfig.getZkHostString());
this.toCommitTuples = new ArrayList<>(capacity());
+ //set default tickTupleInterval if interval is zero
+ if (stormConf.containsKey("topology.message.timeout.secs") && tickTupleInterval == 0) {
+ Integer topologyTimeout = Utils.getInt(stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
+ tickTupleInterval = (int)(Math.floor(topologyTimeout / 2));
+ LOG.debug("Setting tick tuple interval to [{}] based on topology timeout", tickTupleInterval);
+ }
+
}
private int capacity() {
@@ -80,10 +91,13 @@ public class SolrUpdateBolt extends BaseRichBolt {
defArrListCpcty;
}
+ @Override
public void execute(Tuple tuple) {
try {
- SolrRequest request = solrMapper.toSolrRequest(tuple);
- solrClient.request(request, solrMapper.getCollection());
+ if (!TupleUtils.isTick(tuple)) {//Don't add tick tuples to the SolrRequest
+ SolrRequest request = solrMapper.toSolrRequest(tuple);
+ solrClient.request(request, solrMapper.getCollection());
+ }
ack(tuple);
} catch (Exception e) {
fail(tuple, e);
@@ -94,19 +108,19 @@ public class SolrUpdateBolt extends BaseRichBolt {
if (commitStgy == null) {
collector.ack(tuple);
} else {
+ boolean forceCommit = false;
if (TupleUtils.isTick(tuple)) {
LOG.debug("TICK! forcing solr client commit");
collector.ack(tuple);
- commitStgy.commit();
- solrClient.commit(solrMapper.getCollection());
- ackCommittedTuples();
+ forceCommit = true;
} else {
toCommitTuples.add(tuple);
commitStgy.update();
- if (commitStgy.commit()) {
- solrClient.commit(solrMapper.getCollection());
- ackCommittedTuples();
- }
+ }
+
+ if (forceCommit || commitStgy.commit()) {
+ solrClient.commit(solrMapper.getCollection());
+ ackCommittedTuples();
}
}
}
@@ -141,6 +155,26 @@ public class SolrUpdateBolt extends BaseRichBolt {
return queuedTuples;
}
+ public SolrUpdateBolt withTickIntervalSecs(int tickTupleInterval) {
+ this.tickTupleInterval = tickTupleInterval;
+ return this;
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> conf = super.getComponentConfiguration();
+ if (conf == null)
+ conf = new Config();
+
+ if (tickTupleInterval > 0) {
+ LOG.info("Enabling tick tuple with interval [{}]", tickTupleInterval);
+ conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickTupleInterval);
+ }
+
+ return conf;
+ }
+
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) { }
}