You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ro...@apache.org on 2015/09/29 23:52:00 UTC

flume git commit: FLUME-2804. Hive sink - abort remaining transactions on shutdown

Repository: flume
Updated Branches:
  refs/heads/trunk 3fccd241d -> bd80c5e67


FLUME-2804. Hive sink - abort remaining transactions on shutdown

 (Sriharsha Chintalapani via Roshan Naik)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/bd80c5e6
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/bd80c5e6
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/bd80c5e6

Branch: refs/heads/trunk
Commit: bd80c5e67deda4ec146f19fc92ff0c3bff61a982
Parents: 3fccd24
Author: Roshan Naik <ro...@hortonworks.com>
Authored: Tue Sep 29 14:43:34 2015 -0700
Committer: Roshan Naik <ro...@hortonworks.com>
Committed: Tue Sep 29 14:47:12 2015 -0700

----------------------------------------------------------------------
 .../org/apache/flume/sink/hive/HiveSink.java    |  4 +-
 .../org/apache/flume/sink/hive/HiveWriter.java  | 59 +++++++++++++++++++-
 2 files changed, 59 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/bd80c5e6/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
index 6fe332a..d93bca3 100644
--- a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
+++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
@@ -320,6 +320,7 @@ public class HiveSink extends AbstractSink implements Configurable {
       sinkCounter.addToEventDrainSuccessCount(txnEventCount);
       return txnEventCount;
     } catch (HiveWriter.Failure e) {
+      // in case of error we close all TxnBatches to start clean next time
       LOG.warn(getName() + " : " + e.getMessage(), e);
       abortAllWriters();
       closeAllWriters();
@@ -462,8 +463,7 @@ public class HiveSink extends AbstractSink implements Configurable {
     for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
       try {
         HiveWriter w = entry.getValue();
-        LOG.info("Closing connection to {}", w);
-        w.closeConnection();
+        w.close();
       } catch (InterruptedException ex) {
         Thread.currentThread().interrupt();
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/bd80c5e6/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
index 46309be..ec30c98 100644
--- a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
+++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
@@ -216,7 +216,7 @@ class HiveWriter {
   }
 
   /**
-   * Aborts the current Txn and switches to next Txn.
+   * Aborts the current Txn
    * @throws InterruptedException
    */
   public void abort()  throws InterruptedException {
@@ -253,11 +253,66 @@ class HiveWriter {
    */
   public void close() throws InterruptedException {
     batch.clear();
+    abortRemainingTxns();
     closeTxnBatch();
     closeConnection();
     closed = true;
   }
 
+
+  private void abortRemainingTxns() throws InterruptedException {
+      try {
+        if ( !isClosed(txnBatch.getCurrentTransactionState()) ) {
+          abortCurrTxnHelper();
+        }
+
+        // recursively abort remaining txns
+        if(txnBatch.remainingTransactions()>0) {
+          timedCall(
+                  new CallRunner1<Void>() {
+                    @Override
+                    public Void call() throws StreamingException, InterruptedException {
+                      txnBatch.beginNextTransaction();
+                      return null;
+                    }
+                  });
+          abortRemainingTxns();
+        }
+      } catch (StreamingException e) {
+        LOG.warn("Error when aborting remaining transactions in batch " + txnBatch, e);
+        return;
+      } catch (TimeoutException e) {
+        LOG.warn("Timed out when aborting remaining transactions in batch " + txnBatch, e);
+        return;
+      }
+  }
+
+  private void abortCurrTxnHelper() throws TimeoutException, InterruptedException {
+    try {
+      timedCall(
+              new CallRunner1<Void>() {
+                @Override
+                public Void call() throws StreamingException, InterruptedException {
+                  txnBatch.abort();
+                  LOG.info("Aborted txn " + txnBatch.getCurrentTxnId());
+                  return null;
+                }
+              }
+      );
+    } catch (StreamingException e) {
+      LOG.warn("Unable to abort transaction " + txnBatch.getCurrentTxnId(), e);
+      // continue to attempt to abort other txns in the batch
+    }
+  }
+
+  private boolean isClosed(TransactionBatch.TxnState txnState) {
+    if(txnState == TransactionBatch.TxnState.COMMITTED)
+      return true;
+    if(txnState == TransactionBatch.TxnState.ABORTED)
+      return true;
+    return false;
+  }
+
   public void closeConnection() throws InterruptedException {
     LOG.info("Closing connection to EndPoint : {}", endPoint);
     try {
@@ -346,7 +401,7 @@ class HiveWriter {
 
   private void closeTxnBatch() throws InterruptedException {
     try {
-      LOG.debug("Closing Txn Batch {}", txnBatch);
+      LOG.info("Closing Txn Batch {}.", txnBatch);
       timedCall(new CallRunner1<Void>() {
         @Override
         public Void call() throws InterruptedException, StreamingException {