You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2014/01/16 06:29:13 UTC
git commit: FLUME-2259. Transaction closure not happening for all the
scenario in HBaseSink.
Updated Branches:
refs/heads/trunk bfd8e508a -> cf2ac3713
FLUME-2259. Transaction closure not happening for all the scenario in HBaseSink.
(Gopinathan A via Hari Shreedharan)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/cf2ac371
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/cf2ac371
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/cf2ac371
Branch: refs/heads/trunk
Commit: cf2ac371351812f333960604a3a8ad4a510307ae
Parents: bfd8e50
Author: Hari Shreedharan <hs...@apache.org>
Authored: Wed Jan 15 21:27:16 2014 -0800
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Wed Jan 15 21:27:16 2014 -0800
----------------------------------------------------------------------
.../org/apache/flume/sink/hbase/HBaseSink.java | 112 ++++++++++---------
.../hbase/MockSimpleHbaseEventSerializer.java | 38 +++++++
.../apache/flume/sink/hbase/TestHBaseSink.java | 82 +++++++++++++-
3 files changed, 173 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/cf2ac371/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
index d5996c3..f5cb229 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
@@ -231,66 +231,32 @@ public class HBaseSink extends AbstractSink implements Configurable {
Transaction txn = channel.getTransaction();
List<Row> actions = new LinkedList<Row>();
List<Increment> incs = new LinkedList<Increment>();
- txn.begin();
- long i = 0;
- for(; i < batchSize; i++) {
- Event event = channel.take();
- if(event == null){
- status = Status.BACKOFF;
- if (i == 0) {
- sinkCounter.incrementBatchEmptyCount();
+ try {
+ txn.begin();
+ long i = 0;
+ for (; i < batchSize; i++) {
+ Event event = channel.take();
+ if (event == null) {
+ if (i == 0) {
+ status = Status.BACKOFF;
+ sinkCounter.incrementBatchEmptyCount();
+ } else {
+ sinkCounter.incrementBatchUnderflowCount();
+ }
+ break;
} else {
- sinkCounter.incrementBatchUnderflowCount();
+ serializer.initialize(event, columnFamily);
+ actions.addAll(serializer.getActions());
+ incs.addAll(serializer.getIncrements());
}
- break;
- } else {
- serializer.initialize(event, columnFamily);
- actions.addAll(serializer.getActions());
- incs.addAll(serializer.getIncrements());
}
- }
- if (i == batchSize) {
- sinkCounter.incrementBatchCompleteCount();
- }
- sinkCounter.addToEventDrainAttemptCount(i);
-
- putEventsAndCommit(actions, incs, txn);
- return status;
- }
+ if (i == batchSize) {
+ sinkCounter.incrementBatchCompleteCount();
+ }
+ sinkCounter.addToEventDrainAttemptCount(i);
- private void putEventsAndCommit(final List<Row> actions, final List<Increment> incs,
- Transaction txn) throws EventDeliveryException {
- try {
- runPrivileged(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- for(Row r : actions) {
- if(r instanceof Put) {
- ((Put)r).setWriteToWAL(enableWal);
- }
- // Newer versions of HBase - Increment implements Row.
- if(r instanceof Increment) {
- ((Increment)r).setWriteToWAL(enableWal);
- }
- }
- table.batch(actions);
- return null;
- }
- });
+ putEventsAndCommit(actions, incs, txn);
- runPrivileged(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- for (final Increment i : incs) {
- i.setWriteToWAL(enableWal);
- table.increment(i);
- }
- return null;
- }
- });
-
- txn.commit();
- sinkCounter.addToEventDrainSuccessCount(actions.size());
} catch (Throwable e) {
try{
txn.rollback();
@@ -313,6 +279,42 @@ public class HBaseSink extends AbstractSink implements Configurable {
} finally {
txn.close();
}
+ return status;
+ }
+
+ private void putEventsAndCommit(final List<Row> actions,
+ final List<Increment> incs, Transaction txn) throws Exception {
+
+ runPrivileged(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ for (Row r : actions) {
+ if (r instanceof Put) {
+ ((Put) r).setWriteToWAL(enableWal);
+ }
+ // Newer versions of HBase - Increment implements Row.
+ if (r instanceof Increment) {
+ ((Increment) r).setWriteToWAL(enableWal);
+ }
+ }
+ table.batch(actions);
+ return null;
+ }
+ });
+
+ runPrivileged(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ for (final Increment i : incs) {
+ i.setWriteToWAL(enableWal);
+ table.increment(i);
+ }
+ return null;
+ }
+ });
+
+ txn.commit();
+ sinkCounter.addToEventDrainSuccessCount(actions.size());
}
private <T> T runPrivileged(final PrivilegedExceptionAction<T> action)
throws Exception {
http://git-wip-us.apache.org/repos/asf/flume/blob/cf2ac371/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/MockSimpleHbaseEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/MockSimpleHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/MockSimpleHbaseEventSerializer.java
new file mode 100644
index 0000000..9b2a850
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/MockSimpleHbaseEventSerializer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.hbase;
+
+import java.util.List;
+
+import org.apache.flume.FlumeException;
+import org.apache.hadoop.hbase.client.Row;
+
+class MockSimpleHbaseEventSerializer extends SimpleHbaseEventSerializer {
+
+ public static boolean throwException = false;
+
+ @Override
+ public List<Row> getActions() throws FlumeException {
+ if (throwException) {
+ throw new FlumeException("Exception for testing");
+ }
+ return super.getActions();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flume/blob/cf2ac371/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
index ab4128e..f41bf53 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
@@ -18,14 +18,15 @@
*/
package org.apache.flume.sink.hbase;
+import static org.mockito.Mockito.*;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
-
import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
@@ -35,14 +36,12 @@ import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
-import org.apache.flume.sink.hbase.HBaseSink;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
-
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
@@ -60,7 +59,6 @@ public class TestHBaseSink {
private static Context ctx = new Context();
private static String valBase = "testing hbase sink: jham";
-
@BeforeClass
public static void setUp() throws Exception {
testUtility.startMiniCluster();
@@ -368,5 +366,81 @@ public class TestHBaseSink {
}
return results;
}
+
+ @Test
+ public void testTransactionStateOnChannelException() throws Exception {
+ ctx.put("batchSize", "1");
+ testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+ HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
+ Configurables.configure(sink, ctx);
+ // Spy on the channel so that take() can be stubbed to throw below
+ Channel channel = spy(new MemoryChannel());
+ Configurables.configure(channel, new Context());
+ sink.setChannel(channel);
+ sink.start();
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+ Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + 0));
+ channel.put(e);
+ tx.commit();
+ tx.close();
+ doThrow(new ChannelException("Mock Exception")).when(channel).take();
+ try {
+ sink.process();
+ Assert.fail("take() method should throw exception");
+ } catch (ChannelException ex) {
+ Assert.assertEquals("Mock Exception", ex.getMessage());
+ }
+ doReturn(e).when(channel).take();
+ sink.process();
+ sink.stop();
+ HTable table = new HTable(testUtility.getConfiguration(), tableName);
+ byte[][] results = getResults(table, 1);
+ byte[] out = results[0];
+ Assert.assertArrayEquals(e.getBody(), out);
+ out = results[1];
+ Assert.assertArrayEquals(Longs.toByteArray(1), out);
+ testUtility.deleteTable(tableName.getBytes());
+ }
+
+ @Test
+ public void testTransactionStateOnSerializationException() throws Exception {
+ ctx.put("batchSize", "1");
+ ctx.put(HBaseSinkConfigurationConstants.CONFIG_SERIALIZER,
+ "org.apache.flume.sink.hbase.MockSimpleHbaseEventSerializer");
+ testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+ HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
+ Configurables.configure(sink, ctx);
+ // Plain MemoryChannel: the failure is injected through the mock serializer
+ Channel channel = new MemoryChannel();
+ Configurables.configure(channel, new Context());
+ sink.setChannel(channel);
+ sink.start();
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+ Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + 0));
+ channel.put(e);
+ tx.commit();
+ tx.close();
+ try {
+ MockSimpleHbaseEventSerializer.throwException = true;
+ sink.process();
+ Assert.fail("FlumeException expected from serilazer");
+ } catch (FlumeException ex) {
+ Assert.assertEquals("Exception for testing", ex.getMessage());
+ }
+ MockSimpleHbaseEventSerializer.throwException = false;
+ sink.process();
+ sink.stop();
+ HTable table = new HTable(testUtility.getConfiguration(), tableName);
+ byte[][] results = getResults(table, 1);
+ byte[] out = results[0];
+ Assert.assertArrayEquals(e.getBody(), out);
+ out = results[1];
+ Assert.assertArrayEquals(Longs.toByteArray(1), out);
+ testUtility.deleteTable(tableName.getBytes());
+ }
+
+
}