Posted to commits@storm.apache.org by sa...@apache.org on 2017/03/21 09:35:03 UTC

[1/3] storm git commit: STORM-2425: Storm Hive Bolt not closing open transactions

Repository: storm
Updated Branches:
  refs/heads/master f42981760 -> c722940b6


STORM-2425: Storm Hive Bolt not closing open transactions

Invoking `retireIdleWriters` whenever the Hive bolt receives a tick tuple periodically closes idle connections.
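
The idea described above can be illustrated with a minimal, self-contained sketch. It is not the HiveBolt code itself: `SketchWriter` and `IdleWriterRetirementSketch` are hypothetical stand-ins, end points are plain strings, and the tick check is reduced to a boolean flag. The sketch also assumes the writer map is a `ConcurrentHashMap`, so that removing an entry while iterating over the map is safe. How often idle writers are retired depends on how often the bolt receives tick tuples.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a Hive writer: tracks last use and whether it was closed.
class SketchWriter {
    private final long lastUsed;
    private boolean closed = false;

    SketchWriter(long lastUsed) { this.lastUsed = lastUsed; }

    long getLastUsed() { return lastUsed; }

    void flushAndClose() { closed = true; }

    boolean isClosed() { return closed; }
}

// Hypothetical stand-in for the bolt: retires idle writers only when a tick arrives.
class IdleWriterRetirementSketch {
    // Assumption: a concurrent map, so removal during iteration does not throw.
    private final Map<String, SketchWriter> allWriters = new ConcurrentHashMap<>();
    private final long idleTimeoutMs;

    IdleWriterRetirementSketch(long idleTimeoutMs) { this.idleTimeoutMs = idleTimeoutMs; }

    void register(String endPoint, SketchWriter writer) { allWriters.put(endPoint, writer); }

    int openWriters() { return allWriters.size(); }

    // Called once per tuple; the normal write path is omitted from this sketch.
    void execute(boolean isTick, long now) {
        if (isTick) {
            retireIdleWriters(now);
        }
    }

    // Closes every writer that has been idle longer than the timeout; returns how many.
    int retireIdleWriters(long now) {
        int count = 0;
        for (Map.Entry<String, SketchWriter> entry : allWriters.entrySet()) {
            if (now - entry.getValue().getLastUsed() > idleTimeoutMs) {
                ++count;
                retire(entry.getKey());
            }
        }
        return count;
    }

    // Removes the writer first, then closes it only if it was still registered.
    private void retire(String endPoint) {
        SketchWriter writer = allWriters.remove(endPoint);
        if (writer != null) {
            writer.flushAndClose();
        }
    }
}
```

In this sketch a data tuple leaves all writers open, while a tick closes only those whose last use is older than the timeout. The null check in `retire` mirrors the guard in the change: a writer that has already been removed is skipped rather than dereferenced.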


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3ce63b85
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3ce63b85
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3ce63b85

Branch: refs/heads/master
Commit: 3ce63b8551bd710451308f5d6e140a1a622564e4
Parents: 809c4b2
Author: Arun Mahadevan <ar...@apache.org>
Authored: Thu Mar 16 12:16:10 2017 +0530
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Mon Mar 20 14:38:20 2017 +0530

----------------------------------------------------------------------
 .../org/apache/storm/hive/bolt/HiveBolt.java    | 33 +++++++++++---------
 1 file changed, 19 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3ce63b85/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
index aab575e..dc8be91 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
@@ -119,6 +119,9 @@ public class HiveBolt extends BaseRichBolt {
                 LOG.info("acknowledging tuples after writers flushed ");
                 batchHelper.ack();
             }
+            if (TupleUtils.isTick(tuple)) {
+                retireIdleWriters();
+            }
         } catch(SerializationError se) {
             LOG.info("Serialization exception occurred, tuple is acknowledged but not written to Hive.", tuple);
             this.collector.reportError(se);
@@ -308,30 +311,32 @@ public class HiveBolt extends BaseRichBolt {
         LOG.info("Attempting close idle writers");
         int count = 0;
         long now = System.currentTimeMillis();
-        ArrayList<HiveEndPoint> retirees = new ArrayList<HiveEndPoint>();
 
         //1) Find retirement candidates
         for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
             if(now - entry.getValue().getLastUsed() > options.getIdleTimeout()) {
                 ++count;
-                retirees.add(entry.getKey());
+                retire(entry.getKey());
             }
         }
-        //2) Retire them
-        for(HiveEndPoint ep : retirees) {
-            try {
+        return count;
+    }
+
+    private void retire(HiveEndPoint ep) {
+        try {
+            HiveWriter writer = allWriters.remove(ep);
+            if (writer != null) {
                 LOG.info("Closing idle Writer to Hive end point : {}", ep);
-                allWriters.remove(ep).flushAndClose();
-            } catch (IOException e) {
-                LOG.warn("Failed to close writer for end point: {}. Error: "+ ep, e);
-            } catch (InterruptedException e) {
-                LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e);
-                Thread.currentThread().interrupt();
-            } catch (Exception e) {
-                LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e);
+                writer.flushAndClose();
             }
+        } catch (IOException e) {
+            LOG.warn("Failed to close writer for end point: {}. Error: " + ep, e);
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e);
+            Thread.currentThread().interrupt();
+        } catch (Exception e) {
+            LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e);
         }
-        return count;
     }
 
 }


[2/3] storm git commit: Merge branch 'STORM-2425' of https://github.com/arunmahadevan/storm into 2425

Posted by sa...@apache.org.
Merge branch 'STORM-2425' of https://github.com/arunmahadevan/storm into 2425


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0d40fa9e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0d40fa9e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0d40fa9e

Branch: refs/heads/master
Commit: 0d40fa9e9587b6ee9003bcea72a668614df432c9
Parents: f429817 3ce63b8
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Tue Mar 21 14:50:00 2017 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Tue Mar 21 14:50:00 2017 +0530

----------------------------------------------------------------------
 .../org/apache/storm/hive/bolt/HiveBolt.java    | 33 +++++++++++---------
 1 file changed, 19 insertions(+), 14 deletions(-)
----------------------------------------------------------------------



[3/3] storm git commit: Added STORM-2425 to CHANGELOG.md

Posted by sa...@apache.org.
Added STORM-2425 to CHANGELOG.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c722940b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c722940b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c722940b

Branch: refs/heads/master
Commit: c722940b68b34ef8ea8f07017c1fb7af6df2e6d4
Parents: 0d40fa9
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Tue Mar 21 14:50:51 2017 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Tue Mar 21 14:50:51 2017 +0530

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c722940b/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b160a6c..3463571 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 2.0.0
+ * STORM-2425: Storm Hive Bolt not closing open transactions
  * STORM-2409: Storm-Kafka-Client KafkaSpout Support for Failed and NullTuples
  * STORM-2423: Join Bolt should use explicit instead of default window anchoring for emitted tuples
  * STORM-2411: Setting topology.eventlogger.executors=0 in defaults.yaml