You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2016/06/10 19:53:18 UTC

flume git commit: FLUME-2910. AsyncHBaseSink: Failure callbacks should log the exception that caused them

Repository: flume
Updated Branches:
  refs/heads/trunk f46bee03e -> ee4999bc2


FLUME-2910. AsyncHBaseSink: Failure callbacks should log the exception that caused them

(Abraham Fine via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ee4999bc
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ee4999bc
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ee4999bc

Branch: refs/heads/trunk
Commit: ee4999bc23f42bc300ed87b0d46fd96418d6a185
Parents: f46bee0
Author: Mike Percy <mp...@cloudera.com>
Authored: Fri Jun 10 12:52:29 2016 -0700
Committer: Mike Percy <mp...@cloudera.com>
Committed: Fri Jun 10 12:52:29 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/flume/sink/hbase/AsyncHBaseSink.java | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/ee4999bc/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
index c1ff0c4..28f0de1 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
@@ -191,15 +191,15 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
     Callback<Object, Object> putSuccessCallback =
             new SuccessCallback<Object, Object>(
             lock, callbacksReceived, condition);
-    Callback<Object, Object> putFailureCallback =
-            new FailureCallback<Object, Object>(
+    Callback<Object, Exception> putFailureCallback =
+            new FailureCallback<Object, Exception>(
             lock, callbacksReceived, txnFail, condition);
 
     Callback<Long, Long> incrementSuccessCallback =
             new SuccessCallback<Long, Long>(
             lock, callbacksReceived, condition);
-    Callback<Long, Long> incrementFailureCallback =
-            new FailureCallback<Long, Long>(
+    Callback<Long, Exception> incrementFailureCallback =
+            new FailureCallback<Long, Exception>(
             lock, callbacksReceived, txnFail, condition);
 
     Status status = Status.READY;
@@ -622,7 +622,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
     }
   }
 
-  private class FailureCallback<R,T> implements Callback<R,T> {
+  private class FailureCallback<R,T extends Exception> implements Callback<R,T> {
     private Lock lock;
     private AtomicInteger callbacksReceived;
     private AtomicBoolean txnFail;
@@ -639,6 +639,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
 
     @Override
     public R call(T arg) throws Exception {
+      logger.error("failure callback:", arg);
       if (isTimeoutTesting) {
         //tests set timeout to 10 seconds, so sleep for 4 seconds
         try {