You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink in the original page and is not preserved in this plain-text rendering.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/08/14 17:41:37 UTC

[1/3] storm git commit: STORM-938: Add tick tuples to HiveBolt for time-based flushing

Repository: storm
Updated Branches:
  refs/heads/master 80c6119ba -> b8d5635e8


STORM-938: Add tick tuples to HiveBolt for time-based flushing


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9fd358eb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9fd358eb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9fd358eb

Branch: refs/heads/master
Commit: 9fd358eb33b0f3f0d61a8a72890354eb11c5fa1c
Parents: 6de597a
Author: Aaron Dossett <aa...@target.com>
Authored: Fri Jul 31 10:34:44 2015 -0500
Committer: Aaron Dossett <aa...@target.com>
Committed: Fri Jul 31 10:34:44 2015 -0500

----------------------------------------------------------------------
 external/storm-hive/README.md                   |  3 +-
 .../org/apache/storm/hive/bolt/HiveBolt.java    | 39 +++++++++++++++-----
 .../apache/storm/hive/common/HiveOptions.java   | 11 ++++++
 .../apache/storm/hive/bolt/TestHiveBolt.java    | 12 +++---
 4 files changed, 49 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9fd358eb/external/storm-hive/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hive/README.md b/external/storm-hive/README.md
index b16c390..629ec54 100644
--- a/external/storm-hive/README.md
+++ b/external/storm-hive/README.md
@@ -76,9 +76,10 @@ HiveOptions params
 |withBatchSize| Max number of events written to Hive in a single Hive transaction| Integer. default 15000|
 |withCallTimeout| (In milliseconds) Timeout for Hive & HDFS I/O operations, such as openTxn, write, commit, abort. | Integer. default 10000|
 |withHeartBeatInterval| (In seconds) Interval between consecutive heartbeats sent to Hive to keep unused transactions from expiring. Set this value to 0 to disable heartbeats.| Integer. default 240 |
-|withAutoCreatePartitions| HiveBolt will automatically create the necessary Hive partitions to stream to. |Boolean. defalut true |
+|withAutoCreatePartitions| HiveBolt will automatically create the necessary Hive partitions to stream to. |Boolean. default true |
 |withKerberosPrinicipal| Kerberos user principal for accessing secure Hive | String|
 |withKerberosKeytab| Kerberos keytab for accessing secure Hive | String |
+|withTickTupleInterval| (In seconds) If > 0 then the Hive Bolt will periodically flush transaction batches. Enabling this is recommended to avoid tuple timeouts while waiting for a batch to fill up.| Integer. default 0|
 
 
  

http://git-wip-us.apache.org/repos/asf/storm/blob/9fd358eb/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
index ec0ade2..e20d31f 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
@@ -23,6 +23,8 @@ import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.utils.TupleUtils;
+import backtype.storm.Config;
 import org.apache.storm.hive.common.HiveWriter;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hive.hcatalog.streaming.*;
@@ -99,16 +101,24 @@ public class HiveBolt extends  BaseRichBolt {
     @Override
     public void execute(Tuple tuple) {
         try {
-            List<String> partitionVals = options.getMapper().mapPartitions(tuple);
-            HiveEndPoint endPoint = HiveUtils.makeEndPoint(partitionVals, options);
-            HiveWriter writer = getOrCreateWriter(endPoint);
-            if(timeToSendHeartBeat.compareAndSet(true, false)) {
-                enableHeartBeatOnAllWriters();
+            boolean forceFlush = false;
+            if (TupleUtils.isTick(tuple)) {
+                LOG.debug("TICK received! current batch status [" + tupleBatch.size() + "/" + options.getBatchSize() + "]");
+                forceFlush = true;
             }
-            writer.write(options.getMapper().mapRecord(tuple));
-
-            tupleBatch.add(tuple);
-            if(tupleBatch.size() >= options.getBatchSize()) {
+            else {
+                List<String> partitionVals = options.getMapper().mapPartitions(tuple);
+                HiveEndPoint endPoint = HiveUtils.makeEndPoint(partitionVals, options);
+                HiveWriter writer = getOrCreateWriter(endPoint);
+                if (timeToSendHeartBeat.compareAndSet(true, false)) {
+                    enableHeartBeatOnAllWriters();
+                }
+                writer.write(options.getMapper().mapRecord(tuple));
+                tupleBatch.add(tuple);
+                if (tupleBatch.size() >= options.getBatchSize())
+                    forceFlush = true;
+            }
+            if(forceFlush && !tupleBatch.isEmpty()) {
                 flushAllWriters(true);
                 LOG.info("acknowledging tuples after writers flushed ");
                 for(Tuple t : tupleBatch)
@@ -174,6 +184,17 @@ public class HiveBolt extends  BaseRichBolt {
         LOG.info("Hive Bolt stopped");
     }
 
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        Map<String, Object> conf = super.getComponentConfiguration();
+        if (conf == null)
+            conf = new Config();
+
+        if (options.getTickTupleInterval() > 0)
+            conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, options.getTickTupleInterval());
+
+        return conf;
+    }
 
     private void setupHeartBeatTimer() {
         if(options.getHeartBeatInterval()>0) {

http://git-wip-us.apache.org/repos/asf/storm/blob/9fd358eb/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
index d316294..3df1600 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
@@ -39,6 +39,7 @@ public class HiveOptions implements Serializable {
     protected Boolean autoCreatePartitions = true;
     protected String kerberosPrincipal;
     protected String kerberosKeytab;
+    protected Integer tickTupleInterval = 0;
 
     public HiveOptions(String metaStoreURI,String databaseName,String tableName,HiveMapper mapper) {
         this.metaStoreURI = metaStoreURI;
@@ -47,6 +48,12 @@ public class HiveOptions implements Serializable {
         this.mapper = mapper;
     }
 
+    public HiveOptions withTickTupleInterval(Integer tickInterval)
+    {
+        this.tickTupleInterval = tickInterval;
+        return this;
+    }
+
     public HiveOptions withTxnsPerBatch(Integer txnsPerBatch) {
         this.txnsPerBatch = txnsPerBatch;
         return this;
@@ -143,4 +150,8 @@ public class HiveOptions implements Serializable {
     public String getKerberosKeytab() {
         return kerberosKeytab;
     }
+
+    public Integer getTickTupleInterval() {
+        return tickTupleInterval;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/9fd358eb/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
index b83b960..0350c6e 100644
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
@@ -20,7 +20,6 @@ package org.apache.storm.hive.bolt;
 
 import backtype.storm.Config;
 import backtype.storm.task.GeneralTopologyContext;
-import backtype.storm.task.IOutputCollector;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
@@ -95,7 +94,7 @@ public class TestHiveBolt {
     public TemporaryFolder dbFolder = new TemporaryFolder();
 
     @Mock
-    private IOutputCollector collector;
+    private OutputCollector collector;
 
     private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class);
 
@@ -150,7 +149,7 @@ public class TestHiveBolt {
             .withTxnsPerBatch(2)
             .withBatchSize(2);
         bolt = new HiveBolt(hiveOptions);
-        bolt.prepare(config,null,new OutputCollector(collector));
+        bolt.prepare(config,null,collector);
         Integer id = 100;
         String msg = "test-123";
         String city = "sunnyvale";
@@ -183,7 +182,7 @@ public class TestHiveBolt {
             .withBatchSize(2)
             .withAutoCreatePartitions(false);
         bolt = new HiveBolt(hiveOptions);
-        bolt.prepare(config,null,new OutputCollector(collector));
+        bolt.prepare(config,null,collector);
         Integer id = 100;
         String msg = "test-123";
         String city = "sunnyvale";
@@ -217,7 +216,7 @@ public class TestHiveBolt {
             .withTxnsPerBatch(2)
             .withBatchSize(1);
         bolt = new HiveBolt(hiveOptions);
-        bolt.prepare(config,null,new OutputCollector(collector));
+        bolt.prepare(config,null,collector);
         Integer id = 100;
         String msg = "test-123";
         Date d = new Date();
@@ -249,6 +248,7 @@ public class TestHiveBolt {
         bolt = new HiveBolt(hiveOptions);
         bolt.prepare(config, null, new OutputCollector(collector));
         Tuple tuple1 = generateTestTuple(1, "SJC", "Sunnyvale", "CA");
+
         //Tuple tuple2 = generateTestTuple(2,"SFO","San Jose","CA");
         bolt.execute(tuple1);
         verify(collector).ack(tuple1);
@@ -270,7 +270,7 @@ public class TestHiveBolt {
             .withTxnsPerBatch(2)
             .withBatchSize(1);
         bolt = new HiveBolt(hiveOptions);
-        bolt.prepare(config,null,new OutputCollector(collector));
+        bolt.prepare(config,null,collector);
         Tuple tuple1 = generateTestTuple(1,"SJC","Sunnyvale","CA");
         //Tuple tuple2 = generateTestTuple(2,"SFO","San Jose","CA");
         bolt.execute(tuple1);


[3/3] storm git commit: Added STORM-938 to CHANGELOG.

Posted by sr...@apache.org.
Added STORM-938 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b8d5635e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b8d5635e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b8d5635e

Branch: refs/heads/master
Commit: b8d5635e850ee376426cca65b39f33fa9e8ff392
Parents: b3b3174
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Fri Aug 14 08:40:54 2015 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Fri Aug 14 08:40:54 2015 -0700

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b8d5635e/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8f62775..e80ea09 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-938: storm-hive add a time interval to flush tuples to hive.
  * STORM-977: Incorrect signal (-9) when as-user is true
  * STORM-964: Add config (with small default value) for logwriter to restrict its memory usage
  * STORM-980: Re-include storm-kafka tests from Travis CI build


[2/3] storm git commit: Merge branch 'STORM-938' of https://github.com/dossett/storm into STORM-938

Posted by sr...@apache.org.
Merge branch 'STORM-938' of https://github.com/dossett/storm into STORM-938


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b3b3174f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b3b3174f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b3b3174f

Branch: refs/heads/master
Commit: b3b3174f36f5421785a5769e2a9744678fdc984f
Parents: 80c6119 9fd358e
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Fri Aug 14 08:28:03 2015 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Fri Aug 14 08:28:03 2015 -0700

----------------------------------------------------------------------
 external/storm-hive/README.md                   |  3 +-
 .../org/apache/storm/hive/bolt/HiveBolt.java    | 39 +++++++++++++++-----
 .../apache/storm/hive/common/HiveOptions.java   | 11 ++++++
 .../apache/storm/hive/bolt/TestHiveBolt.java    | 12 +++---
 4 files changed, 49 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b3b3174f/external/storm-hive/README.md
----------------------------------------------------------------------