You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2016/06/10 19:53:18 UTC
flume git commit: FLUME-2910. AsyncHBaseSink: Failure callbacks
should log the exception that caused them
Repository: flume
Updated Branches:
refs/heads/trunk f46bee03e -> ee4999bc2
FLUME-2910. AsyncHBaseSink: Failure callbacks should log the exception that caused them
(Abraham Fine via Mike Percy)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ee4999bc
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ee4999bc
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ee4999bc
Branch: refs/heads/trunk
Commit: ee4999bc23f42bc300ed87b0d46fd96418d6a185
Parents: f46bee0
Author: Mike Percy <mp...@cloudera.com>
Authored: Fri Jun 10 12:52:29 2016 -0700
Committer: Mike Percy <mp...@cloudera.com>
Committed: Fri Jun 10 12:52:29 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/flume/sink/hbase/AsyncHBaseSink.java | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/ee4999bc/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
index c1ff0c4..28f0de1 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
@@ -191,15 +191,15 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
Callback<Object, Object> putSuccessCallback =
new SuccessCallback<Object, Object>(
lock, callbacksReceived, condition);
- Callback<Object, Object> putFailureCallback =
- new FailureCallback<Object, Object>(
+ Callback<Object, Exception> putFailureCallback =
+ new FailureCallback<Object, Exception>(
lock, callbacksReceived, txnFail, condition);
Callback<Long, Long> incrementSuccessCallback =
new SuccessCallback<Long, Long>(
lock, callbacksReceived, condition);
- Callback<Long, Long> incrementFailureCallback =
- new FailureCallback<Long, Long>(
+ Callback<Long, Exception> incrementFailureCallback =
+ new FailureCallback<Long, Exception>(
lock, callbacksReceived, txnFail, condition);
Status status = Status.READY;
@@ -622,7 +622,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
}
}
- private class FailureCallback<R,T> implements Callback<R,T> {
+ private class FailureCallback<R,T extends Exception> implements Callback<R,T> {
private Lock lock;
private AtomicInteger callbacksReceived;
private AtomicBoolean txnFail;
@@ -639,6 +639,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
@Override
public R call(T arg) throws Exception {
+ logger.error("failure callback:", arg);
if (isTimeoutTesting) {
//tests set timeout to 10 seconds, so sleep for 4 seconds
try {