You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ro...@apache.org on 2015/08/26 03:42:01 UTC
flume git commit: FLUME-2754 - Hive Sink skipping first transaction in each Batch of Hive Transactions
Repository: flume
Updated Branches:
refs/heads/trunk fff13b5e0 -> 318da2088
FLUME-2754 - Hive Sink skipping first transaction in each Batch of Hive Transactions
(Deepesh Khandelwal via Roshan Naik)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/318da208
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/318da208
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/318da208
Branch: refs/heads/trunk
Commit: 318da208844d02ed7554724ae526cefe94dd894c
Parents: fff13b5
Author: Roshan Naik <ro...@hortonworks.com>
Authored: Tue Aug 25 18:38:14 2015 -0700
Committer: Roshan Naik <ro...@hortonworks.com>
Committed: Tue Aug 25 18:39:23 2015 -0700
----------------------------------------------------------------------
.../org/apache/flume/sink/hive/HiveWriter.java | 10 ++++--
.../apache/flume/sink/hive/TestHiveWriter.java | 32 ++++++++++++++++++++
2 files changed, 39 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/318da208/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
index aa8576e..46309be 100644
--- a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
+++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
@@ -88,6 +88,7 @@ class HiveWriter {
this.serializer = serializer;
this.recordWriter = serializer.createRecordWriter(endPoint);
this.txnBatch = nextTxnBatch(recordWriter);
+ this.txnBatch.beginNextTransaction();
this.closed = false;
this.lastUsed = System.currentTimeMillis();
} catch (InterruptedException e) {
@@ -117,6 +118,10 @@ class HiveWriter {
hearbeatNeeded = true;
}
+ public int getRemainingTxns() {
+ return this.txnBatch.remainingTransactions(); // delegates to the current txn batch
+ }
+
/**
* Write data, update stats
@@ -212,7 +217,7 @@ class HiveWriter {
/**
* Aborts the current Txn and switches to next Txn.
- * @throws StreamingException if could not get new Transaction Batch, or switch to next Txn
+ * @throws InterruptedException
*/
public void abort() throws InterruptedException {
batch.clear();
@@ -332,8 +337,7 @@ class HiveWriter {
return connection.fetchTransactionBatch(txnsPerBatch, recordWriter); // could block
}
});
- LOG.info("Acquired Txn Batch {}. Switching to first txn", batch);
- batch.beginNextTransaction();
+ LOG.info("Acquired Transaction batch {}", batch);
} catch (Exception e) {
throw new TxnBatchException(endPoint, e);
}
http://git-wip-us.apache.org/repos/asf/flume/blob/318da208/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java
index 174f179..41bf0f6 100644
--- a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java
+++ b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java
@@ -174,6 +174,38 @@ public class TestHiveWriter {
checkRecordCountInTable(3);
}
+ @Test
+ public void testTxnBatchConsumption() throws Exception {
+ // Get a small txn batch and consume it, then roll over to a new batch, verifying
+ // the number of remaining txns at each step to ensure Txns are not accidentally skipped.
+ // HiveWriter's constructor begins the first txn, so a fresh batch of 3 leaves 2.
+
+ HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals);
+ SinkCounter sinkCounter = new SinkCounter(this.getClass().getName());
+
+ int txnPerBatch = 3;
+
+ HiveWriter writer = new HiveWriter(endPoint, txnPerBatch, true, timeout
+ , callTimeoutPool, "flumetest", serializer, sinkCounter);
+ // close in finally so the writer is closed even when an assertion fails
+ try {
+ // JUnit's assertEquals takes (expected, actual)
+ Assert.assertEquals(2, writer.getRemainingTxns());
+ writer.flush(true);
+ Assert.assertEquals(1, writer.getRemainingTxns());
+ writer.flush(true);
+ Assert.assertEquals(0, writer.getRemainingTxns());
+ writer.flush(true);
+
+ // batch exhausted: the last flush flipped over to the next batch
+ Assert.assertEquals(2, writer.getRemainingTxns());
+ writer.flush(true);
+ Assert.assertEquals(1, writer.getRemainingTxns());
+ } finally {
+ writer.close();
+ }
+ }
+
private void checkRecordCountInTable(int expectedCount)
throws CommandNeedRetryException, IOException {
int count = TestUtil.listRecordsInTable(driver, dbName, tblName).size();