You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/08/14 17:41:37 UTC
[1/3] storm git commit: STORM-938: Add tick tuples to HiveBolt for time-based flushing
Repository: storm
Updated Branches:
refs/heads/master 80c6119ba -> b8d5635e8
STORM-938: Add tick tuples to HiveBolt for time-based flushing
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9fd358eb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9fd358eb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9fd358eb
Branch: refs/heads/master
Commit: 9fd358eb33b0f3f0d61a8a72890354eb11c5fa1c
Parents: 6de597a
Author: Aaron Dossett <aa...@target.com>
Authored: Fri Jul 31 10:34:44 2015 -0500
Committer: Aaron Dossett <aa...@target.com>
Committed: Fri Jul 31 10:34:44 2015 -0500
----------------------------------------------------------------------
external/storm-hive/README.md | 3 +-
.../org/apache/storm/hive/bolt/HiveBolt.java | 39 +++++++++++++++-----
.../apache/storm/hive/common/HiveOptions.java | 11 ++++++
.../apache/storm/hive/bolt/TestHiveBolt.java | 12 +++---
4 files changed, 49 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/9fd358eb/external/storm-hive/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hive/README.md b/external/storm-hive/README.md
index b16c390..629ec54 100644
--- a/external/storm-hive/README.md
+++ b/external/storm-hive/README.md
@@ -76,9 +76,10 @@ HiveOptions params
|withBatchSize| Max number of events written to Hive in a single Hive transaction| Integer. default 15000|
|withCallTimeout| (In milliseconds) Timeout for Hive & HDFS I/O operations, such as openTxn, write, commit, abort. | Integer. default 10000|
|withHeartBeatInterval| (In seconds) Interval between consecutive heartbeats sent to Hive to keep unused transactions from expiring. Set this value to 0 to disable heartbeats.| Integer. default 240 |
-|withAutoCreatePartitions| HiveBolt will automatically create the necessary Hive partitions to stream to. |Boolean. defalut true |
+|withAutoCreatePartitions| HiveBolt will automatically create the necessary Hive partitions to stream to. |Boolean. default true |
|withKerberosPrinicipal| Kerberos user principal for accessing secure Hive | String|
|withKerberosKeytab| Kerberos keytab for accessing secure Hive | String |
+|withTickTupleInterval| (In seconds) If > 0 then the Hive Bolt will periodically flush transaction batches. Enabling this is recommended to avoid tuple timeouts while waiting for a batch to fill up.| Integer. default 0|
http://git-wip-us.apache.org/repos/asf/storm/blob/9fd358eb/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
index ec0ade2..e20d31f 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
@@ -23,6 +23,8 @@ import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.utils.TupleUtils;
+import backtype.storm.Config;
import org.apache.storm.hive.common.HiveWriter;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hive.hcatalog.streaming.*;
@@ -99,16 +101,24 @@ public class HiveBolt extends BaseRichBolt {
@Override
public void execute(Tuple tuple) {
try {
- List<String> partitionVals = options.getMapper().mapPartitions(tuple);
- HiveEndPoint endPoint = HiveUtils.makeEndPoint(partitionVals, options);
- HiveWriter writer = getOrCreateWriter(endPoint);
- if(timeToSendHeartBeat.compareAndSet(true, false)) {
- enableHeartBeatOnAllWriters();
+ boolean forceFlush = false;
+ if (TupleUtils.isTick(tuple)) {
+ LOG.debug("TICK received! current batch status [" + tupleBatch.size() + "/" + options.getBatchSize() + "]");
+ forceFlush = true;
}
- writer.write(options.getMapper().mapRecord(tuple));
-
- tupleBatch.add(tuple);
- if(tupleBatch.size() >= options.getBatchSize()) {
+ else {
+ List<String> partitionVals = options.getMapper().mapPartitions(tuple);
+ HiveEndPoint endPoint = HiveUtils.makeEndPoint(partitionVals, options);
+ HiveWriter writer = getOrCreateWriter(endPoint);
+ if (timeToSendHeartBeat.compareAndSet(true, false)) {
+ enableHeartBeatOnAllWriters();
+ }
+ writer.write(options.getMapper().mapRecord(tuple));
+ tupleBatch.add(tuple);
+ if (tupleBatch.size() >= options.getBatchSize())
+ forceFlush = true;
+ }
+ if(forceFlush && !tupleBatch.isEmpty()) {
flushAllWriters(true);
LOG.info("acknowledging tuples after writers flushed ");
for(Tuple t : tupleBatch)
@@ -174,6 +184,17 @@ public class HiveBolt extends BaseRichBolt {
LOG.info("Hive Bolt stopped");
}
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> conf = super.getComponentConfiguration();
+ if (conf == null)
+ conf = new Config();
+
+ if (options.getTickTupleInterval() > 0)
+ conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, options.getTickTupleInterval());
+
+ return conf;
+ }
private void setupHeartBeatTimer() {
if(options.getHeartBeatInterval()>0) {
http://git-wip-us.apache.org/repos/asf/storm/blob/9fd358eb/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
index d316294..3df1600 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
@@ -39,6 +39,7 @@ public class HiveOptions implements Serializable {
protected Boolean autoCreatePartitions = true;
protected String kerberosPrincipal;
protected String kerberosKeytab;
+ protected Integer tickTupleInterval = 0;
public HiveOptions(String metaStoreURI,String databaseName,String tableName,HiveMapper mapper) {
this.metaStoreURI = metaStoreURI;
@@ -47,6 +48,12 @@ public class HiveOptions implements Serializable {
this.mapper = mapper;
}
+ public HiveOptions withTickTupleInterval(Integer tickInterval)
+ {
+ this.tickTupleInterval = tickInterval;
+ return this;
+ }
+
public HiveOptions withTxnsPerBatch(Integer txnsPerBatch) {
this.txnsPerBatch = txnsPerBatch;
return this;
@@ -143,4 +150,8 @@ public class HiveOptions implements Serializable {
public String getKerberosKeytab() {
return kerberosKeytab;
}
+
+ public Integer getTickTupleInterval() {
+ return tickTupleInterval;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9fd358eb/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
index b83b960..0350c6e 100644
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
@@ -20,7 +20,6 @@ package org.apache.storm.hive.bolt;
import backtype.storm.Config;
import backtype.storm.task.GeneralTopologyContext;
-import backtype.storm.task.IOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
@@ -95,7 +94,7 @@ public class TestHiveBolt {
public TemporaryFolder dbFolder = new TemporaryFolder();
@Mock
- private IOutputCollector collector;
+ private OutputCollector collector;
private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class);
@@ -150,7 +149,7 @@ public class TestHiveBolt {
.withTxnsPerBatch(2)
.withBatchSize(2);
bolt = new HiveBolt(hiveOptions);
- bolt.prepare(config,null,new OutputCollector(collector));
+ bolt.prepare(config,null,collector);
Integer id = 100;
String msg = "test-123";
String city = "sunnyvale";
@@ -183,7 +182,7 @@ public class TestHiveBolt {
.withBatchSize(2)
.withAutoCreatePartitions(false);
bolt = new HiveBolt(hiveOptions);
- bolt.prepare(config,null,new OutputCollector(collector));
+ bolt.prepare(config,null,collector);
Integer id = 100;
String msg = "test-123";
String city = "sunnyvale";
@@ -217,7 +216,7 @@ public class TestHiveBolt {
.withTxnsPerBatch(2)
.withBatchSize(1);
bolt = new HiveBolt(hiveOptions);
- bolt.prepare(config,null,new OutputCollector(collector));
+ bolt.prepare(config,null,collector);
Integer id = 100;
String msg = "test-123";
Date d = new Date();
@@ -249,6 +248,7 @@ public class TestHiveBolt {
bolt = new HiveBolt(hiveOptions);
bolt.prepare(config, null, new OutputCollector(collector));
Tuple tuple1 = generateTestTuple(1, "SJC", "Sunnyvale", "CA");
+
//Tuple tuple2 = generateTestTuple(2,"SFO","San Jose","CA");
bolt.execute(tuple1);
verify(collector).ack(tuple1);
@@ -270,7 +270,7 @@ public class TestHiveBolt {
.withTxnsPerBatch(2)
.withBatchSize(1);
bolt = new HiveBolt(hiveOptions);
- bolt.prepare(config,null,new OutputCollector(collector));
+ bolt.prepare(config,null,collector);
Tuple tuple1 = generateTestTuple(1,"SJC","Sunnyvale","CA");
//Tuple tuple2 = generateTestTuple(2,"SFO","San Jose","CA");
bolt.execute(tuple1);
[3/3] storm git commit: Added STORM-938 to CHANGELOG.
Posted by sr...@apache.org.
Added STORM-938 to CHANGELOG.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b8d5635e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b8d5635e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b8d5635e
Branch: refs/heads/master
Commit: b8d5635e850ee376426cca65b39f33fa9e8ff392
Parents: b3b3174
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Fri Aug 14 08:40:54 2015 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Fri Aug 14 08:40:54 2015 -0700
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b8d5635e/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8f62775..e80ea09 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.11.0
+ * STORM-938: storm-hive add a time interval to flush tuples to hive.
* STORM-977: Incorrect signal (-9) when as-user is true
* STORM-964: Add config (with small default value) for logwriter to restrict its memory usage
* STORM-980: Re-include storm-kafka tests from Travis CI build
[2/3] storm git commit: Merge branch 'STORM-938' of https://github.com/dossett/storm into STORM-938
Posted by sr...@apache.org.
Merge branch 'STORM-938' of https://github.com/dossett/storm into STORM-938
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b3b3174f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b3b3174f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b3b3174f
Branch: refs/heads/master
Commit: b3b3174f36f5421785a5769e2a9744678fdc984f
Parents: 80c6119 9fd358e
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Fri Aug 14 08:28:03 2015 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Fri Aug 14 08:28:03 2015 -0700
----------------------------------------------------------------------
external/storm-hive/README.md | 3 +-
.../org/apache/storm/hive/bolt/HiveBolt.java | 39 +++++++++++++++-----
.../apache/storm/hive/common/HiveOptions.java | 11 ++++++
.../apache/storm/hive/bolt/TestHiveBolt.java | 12 +++---
4 files changed, 49 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b3b3174f/external/storm-hive/README.md
----------------------------------------------------------------------