Posted to commits@storm.apache.org by sr...@apache.org on 2015/07/31 16:28:41 UTC

[1/2] storm git commit: STORM-960: HiveBolt should only ack after successful flush

Repository: storm
Updated Branches:
  refs/heads/master 14fdab8b1 -> 6de597a6d


STORM-960: HiveBolt should only ack after successful flush


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/937346a3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/937346a3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/937346a3

Branch: refs/heads/master
Commit: 937346a3373dfaee2225ae32d53b68391bd09cf5
Parents: 14fdab8
Author: Aaron Dossett <aa...@target.com>
Authored: Thu Jul 30 10:54:17 2015 -0500
Committer: Aaron Dossett <aa...@target.com>
Committed: Thu Jul 30 10:54:17 2015 -0500

----------------------------------------------------------------------
 .../org/apache/storm/hive/bolt/HiveBolt.java    |  38 +++++--
 .../apache/storm/hive/bolt/TestHiveBolt.java    | 100 ++++++++++++++++---
 2 files changed, 115 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
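The core of this change: HiveBolt previously acked each tuple as soon as it was handed to the Hive writer, before the writers had flushed, so a failed flush could silently drop records that Storm already considered delivered. The patch below buffers tuples in a batch and acks them only after the flush succeeds. A minimal sketch of the pattern, using simplified, hypothetical helper names rather than the exact bolt code:

    // Assumed members (hypothetical, condensed from the real bolt):
    //   OutputCollector collector;  int batchSize;
    private final List<Tuple> tupleBatch = new LinkedList<>();

    public void execute(Tuple tuple) {
        try {
            writeToHive(tuple);                // hand the record to the writer
            tupleBatch.add(tuple);             // buffer instead of acking now
            if (tupleBatch.size() >= batchSize) {
                flushAllWriters();             // may throw; no acks until it returns
                for (Tuple t : tupleBatch) {
                    collector.ack(t);          // safe: records are flushed to Hive
                }
                tupleBatch.clear();
            }
        } catch (Exception e) {
            collector.fail(tuple);             // Storm replays the failed tuple
        }
    }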


http://git-wip-us.apache.org/repos/asf/storm/blob/937346a3/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
index b3defae..ec0ade2 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
@@ -43,23 +43,25 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.List;
+import java.util.LinkedList;
 import java.io.IOException;
 
 public class HiveBolt extends  BaseRichBolt {
     private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class);
     private OutputCollector collector;
     private HiveOptions options;
-    private Integer currentBatchSize;
     private ExecutorService callTimeoutPool;
     private transient Timer heartBeatTimer;
     private Boolean kerberosEnabled = false;
     private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false);
     private UserGroupInformation ugi = null;
     HashMap<HiveEndPoint, HiveWriter> allWriters;
+    private List<Tuple> tupleBatch;
 
     public HiveBolt(HiveOptions options) {
         this.options = options;
-        this.currentBatchSize = 0;
+        tupleBatch = new LinkedList<>();
     }
 
     @Override
@@ -90,7 +92,7 @@ public class HiveBolt extends  BaseRichBolt {
             heartBeatTimer = new Timer();
             setupHeartBeatTimer();
         } catch(Exception e) {
-            LOG.warn("unable to make connection to hive ",e);
+            LOG.warn("unable to make connection to hive ", e);
         }
     }
 
@@ -104,16 +106,31 @@ public class HiveBolt extends  BaseRichBolt {
                 enableHeartBeatOnAllWriters();
             }
             writer.write(options.getMapper().mapRecord(tuple));
-            currentBatchSize++;
-            if(currentBatchSize >= options.getBatchSize()) {
+
+            tupleBatch.add(tuple);
+            if(tupleBatch.size() >= options.getBatchSize()) {
                 flushAllWriters(true);
-                currentBatchSize = 0;
+                LOG.info("acknowledging tuples after writers flushed ");
+                for(Tuple t : tupleBatch)
+                    collector.ack(t);
+                tupleBatch.clear();
             }
-            collector.ack(tuple);
         } catch(Exception e) {
             this.collector.reportError(e);
             collector.fail(tuple);
-            flushAndCloseWriters();
+            try {
+                flushAndCloseWriters();
+                LOG.info("acknowledging tuples after writers flushed and closed");
+                for (Tuple t : tupleBatch)
+                    collector.ack(t);
+                tupleBatch.clear();
+            } catch (Exception e1) {
+                //If flushAndClose fails assume tuples are lost, do not ack
+                LOG.warn("Error while flushing and closing writers, tuples will NOT be acknowledged");
+                for (Tuple t : tupleBatch)
+                    collector.fail(t);
+                tupleBatch.clear();
+            }
         }
     }
 
@@ -170,7 +187,7 @@ public class HiveBolt extends  BaseRichBolt {
         }
     }
 
-    private void flushAllWriters(boolean rollToNext)
+    void flushAllWriters(boolean rollToNext)
         throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException {
         for(HiveWriter writer: allWriters.values()) {
             writer.flush(rollToNext);
@@ -194,11 +211,12 @@ public class HiveBolt extends  BaseRichBolt {
         }
     }
 
-    private void flushAndCloseWriters() {
+    void flushAndCloseWriters() throws Exception {
         try {
             flushAllWriters(false);
         } catch(Exception e) {
             LOG.warn("unable to flush hive writers. ", e);
+            throw e;
         } finally {
             closeAllWriters();
         }

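Two signature changes above are worth noting: flushAllWriters() and flushAndCloseWriters() go from private to package-private so the new tests can stub them, and flushAndCloseWriters() now rethrows a flush failure instead of swallowing it, which is what lets execute() choose between acking and failing the buffered batch. One caveat on usage: ack-after-flush only yields at-least-once delivery if the upstream spout anchors its tuples, for example:

    // In the spout feeding HiveBolt (illustrative, not part of this patch):
    collector.emit(new Values(record), msgId);  // anchored: fail() triggers replay
    // Emitting without a message id would make HiveBolt's fail() calls no-ops.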
http://git-wip-us.apache.org/repos/asf/storm/blob/937346a3/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
index e7e875e..b83b960 100644
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
@@ -45,9 +45,12 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.Spy;
 import org.mockito.MockitoAnnotations;
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.*;
+
 import junit.framework.Assert;
 
 import org.slf4j.Logger;
@@ -58,6 +61,8 @@ import java.util.HashMap;
 import java.util.ArrayList;
 import java.io.IOException;
 import java.util.Date;
+import java.util.Set;
+import java.util.HashSet;
 import java.text.SimpleDateFormat;
 
 
@@ -92,7 +97,6 @@ public class TestHiveBolt {
     @Mock
     private IOutputCollector collector;
 
-
     private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class);
 
     public TestHiveBolt() throws Exception {
@@ -151,12 +155,16 @@ public class TestHiveBolt {
         String msg = "test-123";
         String city = "sunnyvale";
         String state = "ca";
-        checkRecordCountInTable(tblName,dbName,0);
+        checkRecordCountInTable(tblName, dbName, 0);
+
+        Set<Tuple> tupleSet = new HashSet<Tuple>();
         for (int i=0; i < 4; i++) {
             Tuple tuple = generateTestTuple(id,msg,city,state);
             bolt.execute(tuple);
-            verify(collector).ack(tuple);
+            tupleSet.add(tuple);
         }
+        for (Tuple t : tupleSet)
+            verify(collector).ack(t);
         checkRecordCountInTable(tblName, dbName, 4);
         bolt.cleanup();
     }
@@ -181,11 +189,15 @@ public class TestHiveBolt {
         String city = "sunnyvale";
         String state = "ca";
         checkRecordCountInTable(tblName1,dbName1,0);
+
+        Set<Tuple> tupleSet = new HashSet<Tuple>();
         for (int i=0; i < 4; i++) {
             Tuple tuple = generateTestTuple(id,msg,city,state);
             bolt.execute(tuple);
-            verify(collector).ack(tuple);
+            tupleSet.add(tuple);
         }
+        for (Tuple t : tupleSet)
+            verify(collector).ack(t);
         bolt.cleanup();
         checkRecordCountInTable(tblName1, dbName1, 4);
     }
@@ -196,8 +208,8 @@ public class TestHiveBolt {
         String[] partNames1 = {"date"};
         String timeFormat = "yyyy/MM/dd";
         HiveSetupUtil.dropDB(conf,dbName1);
-        HiveSetupUtil.createDbAndTable(conf, dbName1, tblName1,null,
-                                       colNames,colTypes,partNames1, dbLocation);
+        HiveSetupUtil.createDbAndTable(conf, dbName1, tblName1, null,
+                colNames, colTypes, partNames1, dbLocation);
         DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
             .withColumnFields(new Fields(colNames))
             .withTimeAsPartitionField(timeFormat);
@@ -211,13 +223,17 @@ public class TestHiveBolt {
         Date d = new Date();
         SimpleDateFormat parseDate = new SimpleDateFormat(timeFormat);
         String today=parseDate.format(d.getTime());
-        checkRecordCountInTable(tblName1,dbName1,0);
+        checkRecordCountInTable(tblName1, dbName1, 0);
+
+        Set<Tuple> tupleSet = new HashSet<Tuple>();
         for (int i=0; i < 2; i++) {
             Tuple tuple = generateTestTuple(id,msg,null,null);
+            tupleSet.add(tuple);
             bolt.execute(tuple);
-            verify(collector).ack(tuple);
         }
-        checkDataWritten(tblName1, dbName1, "100,test-123,"+today, "100,test-123,"+today);
+        for (Tuple t : tupleSet)
+            verify(collector).ack(t);
+        checkDataWritten(tblName1, dbName1, "100,test-123," + today, "100,test-123," + today);
         bolt.cleanup();
     }
 
@@ -231,8 +247,8 @@ public class TestHiveBolt {
             .withTxnsPerBatch(2)
             .withBatchSize(1);
         bolt = new HiveBolt(hiveOptions);
-        bolt.prepare(config,null,new OutputCollector(collector));
-        Tuple tuple1 = generateTestTuple(1,"SJC","Sunnyvale","CA");
+        bolt.prepare(config, null, new OutputCollector(collector));
+        Tuple tuple1 = generateTestTuple(1, "SJC", "Sunnyvale", "CA");
         //Tuple tuple2 = generateTestTuple(2,"SFO","San Jose","CA");
         bolt.execute(tuple1);
         verify(collector).ack(tuple1);
@@ -265,6 +281,60 @@ public class TestHiveBolt {
         bolt.cleanup();
     }
 
+    @Test
+    public void testNoAcksUntilFlushed()
+    {
+        JsonRecordHiveMapper mapper = new JsonRecordHiveMapper()
+                .withColumnFields(new Fields(colNames1))
+                .withPartitionFields(new Fields(partNames));
+        HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                .withTxnsPerBatch(2)
+                .withBatchSize(2);
+
+        bolt = new HiveBolt(hiveOptions);
+        bolt.prepare(config, null, new OutputCollector(collector));
+
+        Tuple tuple1 = generateTestTuple(1,"SJC","Sunnyvale","CA");
+        Tuple tuple2 = generateTestTuple(2,"SFO","San Jose","CA");
+
+        bolt.execute(tuple1);
+        verifyZeroInteractions(collector);
+
+        bolt.execute(tuple2);
+        verify(collector).ack(tuple1);
+        verify(collector).ack(tuple2);
+        bolt.cleanup();
+    }
+
+    @Test
+    public void testNoAcksIfFlushFails() throws Exception
+    {
+        JsonRecordHiveMapper mapper = new JsonRecordHiveMapper()
+                .withColumnFields(new Fields(colNames1))
+                .withPartitionFields(new Fields(partNames));
+        HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                .withTxnsPerBatch(2)
+                .withBatchSize(2);
+
+        HiveBolt spyBolt = Mockito.spy(new HiveBolt(hiveOptions));
+
+        //This forces a failure of all the flush attempts
+        doThrow(new InterruptedException()).when(spyBolt).flushAllWriters(true);
+        doThrow(new Exception()).when(spyBolt).flushAndCloseWriters();
+
+        spyBolt.prepare(config, null, new OutputCollector(collector));
+
+        Tuple tuple1 = generateTestTuple(1,"SJC","Sunnyvale","CA");
+        Tuple tuple2 = generateTestTuple(2,"SFO","San Jose","CA");
+
+        spyBolt.execute(tuple1);
+        spyBolt.execute(tuple2);
+
+        verify(collector, never()).ack(tuple1);
+        verify(collector, never()).ack(tuple2);
+
+        spyBolt.cleanup();
+    }
 
     @Test
     public void testMultiPartitionTuples()
@@ -282,12 +352,16 @@ public class TestHiveBolt {
         String city = "San Jose";
         String state = "CA";
         checkRecordCountInTable(tblName,dbName,0);
+
+        Set<Tuple> tupleSet = new HashSet<Tuple>();
         for(int i=0; i < 100; i++) {
             Tuple tuple = generateTestTuple(id,msg,city,state);
+            tupleSet.add(tuple);
             bolt.execute(tuple);
-            verify(collector).ack(tuple);
         }
         checkRecordCountInTable(tblName, dbName, 100);
+        for (Tuple t : tupleSet)
+            verify(collector).ack(t);
         bolt.cleanup();
     }
 

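The new tests rely on Mockito spies: a spy wraps a real HiveBolt and lets individual methods be stubbed to throw, which is exactly why the flush methods were widened to package-private (Mockito cannot stub private methods). The essence of testNoAcksIfFlushFails, reduced to a sketch:

    HiveBolt spyBolt = Mockito.spy(new HiveBolt(hiveOptions));
    // Force every flush attempt to fail before wiring up the collector:
    doThrow(new InterruptedException()).when(spyBolt).flushAllWriters(true);
    doThrow(new Exception()).when(spyBolt).flushAndCloseWriters();
    spyBolt.prepare(config, null, new OutputCollector(collector));

    spyBolt.execute(tuple);                   // the batch flush fails internally
    verify(collector, never()).ack(tuple);    // so the tuple must never be acked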

[2/2] storm git commit: Added STORM-960 to CHANGELOG.

Posted by sr...@apache.org.
Added STORM-960 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6de597a6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6de597a6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6de597a6

Branch: refs/heads/master
Commit: 6de597a6d004e56d1889df9b6a913c7391aff126
Parents: 937346a
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Fri Jul 31 07:27:43 2015 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Fri Jul 31 07:27:43 2015 -0700

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6de597a6/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6361734..1e511c7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-960: HiveBolt should ack tuples only after flushing.
  * STORM-951: Storm Hive connector leaking connections.
  * STORM_803: Better CI logs
  * STORM-806: use storm.zookeeper.connection.timeout in storm-kafka ZkState when newCurator