You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ro...@apache.org on 2015/09/29 23:52:00 UTC
flume git commit: FLUME-2804. Hive sink - abort remaining transactions on shutdown
Repository: flume
Updated Branches:
refs/heads/trunk 3fccd241d -> bd80c5e67
FLUME-2804. Hive sink - abort remaining transactions on shutdown
(Sriharsha Chintalapani via Roshan Naik)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/bd80c5e6
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/bd80c5e6
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/bd80c5e6
Branch: refs/heads/trunk
Commit: bd80c5e67deda4ec146f19fc92ff0c3bff61a982
Parents: 3fccd24
Author: Roshan Naik <ro...@hortonworks.com>
Authored: Tue Sep 29 14:43:34 2015 -0700
Committer: Roshan Naik <ro...@hortonworks.com>
Committed: Tue Sep 29 14:47:12 2015 -0700
----------------------------------------------------------------------
.../org/apache/flume/sink/hive/HiveSink.java | 4 +-
.../org/apache/flume/sink/hive/HiveWriter.java | 59 +++++++++++++++++++-
2 files changed, 59 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/bd80c5e6/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
index 6fe332a..d93bca3 100644
--- a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
+++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
@@ -320,6 +320,7 @@ public class HiveSink extends AbstractSink implements Configurable {
sinkCounter.addToEventDrainSuccessCount(txnEventCount);
return txnEventCount;
} catch (HiveWriter.Failure e) {
+ // in case of error we close all TxnBatches to start clean next time
LOG.warn(getName() + " : " + e.getMessage(), e);
abortAllWriters();
closeAllWriters();
@@ -462,8 +463,7 @@ public class HiveSink extends AbstractSink implements Configurable {
for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
try {
HiveWriter w = entry.getValue();
- LOG.info("Closing connection to {}", w);
- w.closeConnection();
+ w.close();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
http://git-wip-us.apache.org/repos/asf/flume/blob/bd80c5e6/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
index 46309be..ec30c98 100644
--- a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
+++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
@@ -216,7 +216,7 @@ class HiveWriter {
}
/**
- * Aborts the current Txn and switches to next Txn.
+ * Aborts the current Txn
* @throws InterruptedException
*/
public void abort() throws InterruptedException {
@@ -253,11 +253,66 @@ class HiveWriter {
*/
public void close() throws InterruptedException {
batch.clear();
+ abortRemainingTxns();
closeTxnBatch();
closeConnection();
closed = true;
}
+
+ private void abortRemainingTxns() throws InterruptedException {
+ try {
+ if ( !isClosed(txnBatch.getCurrentTransactionState()) ) {
+ abortCurrTxnHelper();
+ }
+
+ // recursively abort remaining txns
+ if(txnBatch.remainingTransactions()>0) {
+ timedCall(
+ new CallRunner1<Void>() {
+ @Override
+ public Void call() throws StreamingException, InterruptedException {
+ txnBatch.beginNextTransaction();
+ return null;
+ }
+ });
+ abortRemainingTxns();
+ }
+ } catch (StreamingException e) {
+ LOG.warn("Error when aborting remaining transactions in batch " + txnBatch, e);
+ return;
+ } catch (TimeoutException e) {
+ LOG.warn("Timed out when aborting remaining transactions in batch " + txnBatch, e);
+ return;
+ }
+ }
+
+ private void abortCurrTxnHelper() throws TimeoutException, InterruptedException {
+ try {
+ timedCall(
+ new CallRunner1<Void>() {
+ @Override
+ public Void call() throws StreamingException, InterruptedException {
+ txnBatch.abort();
+ LOG.info("Aborted txn " + txnBatch.getCurrentTxnId());
+ return null;
+ }
+ }
+ );
+ } catch (StreamingException e) {
+ LOG.warn("Unable to abort transaction " + txnBatch.getCurrentTxnId(), e);
+ // continue to attempt to abort other txns in the batch
+ }
+ }
+
+ private boolean isClosed(TransactionBatch.TxnState txnState) {
+ if(txnState == TransactionBatch.TxnState.COMMITTED)
+ return true;
+ if(txnState == TransactionBatch.TxnState.ABORTED)
+ return true;
+ return false;
+ }
+
public void closeConnection() throws InterruptedException {
LOG.info("Closing connection to EndPoint : {}", endPoint);
try {
@@ -346,7 +401,7 @@ class HiveWriter {
private void closeTxnBatch() throws InterruptedException {
try {
- LOG.debug("Closing Txn Batch {}", txnBatch);
+ LOG.info("Closing Txn Batch {}.", txnBatch);
timedCall(new CallRunner1<Void>() {
@Override
public Void call() throws InterruptedException, StreamingException {