You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/07/31 16:28:41 UTC
[1/2] storm git commit: STORM-960: HiveBolt should only ack after
successful flush
Repository: storm
Updated Branches:
refs/heads/master 14fdab8b1 -> 6de597a6d
STORM-960: HiveBolt should only ack after successful flush
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/937346a3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/937346a3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/937346a3
Branch: refs/heads/master
Commit: 937346a3373dfaee2225ae32d53b68391bd09cf5
Parents: 14fdab8
Author: Aaron Dossett <aa...@target.com>
Authored: Thu Jul 30 10:54:17 2015 -0500
Committer: Aaron Dossett <aa...@target.com>
Committed: Thu Jul 30 10:54:17 2015 -0500
----------------------------------------------------------------------
.../org/apache/storm/hive/bolt/HiveBolt.java | 38 +++++--
.../apache/storm/hive/bolt/TestHiveBolt.java | 100 ++++++++++++++++---
2 files changed, 115 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/937346a3/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
index b3defae..ec0ade2 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
@@ -43,23 +43,25 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.List;
+import java.util.LinkedList;
import java.io.IOException;
public class HiveBolt extends BaseRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class);
private OutputCollector collector;
private HiveOptions options;
- private Integer currentBatchSize;
private ExecutorService callTimeoutPool;
private transient Timer heartBeatTimer;
private Boolean kerberosEnabled = false;
private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false);
private UserGroupInformation ugi = null;
HashMap<HiveEndPoint, HiveWriter> allWriters;
+ private List<Tuple> tupleBatch;
public HiveBolt(HiveOptions options) {
this.options = options;
- this.currentBatchSize = 0;
+ tupleBatch = new LinkedList<>();
}
@Override
@@ -90,7 +92,7 @@ public class HiveBolt extends BaseRichBolt {
heartBeatTimer = new Timer();
setupHeartBeatTimer();
} catch(Exception e) {
- LOG.warn("unable to make connection to hive ",e);
+ LOG.warn("unable to make connection to hive ", e);
}
}
@@ -104,16 +106,31 @@ public class HiveBolt extends BaseRichBolt {
enableHeartBeatOnAllWriters();
}
writer.write(options.getMapper().mapRecord(tuple));
- currentBatchSize++;
- if(currentBatchSize >= options.getBatchSize()) {
+
+ tupleBatch.add(tuple);
+ if(tupleBatch.size() >= options.getBatchSize()) {
flushAllWriters(true);
- currentBatchSize = 0;
+ LOG.info("acknowledging tuples after writers flushed ");
+ for(Tuple t : tupleBatch)
+ collector.ack(t);
+ tupleBatch.clear();
}
- collector.ack(tuple);
} catch(Exception e) {
this.collector.reportError(e);
collector.fail(tuple);
- flushAndCloseWriters();
+ try {
+ flushAndCloseWriters();
+ LOG.info("acknowledging tuples after writers flushed and closed");
+ for (Tuple t : tupleBatch)
+ collector.ack(t);
+ tupleBatch.clear();
+ } catch (Exception e1) {
+ //If flushAndClose fails assume tuples are lost, do not ack
+ LOG.warn("Error while flushing and closing writers, tuples will NOT be acknowledged");
+ for (Tuple t : tupleBatch)
+ collector.fail(t);
+ tupleBatch.clear();
+ }
}
}
@@ -170,7 +187,7 @@ public class HiveBolt extends BaseRichBolt {
}
}
- private void flushAllWriters(boolean rollToNext)
+ void flushAllWriters(boolean rollToNext)
throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException {
for(HiveWriter writer: allWriters.values()) {
writer.flush(rollToNext);
@@ -194,11 +211,12 @@ public class HiveBolt extends BaseRichBolt {
}
}
- private void flushAndCloseWriters() {
+ void flushAndCloseWriters() throws Exception {
try {
flushAllWriters(false);
} catch(Exception e) {
LOG.warn("unable to flush hive writers. ", e);
+ throw e;
} finally {
closeAllWriters();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/937346a3/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
index e7e875e..b83b960 100644
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
@@ -45,9 +45,12 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.Spy;
import org.mockito.MockitoAnnotations;
import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.*;
+
import junit.framework.Assert;
import org.slf4j.Logger;
@@ -58,6 +61,8 @@ import java.util.HashMap;
import java.util.ArrayList;
import java.io.IOException;
import java.util.Date;
+import java.util.Set;
+import java.util.HashSet;
import java.text.SimpleDateFormat;
@@ -92,7 +97,6 @@ public class TestHiveBolt {
@Mock
private IOutputCollector collector;
-
private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class);
public TestHiveBolt() throws Exception {
@@ -151,12 +155,16 @@ public class TestHiveBolt {
String msg = "test-123";
String city = "sunnyvale";
String state = "ca";
- checkRecordCountInTable(tblName,dbName,0);
+ checkRecordCountInTable(tblName, dbName, 0);
+
+ Set<Tuple> tupleSet = new HashSet<Tuple>();
for (int i=0; i < 4; i++) {
Tuple tuple = generateTestTuple(id,msg,city,state);
bolt.execute(tuple);
- verify(collector).ack(tuple);
+ tupleSet.add(tuple);
}
+ for (Tuple t : tupleSet)
+ verify(collector).ack(t);
checkRecordCountInTable(tblName, dbName, 4);
bolt.cleanup();
}
@@ -181,11 +189,15 @@ public class TestHiveBolt {
String city = "sunnyvale";
String state = "ca";
checkRecordCountInTable(tblName1,dbName1,0);
+
+ Set<Tuple> tupleSet = new HashSet<Tuple>();
for (int i=0; i < 4; i++) {
Tuple tuple = generateTestTuple(id,msg,city,state);
bolt.execute(tuple);
- verify(collector).ack(tuple);
+ tupleSet.add(tuple);
}
+ for (Tuple t : tupleSet)
+ verify(collector).ack(t);
bolt.cleanup();
checkRecordCountInTable(tblName1, dbName1, 4);
}
@@ -196,8 +208,8 @@ public class TestHiveBolt {
String[] partNames1 = {"date"};
String timeFormat = "yyyy/MM/dd";
HiveSetupUtil.dropDB(conf,dbName1);
- HiveSetupUtil.createDbAndTable(conf, dbName1, tblName1,null,
- colNames,colTypes,partNames1, dbLocation);
+ HiveSetupUtil.createDbAndTable(conf, dbName1, tblName1, null,
+ colNames, colTypes, partNames1, dbLocation);
DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
.withColumnFields(new Fields(colNames))
.withTimeAsPartitionField(timeFormat);
@@ -211,13 +223,17 @@ public class TestHiveBolt {
Date d = new Date();
SimpleDateFormat parseDate = new SimpleDateFormat(timeFormat);
String today=parseDate.format(d.getTime());
- checkRecordCountInTable(tblName1,dbName1,0);
+ checkRecordCountInTable(tblName1, dbName1, 0);
+
+ Set<Tuple> tupleSet = new HashSet<Tuple>();
for (int i=0; i < 2; i++) {
Tuple tuple = generateTestTuple(id,msg,null,null);
+ tupleSet.add(tuple);
bolt.execute(tuple);
- verify(collector).ack(tuple);
}
- checkDataWritten(tblName1, dbName1, "100,test-123,"+today, "100,test-123,"+today);
+ for (Tuple t : tupleSet)
+ verify(collector).ack(t);
+ checkDataWritten(tblName1, dbName1, "100,test-123," + today, "100,test-123," + today);
bolt.cleanup();
}
@@ -231,8 +247,8 @@ public class TestHiveBolt {
.withTxnsPerBatch(2)
.withBatchSize(1);
bolt = new HiveBolt(hiveOptions);
- bolt.prepare(config,null,new OutputCollector(collector));
- Tuple tuple1 = generateTestTuple(1,"SJC","Sunnyvale","CA");
+ bolt.prepare(config, null, new OutputCollector(collector));
+ Tuple tuple1 = generateTestTuple(1, "SJC", "Sunnyvale", "CA");
//Tuple tuple2 = generateTestTuple(2,"SFO","San Jose","CA");
bolt.execute(tuple1);
verify(collector).ack(tuple1);
@@ -265,6 +281,60 @@ public class TestHiveBolt {
bolt.cleanup();
}
+ @Test
+ public void testNoAcksUntilFlushed()
+ {
+ JsonRecordHiveMapper mapper = new JsonRecordHiveMapper()
+ .withColumnFields(new Fields(colNames1))
+ .withPartitionFields(new Fields(partNames));
+ HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+ .withTxnsPerBatch(2)
+ .withBatchSize(2);
+
+ bolt = new HiveBolt(hiveOptions);
+ bolt.prepare(config, null, new OutputCollector(collector));
+
+ Tuple tuple1 = generateTestTuple(1,"SJC","Sunnyvale","CA");
+ Tuple tuple2 = generateTestTuple(2,"SFO","San Jose","CA");
+
+ bolt.execute(tuple1);
+ verifyZeroInteractions(collector);
+
+ bolt.execute(tuple2);
+ verify(collector).ack(tuple1);
+ verify(collector).ack(tuple2);
+ bolt.cleanup();
+ }
+
+ @Test
+ public void testNoAcksIfFlushFails() throws Exception
+ {
+ JsonRecordHiveMapper mapper = new JsonRecordHiveMapper()
+ .withColumnFields(new Fields(colNames1))
+ .withPartitionFields(new Fields(partNames));
+ HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+ .withTxnsPerBatch(2)
+ .withBatchSize(2);
+
+ HiveBolt spyBolt = Mockito.spy(new HiveBolt(hiveOptions));
+
+ //This forces a failure of all the flush attempts
+ doThrow(new InterruptedException()).when(spyBolt).flushAllWriters(true);
+ doThrow(new Exception()).when(spyBolt).flushAndCloseWriters();
+
+ spyBolt.prepare(config, null, new OutputCollector(collector));
+
+ Tuple tuple1 = generateTestTuple(1,"SJC","Sunnyvale","CA");
+ Tuple tuple2 = generateTestTuple(2,"SFO","San Jose","CA");
+
+ spyBolt.execute(tuple1);
+ spyBolt.execute(tuple2);
+
+ verify(collector, never()).ack(tuple1);
+ verify(collector, never()).ack(tuple2);
+
+ spyBolt.cleanup();
+ }
@Test
public void testMultiPartitionTuples()
@@ -282,12 +352,16 @@ public class TestHiveBolt {
String city = "San Jose";
String state = "CA";
checkRecordCountInTable(tblName,dbName,0);
+
+ Set<Tuple> tupleSet = new HashSet<Tuple>();
for(int i=0; i < 100; i++) {
Tuple tuple = generateTestTuple(id,msg,city,state);
+ tupleSet.add(tuple);
bolt.execute(tuple);
- verify(collector).ack(tuple);
}
checkRecordCountInTable(tblName, dbName, 100);
+ for (Tuple t : tupleSet)
+ verify(collector).ack(t);
bolt.cleanup();
}
[2/2] storm git commit: Added STORM-960 to CHANGELOG.
Posted by sr...@apache.org.
Added STORM-960 to CHANGELOG.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6de597a6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6de597a6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6de597a6
Branch: refs/heads/master
Commit: 6de597a6d004e56d1889df9b6a913c7391aff126
Parents: 937346a
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Fri Jul 31 07:27:43 2015 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Fri Jul 31 07:27:43 2015 -0700
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/6de597a6/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6361734..1e511c7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.11.0
+ * STORM-960: HiveBolt should ack tuples only after flushing.
* STORM-951: Storm Hive connector leaking connections.
* STORM_803: Better CI logs
* STORM-806: use storm.zookeeper.connection.timeout in storm-kafka ZkState when newCurator