You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ro...@apache.org on 2015/08/26 03:42:01 UTC

flume git commit: FLUME-2754 - Hive Sink skipping first transaction in each Batch of Hive Transactions

Repository: flume
Updated Branches:
  refs/heads/trunk fff13b5e0 -> 318da2088


FLUME-2754 - Hive Sink skipping first transaction in each Batch of Hive Transactions

(Deepesh Khandelwal via Roshan Naik)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/318da208
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/318da208
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/318da208

Branch: refs/heads/trunk
Commit: 318da208844d02ed7554724ae526cefe94dd894c
Parents: fff13b5
Author: Roshan Naik <ro...@hortonworks.com>
Authored: Tue Aug 25 18:38:14 2015 -0700
Committer: Roshan Naik <ro...@hortonworks.com>
Committed: Tue Aug 25 18:39:23 2015 -0700

----------------------------------------------------------------------
 .../org/apache/flume/sink/hive/HiveWriter.java  | 10 ++++--
 .../apache/flume/sink/hive/TestHiveWriter.java  | 32 ++++++++++++++++++++
 2 files changed, 39 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/318da208/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
index aa8576e..46309be 100644
--- a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
+++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
@@ -88,6 +88,7 @@ class HiveWriter {
       this.serializer = serializer;
       this.recordWriter = serializer.createRecordWriter(endPoint);
       this.txnBatch = nextTxnBatch(recordWriter);
+      this.txnBatch.beginNextTransaction();
       this.closed = false;
       this.lastUsed = System.currentTimeMillis();
     } catch (InterruptedException e) {
@@ -117,6 +118,10 @@ class HiveWriter {
     hearbeatNeeded = true;
   }
 
+  public int getRemainingTxns() {
+    return txnBatch.remainingTransactions();
+  }
+
 
   /**
    * Write data, update stats
@@ -212,7 +217,7 @@ class HiveWriter {
 
   /**
    * Aborts the current Txn and switches to next Txn.
-   * @throws StreamingException if could not get new Transaction Batch, or switch to next Txn
+   * @throws InterruptedException
    */
   public void abort()  throws InterruptedException {
     batch.clear();
@@ -332,8 +337,7 @@ class HiveWriter {
           return connection.fetchTransactionBatch(txnsPerBatch, recordWriter); // could block
         }
       });
-      LOG.info("Acquired Txn Batch {}. Switching to first txn", batch);
-      batch.beginNextTransaction();
+      LOG.info("Acquired Transaction batch {}", batch);
     } catch (Exception e) {
       throw new TxnBatchException(endPoint, e);
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/318da208/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java
index 174f179..41bf0f6 100644
--- a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java
+++ b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java
@@ -174,6 +174,38 @@ public class TestHiveWriter {
     checkRecordCountInTable(3);
   }
 
+  @Test
+  public void testTxnBatchConsumption() throws Exception {
+    // get a small txn batch and consume it, then roll to a new batch, and verify
+    // the number of remaining txns to ensure Txns are not accidentally skipped
+
+    HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals);
+    SinkCounter sinkCounter = new SinkCounter(this.getClass().getName());
+
+    int txnPerBatch = 3;
+
+    HiveWriter writer = new HiveWriter(endPoint, txnPerBatch, true, timeout
+            , callTimeoutPool, "flumetest", serializer, sinkCounter);
+
+    Assert.assertEquals(writer.getRemainingTxns(),2);
+    writer.flush(true);
+
+    Assert.assertEquals(writer.getRemainingTxns(), 1);
+    writer.flush(true);
+
+    Assert.assertEquals(writer.getRemainingTxns(), 0);
+    writer.flush(true);
+
+    // flip over to next batch
+    Assert.assertEquals(writer.getRemainingTxns(), 2);
+    writer.flush(true);
+
+    Assert.assertEquals(writer.getRemainingTxns(), 1);
+
+    writer.close();
+
+  }
+
   private void checkRecordCountInTable(int expectedCount)
           throws CommandNeedRetryException, IOException {
     int count = TestUtil.listRecordsInTable(driver, dbName, tblName).size();