You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2014/01/16 06:29:13 UTC

git commit: FLUME-2259. Transaction closure not happening for all scenarios in HBaseSink.

Updated Branches:
  refs/heads/trunk bfd8e508a -> cf2ac3713


FLUME-2259. Transaction closure not happening for all scenarios in HBaseSink.

(Gopinathan A via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/cf2ac371
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/cf2ac371
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/cf2ac371

Branch: refs/heads/trunk
Commit: cf2ac371351812f333960604a3a8ad4a510307ae
Parents: bfd8e50
Author: Hari Shreedharan <hs...@apache.org>
Authored: Wed Jan 15 21:27:16 2014 -0800
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Wed Jan 15 21:27:16 2014 -0800

----------------------------------------------------------------------
 .../org/apache/flume/sink/hbase/HBaseSink.java  | 112 ++++++++++---------
 .../hbase/MockSimpleHbaseEventSerializer.java   |  38 +++++++
 .../apache/flume/sink/hbase/TestHBaseSink.java  |  82 +++++++++++++-
 3 files changed, 173 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/cf2ac371/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
index d5996c3..f5cb229 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
@@ -231,66 +231,32 @@ public class HBaseSink extends AbstractSink implements Configurable {
     Transaction txn = channel.getTransaction();
     List<Row> actions = new LinkedList<Row>();
     List<Increment> incs = new LinkedList<Increment>();
-    txn.begin();
-    long i = 0;
-    for(; i < batchSize; i++) {
-      Event event = channel.take();
-      if(event == null){
-        status = Status.BACKOFF;
-        if (i == 0) {
-          sinkCounter.incrementBatchEmptyCount();
+    try {
+      txn.begin();
+      long i = 0;
+      for (; i < batchSize; i++) {
+        Event event = channel.take();
+        if (event == null) {
+          if (i == 0) {
+            status = Status.BACKOFF;
+            sinkCounter.incrementBatchEmptyCount();
+          } else {
+            sinkCounter.incrementBatchUnderflowCount();
+          }
+          break;
         } else {
-          sinkCounter.incrementBatchUnderflowCount();
+          serializer.initialize(event, columnFamily);
+          actions.addAll(serializer.getActions());
+          incs.addAll(serializer.getIncrements());
         }
-        break;
-      } else {
-        serializer.initialize(event, columnFamily);
-        actions.addAll(serializer.getActions());
-        incs.addAll(serializer.getIncrements());
       }
-    }
-    if (i == batchSize) {
-      sinkCounter.incrementBatchCompleteCount();
-    }
-    sinkCounter.addToEventDrainAttemptCount(i);
-
-    putEventsAndCommit(actions, incs, txn);
-    return status;
-  }
+      if (i == batchSize) {
+        sinkCounter.incrementBatchCompleteCount();
+      }
+      sinkCounter.addToEventDrainAttemptCount(i);
 
-  private void putEventsAndCommit(final List<Row> actions, final List<Increment> incs,
-      Transaction txn) throws EventDeliveryException {
-    try {
-      runPrivileged(new PrivilegedExceptionAction<Void>() {
-        @Override
-        public Void run() throws Exception {
-          for(Row r : actions) {
-            if(r instanceof Put) {
-              ((Put)r).setWriteToWAL(enableWal);
-            }
-            // Newer versions of HBase - Increment implements Row.
-            if(r instanceof Increment) {
-              ((Increment)r).setWriteToWAL(enableWal);
-            }
-          }
-          table.batch(actions);
-          return null;
-        }
-      });
+      putEventsAndCommit(actions, incs, txn);
 
-      runPrivileged(new PrivilegedExceptionAction<Void>() {
-        @Override
-        public Void run() throws Exception {
-          for (final Increment i : incs) {
-            i.setWriteToWAL(enableWal);
-            table.increment(i);
-          }
-          return null;
-        }
-      });
-
-      txn.commit();
-      sinkCounter.addToEventDrainSuccessCount(actions.size());
     } catch (Throwable e) {
       try{
         txn.rollback();
@@ -313,6 +279,42 @@ public class HBaseSink extends AbstractSink implements Configurable {
     } finally {
       txn.close();
     }
+    return status;
+  }
+
+  private void putEventsAndCommit(final List<Row> actions,
+      final List<Increment> incs, Transaction txn) throws Exception {
+
+    runPrivileged(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        for (Row r : actions) {
+          if (r instanceof Put) {
+            ((Put) r).setWriteToWAL(enableWal);
+          }
+          // Newer versions of HBase - Increment implements Row.
+          if (r instanceof Increment) {
+            ((Increment) r).setWriteToWAL(enableWal);
+          }
+        }
+        table.batch(actions);
+        return null;
+      }
+    });
+
+    runPrivileged(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        for (final Increment i : incs) {
+          i.setWriteToWAL(enableWal);
+          table.increment(i);
+        }
+        return null;
+      }
+    });
+
+    txn.commit();
+    sinkCounter.addToEventDrainSuccessCount(actions.size());
   }
   private <T> T runPrivileged(final PrivilegedExceptionAction<T> action)
           throws Exception {

http://git-wip-us.apache.org/repos/asf/flume/blob/cf2ac371/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/MockSimpleHbaseEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/MockSimpleHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/MockSimpleHbaseEventSerializer.java
new file mode 100644
index 0000000..9b2a850
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/MockSimpleHbaseEventSerializer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.hbase;
+
+import java.util.List;
+
+import org.apache.flume.FlumeException;
+import org.apache.hadoop.hbase.client.Row;
+
+class MockSimpleHbaseEventSerializer extends SimpleHbaseEventSerializer {
+
+  public static boolean throwException = false;
+
+  @Override
+  public List<Row> getActions() throws FlumeException {
+    if (throwException) {
+      throw new FlumeException("Exception for testing");
+    }
+    return super.getActions();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/cf2ac371/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
index ab4128e..f41bf53 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
@@ -18,14 +18,15 @@
  */
 package org.apache.flume.sink.hbase;
 
+import static org.mockito.Mockito.*;
 
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
-
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
@@ -35,14 +36,12 @@ import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
-import org.apache.flume.sink.hbase.HBaseSink;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
-
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -60,7 +59,6 @@ public class TestHBaseSink {
   private static Context ctx = new Context();
   private static String valBase = "testing hbase sink: jham";
 
-
   @BeforeClass
   public static void setUp() throws Exception {
     testUtility.startMiniCluster();
@@ -368,5 +366,81 @@ public class TestHBaseSink {
     }
     return results;
   }
+
+  @Test
+  public void testTransactionStateOnChannelException() throws Exception {
+    ctx.put("batchSize", "1");
+    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+    HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
+    Configurables.configure(sink, ctx);
+    // Reset the context to a higher batchSize
+    Channel channel = spy(new MemoryChannel());
+    Configurables.configure(channel, new Context());
+    sink.setChannel(channel);
+    sink.start();
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + 0));
+    channel.put(e);
+    tx.commit();
+    tx.close();
+    doThrow(new ChannelException("Mock Exception")).when(channel).take();
+    try {
+      sink.process();
+      Assert.fail("take() method should throw exception");
+    } catch (ChannelException ex) {
+      Assert.assertEquals("Mock Exception", ex.getMessage());
+    }
+    doReturn(e).when(channel).take();
+    sink.process();
+    sink.stop();
+    HTable table = new HTable(testUtility.getConfiguration(), tableName);
+    byte[][] results = getResults(table, 1);
+    byte[] out = results[0];
+    Assert.assertArrayEquals(e.getBody(), out);
+    out = results[1];
+    Assert.assertArrayEquals(Longs.toByteArray(1), out);
+    testUtility.deleteTable(tableName.getBytes());
+  }
+
+  @Test
+  public void testTransactionStateOnSerializationException() throws Exception {
+    ctx.put("batchSize", "1");
+    ctx.put(HBaseSinkConfigurationConstants.CONFIG_SERIALIZER,
+        "org.apache.flume.sink.hbase.MockSimpleHbaseEventSerializer");
+    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+    HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
+    Configurables.configure(sink, ctx);
+    // Reset the context to a higher batchSize
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+    sink.setChannel(channel);
+    sink.start();
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + 0));
+    channel.put(e);
+    tx.commit();
+    tx.close();
+    try {
+      MockSimpleHbaseEventSerializer.throwException = true;
+      sink.process();
+      Assert.fail("FlumeException expected from serilazer");
+    } catch (FlumeException ex) {
+      Assert.assertEquals("Exception for testing", ex.getMessage());
+    }
+    MockSimpleHbaseEventSerializer.throwException = false;
+    sink.process();
+    sink.stop();
+    HTable table = new HTable(testUtility.getConfiguration(), tableName);
+    byte[][] results = getResults(table, 1);
+    byte[] out = results[0];
+    Assert.assertArrayEquals(e.getBody(), out);
+    out = results[1];
+    Assert.assertArrayEquals(Longs.toByteArray(1), out);
+    testUtility.deleteTable(tableName.getBytes());
+  }
+
+
 }