You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/09/08 08:10:04 UTC
[5/7] incubator-tephra git commit: TEPHRA-176,
TEPHRA-177: Adding maven modules for CDH-5.7, 5.8 support,
HBase-1.1 and HBase-1.2 modules
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2246abff/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
new file mode 100644
index 0000000..de1fa6b
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
@@ -0,0 +1,1606 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tephra.hbase;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.OperationWithAttributes;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.LongComparator;
+import org.apache.hadoop.hbase.filter.ValueFilter;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionConflictException;
+import org.apache.tephra.TransactionContext;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.inmemory.InMemoryTxSystemClient;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for TransactionAwareHTables.
+ */
+public class TransactionAwareHTableTest {
+ // Logger for this test class.
+ private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTableTest.class);
+
+ // Class-wide fixtures; all of these are assigned once in setupBeforeClass().
+ private static HBaseTestingUtility testUtil;
+ private static HBaseAdmin hBaseAdmin;
+ private static TransactionStateStorage txStateStorage;
+ private static TransactionManager txManager;
+ private static Configuration conf;
+ // Per-test fixtures; re-created in setupBeforeTest() against the TestBytes.table table.
+ private TransactionContext transactionContext;
+ private TransactionAwareHTable transactionAwareHTable;
+ private HTable hTable;
+
+ private static final class TestBytes {
+ private static final byte[] table = Bytes.toBytes("testtable");
+ private static final byte[] family = Bytes.toBytes("f1");
+ private static final byte[] family2 = Bytes.toBytes("f2");
+ private static final byte[] qualifier = Bytes.toBytes("col1");
+ private static final byte[] qualifier2 = Bytes.toBytes("col2");
+ private static final byte[] row = Bytes.toBytes("row");
+ private static final byte[] row2 = Bytes.toBytes("row2");
+ private static final byte[] row3 = Bytes.toBytes("row3");
+ private static final byte[] row4 = Bytes.toBytes("row4");
+ private static final byte[] value = Bytes.toBytes("value");
+ private static final byte[] value2 = Bytes.toBytes("value2");
+ private static final byte[] value3 = Bytes.toBytes("value3");
+ }
+
+ // Name of the operation attribute that TestRegionObserver requires on every Put and Delete.
+ private static final String TEST_ATTRIBUTE = "TEST_ATTRIBUTE";
+
+ /**
+  * Region observer that rejects any Put or Delete which arrives without the
+  * {@link #TEST_ATTRIBUTE} attribute or with a durability other than
+  * {@link Durability#USE_DEFAULT}.
+  */
+ public static class TestRegionObserver extends BaseRegionObserver {
+   @Override
+   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
+                      final Put put, final WALEdit edit,
+                      final Durability durability) throws IOException {
+     requireAttribute(put.getAttribute(TEST_ATTRIBUTE), "Put");
+     requireDefaultDurability(put.getDurability());
+   }
+
+   @Override
+   public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
+                         final Delete delete, final WALEdit edit,
+                         final Durability durability) throws IOException {
+     requireAttribute(delete.getAttribute(TEST_ATTRIBUTE), "Delete");
+     requireDefaultDurability(delete.getDurability());
+   }
+
+   // Fails the operation when the test attribute did not reach the region server.
+   private static void requireAttribute(byte[] attribute, String operation) throws DoNotRetryIOException {
+     if (attribute == null) {
+       throw new DoNotRetryIOException(operation + " should preserve attributes");
+     }
+   }
+
+   // Fails the operation when the mutation carries a non-default durability.
+   private static void requireDefaultDurability(Durability actual) throws DoNotRetryIOException {
+     if (actual != Durability.USE_DEFAULT) {
+       throw new DoNotRetryIOException("Durability is not propagated correctly");
+     }
+   }
+ }
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ testUtil = new HBaseTestingUtility();
+ conf = testUtil.getConfiguration();
+
+ // Tune down the connection thread pool size
+ conf.setInt("hbase.hconnection.threads.core", 5);
+ conf.setInt("hbase.hconnection.threads.max", 10);
+ // Tunn down handler threads in regionserver
+ conf.setInt("hbase.regionserver.handler.count", 10);
+
+ // Set to random port
+ conf.setInt("hbase.master.port", 0);
+ conf.setInt("hbase.master.info.port", 0);
+ conf.setInt("hbase.regionserver.port", 0);
+ conf.setInt("hbase.regionserver.info.port", 0);
+
+ testUtil.startMiniCluster();
+ hBaseAdmin = testUtil.getHBaseAdmin();
+ txStateStorage = new InMemoryTransactionStateStorage();
+ txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
+ txManager.startAndWait();
+ }
+
+ /**
+  * Releases the class-wide fixtures: stops the transaction manager started in
+  * {@link #setupBeforeClass()}, closes the admin handle while the cluster is still up,
+  * and finally shuts the mini cluster down. Each step runs even if an earlier one fails.
+  */
+ @AfterClass
+ public static void shutdownAfterClass() throws Exception {
+   try {
+     // null when setupBeforeClass() failed before the manager was created
+     if (txManager != null) {
+       txManager.stopAndWait();
+     }
+   } finally {
+     try {
+       if (hBaseAdmin != null) {
+         hBaseAdmin.close();
+       }
+     } finally {
+       testUtil.shutdownMiniCluster();
+     }
+   }
+ }
+
+ @Before
+ public void setupBeforeTest() throws Exception {
+ hTable = createTable(TestBytes.table, new byte[][]{TestBytes.family});
+ transactionAwareHTable = new TransactionAwareHTable(hTable);
+ transactionContext = new TransactionContext(new InMemoryTxSystemClient(txManager), transactionAwareHTable);
+ }
+
+ /**
+  * Closes the per-test table handle and drops the default test table so that the next
+  * test can re-create it. The table is dropped even if closing the handle fails.
+  */
+ @After
+ public void shutdownAfterTest() throws IOException {
+   try {
+     // null when setupBeforeTest() failed before the table handle was created
+     if (hTable != null) {
+       hTable.close();
+     }
+   } finally {
+     hBaseAdmin.disableTable(TestBytes.table);
+     hBaseAdmin.deleteTable(TestBytes.table);
+   }
+ }
+
+ /**
+  * Creates a table with the given column families, without the non-transactional-data
+  * flag and without any coprocessors beyond the ones the 4-argument overload always adds.
+  */
+ private HTable createTable(byte[] tableName, byte[][] columnFamilies) throws Exception {
+   List<String> noExtraCoprocessors = Collections.emptyList();
+   return createTable(tableName, columnFamilies, false, noExtraCoprocessors);
+ }
+
+ /**
+  * Creates a table with the {@link TransactionProcessor} coprocessor attached and waits
+  * (up to 5000, as passed to {@code waitTableAvailable}) for it to become available.
+  *
+  * @param tableName name of the table to create
+  * @param columnFamilies families to add; each keeps Integer.MAX_VALUE versions and gets
+  *                       the transactional TTL property set to 100000
+  * @param existingData when true, sets {@link TxConstants#READ_NON_TX_DATA} to "true" on the table
+  * @param coprocessors class names of additional coprocessors, registered after
+  *                     {@link TransactionProcessor} with increasing priority values
+  * @return a new {@link HTable} handle for the created table
+  */
+ private HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData,
+                            List<String> coprocessors) throws Exception {
+   HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
+   for (int i = 0; i < columnFamilies.length; i++) {
+     HColumnDescriptor familyDesc = new HColumnDescriptor(columnFamilies[i]);
+     familyDesc.setMaxVersions(Integer.MAX_VALUE);
+     familyDesc.setValue(TxConstants.PROPERTY_TTL, String.valueOf(100000)); // in millis
+     tableDesc.addFamily(familyDesc);
+   }
+   if (existingData) {
+     tableDesc.setValue(TxConstants.READ_NON_TX_DATA, "true");
+   }
+
+   // TransactionProcessor is registered at the base user priority; each extra coprocessor
+   // gets the next priority value, so the list order is the order of invocation.
+   int priority = Coprocessor.PRIORITY_USER;
+   tableDesc.addCoprocessor(TransactionProcessor.class.getName(), null, priority, null);
+   for (String coprocessorClass : coprocessors) {
+     priority += 1;
+     tableDesc.addCoprocessor(coprocessorClass, null, priority, null);
+   }
+
+   hBaseAdmin.createTable(tableDesc);
+   testUtil.waitTableAvailable(tableName, 5000);
+   return new HTable(testUtil.getConfiguration(), tableName);
+ }
+
+ /**
+  * Writes a cell in one committed transaction and verifies that a subsequent
+  * transaction reads the same value back.
+  *
+  * @throws Exception
+  */
+ @Test
+ public void testValidTransactionalPutAndGet() throws Exception {
+   // first transaction: write the cell and commit
+   transactionContext.start();
+   Put write = new Put(TestBytes.row)
+       .add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+   transactionAwareHTable.put(write);
+   transactionContext.finish();
+
+   // second transaction: read the row back
+   transactionContext.start();
+   Get read = new Get(TestBytes.row);
+   Result row = transactionAwareHTable.get(read);
+   transactionContext.finish();
+
+   assertArrayEquals(TestBytes.value, row.getValue(TestBytes.family, TestBytes.qualifier));
+ }
+
+ /**
+  * Verifies that a put issued in a transaction that is subsequently aborted is rolled
+  * back, i.e. a later transaction does not see the written cell.
+  *
+  * @throws Exception
+  */
+ @Test
+ public void testAbortedTransactionPutAndGet() throws Exception {
+   transactionContext.start();
+   Put put = new Put(TestBytes.row);
+   put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+   transactionAwareHTable.put(put);
+
+   transactionContext.abort();
+
+   transactionContext.start();
+   Result result = transactionAwareHTable.get(new Get(TestBytes.row));
+   transactionContext.finish();
+   byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
+   // the aborted write must not be visible; assertNull states this directly instead of
+   // comparing arrays with the expected/actual arguments swapped
+   assertNull(value);
+ }
+
+ /**
+  * Test transactional delete operations on a dedicated two-family table: a full row
+  * delete, deletes of selected columns, and deletes of whole column families. Every
+  * step runs in its own committed transaction and is verified by a following one.
+  *
+  * @throws Exception
+  */
+ @Test
+ public void testValidTransactionalDelete() throws Exception {
+ // NOTE: the "TestValidTransactionalDelete" table is closed by the try-with-resources
+ // below but is not disabled or deleted by this method.
+ try (HTable hTable = createTable(Bytes.toBytes("TestValidTransactionalDelete"),
+ new byte[][]{TestBytes.family, TestBytes.family2})) {
+ TransactionAwareHTable txTable = new TransactionAwareHTable(hTable);
+ TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
+
+ // write one cell into each family of the same row
+ txContext.start();
+ Put put = new Put(TestBytes.row);
+ put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+ put.add(TestBytes.family2, TestBytes.qualifier, TestBytes.value2);
+ txTable.put(put);
+ txContext.finish();
+
+ // both cells must be readable before the delete
+ txContext.start();
+ Result result = txTable.get(new Get(TestBytes.row));
+ txContext.finish();
+ byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
+ assertArrayEquals(TestBytes.value, value);
+ value = result.getValue(TestBytes.family2, TestBytes.qualifier);
+ assertArrayEquals(TestBytes.value2, value);
+
+ // test full row delete
+ txContext.start();
+ Delete delete = new Delete(TestBytes.row);
+ txTable.delete(delete);
+ txContext.finish();
+
+ txContext.start();
+ result = txTable.get(new Get(TestBytes.row));
+ txContext.finish();
+ assertTrue(result.isEmpty());
+
+ // test column delete
+ // load 10 rows
+ // (rows "row0".."row9", each with 10 columns whose qualifiers are the ints 0..9)
+ txContext.start();
+ int rowCount = 10;
+ for (int i = 0; i < rowCount; i++) {
+ Put p = new Put(Bytes.toBytes("row" + i));
+ for (int j = 0; j < 10; j++) {
+ p.add(TestBytes.family, Bytes.toBytes(j), TestBytes.value);
+ }
+ txTable.put(p);
+ }
+ txContext.finish();
+
+ // verify loaded rows
+ txContext.start();
+ for (int i = 0; i < rowCount; i++) {
+ Get g = new Get(Bytes.toBytes("row" + i));
+ Result r = txTable.get(g);
+ assertFalse(r.isEmpty());
+ for (int j = 0; j < 10; j++) {
+ assertArrayEquals(TestBytes.value, r.getValue(TestBytes.family, Bytes.toBytes(j)));
+ }
+ }
+ txContext.finish();
+
+ // delete odds columns from odd rows and even columns from even rows
+ txContext.start();
+ for (int i = 0; i < rowCount; i++) {
+ Delete d = new Delete(Bytes.toBytes("row" + i));
+ for (int j = 0; j < 10; j++) {
+ if (i % 2 == j % 2) {
+ d.deleteColumns(TestBytes.family, Bytes.toBytes(j));
+ }
+ }
+ txTable.delete(d);
+ }
+ txContext.finish();
+
+ // verify deleted columns
+ // (5 of the 10 columns remain per row)
+ txContext.start();
+ for (int i = 0; i < rowCount; i++) {
+ Get g = new Get(Bytes.toBytes("row" + i));
+ Result r = txTable.get(g);
+ assertEquals(5, r.size());
+ for (Map.Entry<byte[], byte[]> entry : r.getFamilyMap(TestBytes.family).entrySet()) {
+ int col = Bytes.toInt(entry.getKey());
+ // each row should only have the opposite mod (odd=even, even=odd)
+ assertNotEquals(i % 2, col % 2);
+ assertArrayEquals(TestBytes.value, entry.getValue());
+ }
+ }
+ txContext.finish();
+
+ // test family delete
+ // load 10 rows
+ // (rows "famrow0".."famrow9", one cell in each of the two families)
+ txContext.start();
+ for (int i = 0; i < rowCount; i++) {
+ Put p = new Put(Bytes.toBytes("famrow" + i));
+ p.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+ p.add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2);
+ txTable.put(p);
+ }
+ txContext.finish();
+
+ // verify all loaded rows
+ txContext.start();
+ for (int i = 0; i < rowCount; i++) {
+ Get g = new Get(Bytes.toBytes("famrow" + i));
+ Result r = txTable.get(g);
+ assertEquals(2, r.size());
+ assertArrayEquals(TestBytes.value, r.getValue(TestBytes.family, TestBytes.qualifier));
+ assertArrayEquals(TestBytes.value2, r.getValue(TestBytes.family2, TestBytes.qualifier2));
+ }
+ txContext.finish();
+
+ // delete family1 for even rows, family2 for odd rows
+ txContext.start();
+ for (int i = 0; i < rowCount; i++) {
+ Delete d = new Delete(Bytes.toBytes("famrow" + i));
+ d.deleteFamily((i % 2 == 0) ? TestBytes.family : TestBytes.family2);
+ txTable.delete(d);
+ }
+ txContext.finish();
+
+ // verify deleted families
+ // (exactly one cell, from the family that was not deleted, remains per row)
+ txContext.start();
+ for (int i = 0; i < rowCount; i++) {
+ Get g = new Get(Bytes.toBytes("famrow" + i));
+ Result r = txTable.get(g);
+ assertEquals(1, r.size());
+ if (i % 2 == 0) {
+ assertNull(r.getValue(TestBytes.family, TestBytes.qualifier));
+ assertArrayEquals(TestBytes.value2, r.getValue(TestBytes.family2, TestBytes.qualifier2));
+ } else {
+ assertArrayEquals(TestBytes.value, r.getValue(TestBytes.family, TestBytes.qualifier));
+ assertNull(r.getValue(TestBytes.family2, TestBytes.qualifier2));
+ }
+ }
+ txContext.finish();
+ }
+ }
+
+ /**
+  * Test that put and delete attributes are preserved. The table is created with
+  * {@link TestRegionObserver} attached, which fails any Put or Delete that arrives
+  * without {@link #TEST_ATTRIBUTE}; the test therefore passes only if the operations
+  * complete and the data reads back as expected.
+  *
+  * @throws Exception
+  */
+ @Test
+ public void testAttributesPreserved() throws Exception {
+   // try-with-resources closes the table handle on every exit path, matching
+   // testValidTransactionalDelete
+   try (HTable hTable = createTable(Bytes.toBytes("TestAttributesPreserved"),
+                                    new byte[][]{TestBytes.family, TestBytes.family2}, false,
+                                    Lists.newArrayList(TestRegionObserver.class.getName()))) {
+     TransactionAwareHTable txTable = new TransactionAwareHTable(hTable);
+     TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
+
+     txContext.start();
+     Put put = new Put(TestBytes.row);
+     put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+     put.add(TestBytes.family2, TestBytes.qualifier, TestBytes.value2);
+     // set an attribute on the put, TestRegionObserver will verify it still exists
+     put.setAttribute(TEST_ATTRIBUTE, new byte[]{});
+     txTable.put(put);
+     txContext.finish();
+
+     txContext.start();
+     Result result = txTable.get(new Get(TestBytes.row));
+     txContext.finish();
+     byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
+     assertArrayEquals(TestBytes.value, value);
+     value = result.getValue(TestBytes.family2, TestBytes.qualifier);
+     assertArrayEquals(TestBytes.value2, value);
+
+     // test full row delete, TestRegionObserver will verify the attribute still exists
+     txContext.start();
+     Delete delete = new Delete(TestBytes.row);
+     delete.setAttribute(TEST_ATTRIBUTE, new byte[]{});
+     txTable.delete(delete);
+     txContext.finish();
+
+     txContext.start();
+     result = txTable.get(new Get(TestBytes.row));
+     txContext.finish();
+     assertTrue(result.isEmpty());
+   }
+ }
+
+ /**
+  * Verifies that a row delete issued in a transaction that is subsequently aborted is
+  * rolled back: the previously committed value stays readable.
+  *
+  * @throws Exception
+  */
+ @Test
+ public void testAbortedTransactionalDelete() throws Exception {
+   // commit the initial value
+   transactionContext.start();
+   Put initialWrite = new Put(TestBytes.row);
+   initialWrite.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+   transactionAwareHTable.put(initialWrite);
+   transactionContext.finish();
+
+   // sanity check: the value is visible before the delete attempt
+   transactionContext.start();
+   Result beforeDelete = transactionAwareHTable.get(new Get(TestBytes.row));
+   transactionContext.finish();
+   assertArrayEquals(TestBytes.value, beforeDelete.getValue(TestBytes.family, TestBytes.qualifier));
+
+   // delete the row, then abort instead of committing
+   transactionContext.start();
+   transactionAwareHTable.delete(new Delete(TestBytes.row));
+   transactionContext.abort();
+
+   // the value must still be there
+   transactionContext.start();
+   Result afterAbort = transactionAwareHTable.get(new Get(TestBytes.row));
+   transactionContext.finish();
+   assertArrayEquals(TestBytes.value, afterAbort.getValue(TestBytes.family, TestBytes.qualifier));
+ }
+
+ /**
+  * Writes one cell, then aborts a row delete and a family delete, and checks that the
+  * cell is still readable. Runs against a fresh table named after the given conflict
+  * detection level.
+  *
+  * @param conflictDetection conflict detection level used for the transaction-aware table
+  */
+ private void testDeleteRollback(TxConstants.ConflictDetection conflictDetection) throws Exception {
+   String name = "TestColFamilyDelete" + conflictDetection;
+   byte[][] families = {TestBytes.family};
+   HTable hTable = createTable(Bytes.toBytes(name), families);
+   try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, conflictDetection)) {
+     TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager);
+     TransactionContext txContext = new TransactionContext(txClient, txTable);
+
+     // committed baseline value
+     txContext.start();
+     Put baseline = new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+     txTable.put(baseline);
+     txContext.finish();
+
+     // Start a tx, delete the row and then abort the tx
+     txContext.start();
+     Delete rowDelete = new Delete(TestBytes.row);
+     txTable.delete(rowDelete);
+     txContext.abort();
+
+     // Start a tx, delete a column family and then abort the tx
+     txContext.start();
+     Delete familyDelete = new Delete(TestBytes.row).deleteFamily(TestBytes.family);
+     txTable.delete(familyDelete);
+     txContext.abort();
+
+     // Above operations should have no effect on the row, since they were aborted
+     txContext.start();
+     Result row = txTable.get(new Get(TestBytes.row));
+     assertFalse(row.isEmpty());
+     assertArrayEquals(TestBytes.value, row.getValue(TestBytes.family, TestBytes.qualifier));
+     txContext.finish();
+   }
+ }
+
+ @Test
+ public void testDeleteRollback() throws Exception {
+ testDeleteRollback(TxConstants.ConflictDetection.ROW);
+ testDeleteRollback(TxConstants.ConflictDetection.COLUMN);
+ testDeleteRollback(TxConstants.ConflictDetection.NONE);
+ }
+
+ @Test
+ public void testMultiColumnFamilyRowDeleteRollback() throws Exception {
+ HTable hTable = createTable(Bytes.toBytes("TestMultColFam"), new byte[][] {TestBytes.family, TestBytes.family2});
+ try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
+ TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
+ txContext.start();
+ txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
+ txContext.finish();
+
+ txContext.start();
+ //noinspection ConstantConditions
+ txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL);
+ Result result = txTable.get(new Get(TestBytes.row));
+ Assert.assertEquals(1, result.getFamilyMap(TestBytes.family).size());
+ Assert.assertEquals(0, result.getFamilyMap(TestBytes.family2).size());
+ txContext.finish();
+
+ //Start a tx, delete the row and then abort the tx
+ txContext.start();
+ txTable.delete(new Delete(TestBytes.row));
+ txContext.abort();
+
+ //Start a tx and scan all the col families to make sure none of them have delete markers
+ txContext.start();
+ txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL);
+ result = txTable.get(new Get(TestBytes.row));
+ Assert.assertEquals(1, result.getFamilyMap(TestBytes.family).size());
+ Assert.assertEquals(0, result.getFamilyMap(TestBytes.family2).size());
+ txContext.finish();
+ }
+ }
+
+ /**
+  * Exercises transactional deletes on a two-family table with ROW conflict detection:
+  * <ol>
+  *   <li>full row delete, verified through gets and scans, followed by a re-write of the row;</li>
+  *   <li>delete of the first column family;</li>
+  *   <li>delete of the second column family;</li>
+  *   <li>delete of the odd rows in the range z0..z9, verified by a scan;</li>
+  *   <li>delete of a row written earlier in the same transaction.</li>
+  * </ol>
+  */
+ @Test
+ public void testRowDelete() throws Exception {
+   HTable hTable = createTable(Bytes.toBytes("TestRowDelete"), new byte[][]{TestBytes.family, TestBytes.family2});
+   try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
+     TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
+
+     // Test 1: full row delete
+     txContext.start();
+     txTable.put(new Put(TestBytes.row)
+         .add(TestBytes.family, TestBytes.qualifier, TestBytes.value)
+         .add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2)
+         .add(TestBytes.family2, TestBytes.qualifier, TestBytes.value)
+         .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
+     txContext.finish();
+
+     txContext.start();
+     Get get = new Get(TestBytes.row);
+     Result result = txTable.get(get);
+     assertFalse(result.isEmpty());
+     assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family, TestBytes.qualifier));
+     assertArrayEquals(TestBytes.value2, result.getValue(TestBytes.family, TestBytes.qualifier2));
+     assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family2, TestBytes.qualifier));
+     assertArrayEquals(TestBytes.value2, result.getValue(TestBytes.family2, TestBytes.qualifier2));
+     txContext.finish();
+
+     // delete entire row
+     txContext.start();
+     txTable.delete(new Delete(TestBytes.row));
+     txContext.finish();
+
+     // verify row is now empty
+     txContext.start();
+     result = txTable.get(new Get(TestBytes.row));
+     assertTrue(result.isEmpty());
+
+     // verify row is empty for explicit column retrieval
+     result = txTable.get(new Get(TestBytes.row)
+         .addColumn(TestBytes.family, TestBytes.qualifier)
+         .addFamily(TestBytes.family2));
+     assertTrue(result.isEmpty());
+
+     // verify row is empty for scan
+     try (ResultScanner emptyScanner = txTable.getScanner(new Scan(TestBytes.row))) {
+       assertNull(emptyScanner.next());
+     }
+
+     // verify row is empty for scan with explicit column
+     try (ResultScanner emptyColumnScanner =
+              txTable.getScanner(new Scan(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier2))) {
+       assertNull(emptyColumnScanner.next());
+     }
+     txContext.finish();
+
+     // write swapped values to one column per family
+     txContext.start();
+     txTable.put(new Put(TestBytes.row)
+         .add(TestBytes.family, TestBytes.qualifier, TestBytes.value2)
+         .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value));
+     txContext.finish();
+
+     // verify new values appear
+     txContext.start();
+     result = txTable.get(new Get(TestBytes.row));
+     assertFalse(result.isEmpty());
+     assertEquals(2, result.size());
+     assertArrayEquals(TestBytes.value2, result.getValue(TestBytes.family, TestBytes.qualifier));
+     assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family2, TestBytes.qualifier2));
+
+     try (ResultScanner scanner = txTable.getScanner(new Scan(TestBytes.row))) {
+       Result result1 = scanner.next();
+       assertNotNull(result1);
+       assertFalse(result1.isEmpty());
+       assertEquals(2, result1.size());
+       // check the values returned by the scan itself, not the earlier Get result
+       assertArrayEquals(TestBytes.value2, result1.getValue(TestBytes.family, TestBytes.qualifier));
+       assertArrayEquals(TestBytes.value, result1.getValue(TestBytes.family2, TestBytes.qualifier2));
+     }
+     txContext.finish();
+
+     // Test 2: delete of first column family
+     txContext.start();
+     txTable.put(new Put(TestBytes.row2)
+         .add(TestBytes.family, TestBytes.qualifier, TestBytes.value)
+         .add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2)
+         .add(TestBytes.family2, TestBytes.qualifier, TestBytes.value)
+         .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
+     txContext.finish();
+
+     txContext.start();
+     txTable.delete(new Delete(TestBytes.row2).deleteFamily(TestBytes.family));
+     txContext.finish();
+
+     txContext.start();
+     Result fam1Result = txTable.get(new Get(TestBytes.row2));
+     assertFalse(fam1Result.isEmpty());
+     assertEquals(2, fam1Result.size());
+     assertArrayEquals(TestBytes.value, fam1Result.getValue(TestBytes.family2, TestBytes.qualifier));
+     assertArrayEquals(TestBytes.value2, fam1Result.getValue(TestBytes.family2, TestBytes.qualifier2));
+     txContext.finish();
+
+     // Test 3: delete of second column family
+     txContext.start();
+     txTable.put(new Put(TestBytes.row3)
+         .add(TestBytes.family, TestBytes.qualifier, TestBytes.value)
+         .add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2)
+         .add(TestBytes.family2, TestBytes.qualifier, TestBytes.value)
+         .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
+     txContext.finish();
+
+     txContext.start();
+     txTable.delete(new Delete(TestBytes.row3).deleteFamily(TestBytes.family2));
+     txContext.finish();
+
+     txContext.start();
+     Result fam2Result = txTable.get(new Get(TestBytes.row3));
+     assertFalse(fam2Result.isEmpty());
+     assertEquals(2, fam2Result.size());
+     assertArrayEquals(TestBytes.value, fam2Result.getValue(TestBytes.family, TestBytes.qualifier));
+     assertArrayEquals(TestBytes.value2, fam2Result.getValue(TestBytes.family, TestBytes.qualifier2));
+     txContext.finish();
+
+     // Test 4: delete specific rows in a range
+     txContext.start();
+     for (int i = 0; i < 10; i++) {
+       txTable.put(new Put(Bytes.toBytes("z" + i))
+           .add(TestBytes.family, TestBytes.qualifier, Bytes.toBytes(i))
+           .add(TestBytes.family2, TestBytes.qualifier2, Bytes.toBytes(i)));
+     }
+     txContext.finish();
+
+     txContext.start();
+     // delete odd rows
+     for (int i = 1; i < 10; i += 2) {
+       txTable.delete(new Delete(Bytes.toBytes("z" + i)));
+     }
+     txContext.finish();
+
+     txContext.start();
+     int cnt = 0;
+     try (ResultScanner zScanner = txTable.getScanner(new Scan(Bytes.toBytes("z0")))) {
+       Result res;
+       while ((res = zScanner.next()) != null) {
+         assertFalse(res.isEmpty());
+         assertArrayEquals(Bytes.toBytes("z" + cnt), res.getRow());
+         assertArrayEquals(Bytes.toBytes(cnt), res.getValue(TestBytes.family, TestBytes.qualifier));
+         assertArrayEquals(Bytes.toBytes(cnt), res.getValue(TestBytes.family2, TestBytes.qualifier2));
+         cnt += 2;
+       }
+     }
+     // all five even rows (z0, z2, ..., z8) must have been returned; without this check
+     // an empty scan would pass silently
+     assertEquals(10, cnt);
+     // finish the read transaction before the next start() so it is not left in progress
+     txContext.finish();
+
+     // Test 5: delete prior writes in the same transaction
+     txContext.start();
+     txTable.put(new Put(TestBytes.row4)
+         .add(TestBytes.family, TestBytes.qualifier, TestBytes.value)
+         .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
+     txTable.delete(new Delete(TestBytes.row4));
+     txContext.finish();
+
+     txContext.start();
+     Result row4Result = txTable.get(new Get(TestBytes.row4));
+     assertTrue(row4Result.isEmpty());
+     txContext.finish();
+   }
+ }
+
+ /**
+ * Expect an exception since a transaction hasn't been started.
+ *
+ * @throws Exception
+ */
+ @Test(expected = IOException.class)
+ public void testTransactionlessFailure() throws Exception {
+ transactionAwareHTable.get(new Get(TestBytes.row));
+ }
+
+ /**
+ * Tests that each transaction can see its own persisted writes, while not seeing writes from other
+ * in-progress transactions.
+ */
+ @Test
+ public void testReadYourWrites() throws Exception {
+ // In-progress tx1: started before our main transaction
+ HTable hTable1 = new HTable(testUtil.getConfiguration(), TestBytes.table);
+ TransactionAwareHTable txHTable1 = new TransactionAwareHTable(hTable1);
+ TransactionContext inprogressTxContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), txHTable1);
+
+ // In-progress tx2: started while our main transaction is running
+ HTable hTable2 = new HTable(testUtil.getConfiguration(), TestBytes.table);
+ TransactionAwareHTable txHTable2 = new TransactionAwareHTable(hTable2);
+ TransactionContext inprogressTxContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txHTable2);
+
+ // create an in-progress write that should be ignored
+ byte[] col2 = Bytes.toBytes("col2");
+ inprogressTxContext1.start();
+ Put putCol2 = new Put(TestBytes.row);
+ byte[] valueCol2 = Bytes.toBytes("writing in progress");
+ putCol2.add(TestBytes.family, col2, valueCol2);
+ txHTable1.put(putCol2);
+
+ // start a tx and write a value to test reading in same tx
+ transactionContext.start();
+ Put put = new Put(TestBytes.row);
+ byte[] value = Bytes.toBytes("writing");
+ put.add(TestBytes.family, TestBytes.qualifier, value);
+ transactionAwareHTable.put(put);
+
+ // test that a write from a tx started after the first is not visible
+ inprogressTxContext2.start();
+ Put put2 = new Put(TestBytes.row);
+ byte[] value2 = Bytes.toBytes("writing2");
+ put2.add(TestBytes.family, TestBytes.qualifier, value2);
+ txHTable2.put(put2);
+
+ Get get = new Get(TestBytes.row);
+ Result row = transactionAwareHTable.get(get);
+ assertFalse(row.isEmpty());
+ byte[] col1Value = row.getValue(TestBytes.family, TestBytes.qualifier);
+ Assert.assertNotNull(col1Value);
+ Assert.assertArrayEquals(value, col1Value);
+ // write from in-progress transaction should not be visible
+ byte[] col2Value = row.getValue(TestBytes.family, col2);
+ assertNull(col2Value);
+
+ // commit in-progress transaction, should still not be visible
+ inprogressTxContext1.finish();
+
+ get = new Get(TestBytes.row);
+ row = transactionAwareHTable.get(get);
+ assertFalse(row.isEmpty());
+ col2Value = row.getValue(TestBytes.family, col2);
+ assertNull(col2Value);
+
+ transactionContext.finish();
+
+ inprogressTxContext2.abort();
+ }
+
+ @Test
+ public void testRowLevelConflictDetection() throws Exception { // conflicts are detected per row, not per column
+ TransactionAwareHTable txTable1 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
+ TxConstants.ConflictDetection.ROW);
+ TransactionContext txContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable1);
+
+ TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
+ TxConstants.ConflictDetection.ROW);
+ TransactionContext txContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable2);
+
+ byte[] row1 = Bytes.toBytes("row1");
+ byte[] row2 = Bytes.toBytes("row2");
+ byte[] col1 = Bytes.toBytes("c1");
+ byte[] col2 = Bytes.toBytes("c2");
+ byte[] val1 = Bytes.toBytes("val1");
+ byte[] val2 = Bytes.toBytes("val2");
+
+ // case 1: two overlapping transactions writing to different rows must both commit
+ txContext1.start();
+ txTable1.put(new Put(row1).add(TestBytes.family, col1, val1));
+
+ txContext2.start();
+ txTable2.put(new Put(row2).add(TestBytes.family, col1, val2));
+
+ // the rows differ, so neither commit should report a conflict
+ txContext1.finish();
+ txContext2.finish();
+
+ transactionContext.start();
+ Result res = transactionAwareHTable.get(new Get(row1));
+ assertFalse(res.isEmpty());
+ Cell cell = res.getColumnLatestCell(TestBytes.family, col1);
+ assertNotNull(cell);
+ assertArrayEquals(val1, CellUtil.cloneValue(cell));
+
+ res = transactionAwareHTable.get(new Get(row2));
+ assertFalse(res.isEmpty());
+ cell = res.getColumnLatestCell(TestBytes.family, col1);
+ assertNotNull(cell);
+ assertArrayEquals(val2, CellUtil.cloneValue(cell));
+ transactionContext.finish();
+
+ // case 2: writing to different columns of the same row must fail for the second committer
+ txContext1.start();
+ txTable1.put(new Put(row1).add(TestBytes.family, col1, val2));
+
+ txContext2.start();
+ txTable2.put(new Put(row1).add(TestBytes.family, col2, val2));
+
+ txContext1.finish();
+ try {
+ txContext2.finish();
+ fail("txContext2 should have encountered a row-level conflict during commit");
+ } catch (TransactionConflictException tce) {
+ txContext2.abort();
+ }
+
+ transactionContext.start();
+ res = transactionAwareHTable.get(new Get(row1));
+ assertFalse(res.isEmpty());
+ cell = res.getColumnLatestCell(TestBytes.family, col1);
+ assertNotNull(cell);
+ // txContext1 committed, so col1 should now hold val2
+ assertArrayEquals(val2, CellUtil.cloneValue(cell));
+
+ cell = res.getColumnLatestCell(TestBytes.family, col2);
+ // txContext2 hit the conflict and was aborted, so its write to col2 should not be visible
+ assertNull(cell);
+ transactionContext.finish();
+
+ // case 3: writing to the same column of the same row must fail for the second committer as well
+ txContext1.start();
+ txTable1.put(new Put(row2).add(TestBytes.family, col2, val1));
+
+ txContext2.start();
+ txTable2.put(new Put(row2).add(TestBytes.family, col2, val2));
+
+ txContext1.finish();
+ try {
+ txContext2.finish();
+ fail("txContext2 should have encountered a row and column level conflict during commit");
+ } catch (TransactionConflictException tce) {
+ txContext2.abort();
+ }
+
+ transactionContext.start();
+ res = transactionAwareHTable.get(new Get(row2));
+ assertFalse(res.isEmpty());
+ cell = res.getColumnLatestCell(TestBytes.family, col2);
+ assertNotNull(cell);
+ // only txContext1's write was committed, so col2 should hold val1
+ assertArrayEquals(val1, CellUtil.cloneValue(cell));
+ transactionContext.finish();
+
+ // verify the reported change set has exactly one key per written row, built without family or column
+ txContext1.start();
+ txTable1.put(new Put(row1).add(TestBytes.family, col1, val1));
+ txTable1.put(new Put(row2).add(TestBytes.family, col2, val2));
+
+ Collection<byte[]> changeSet = txTable1.getTxChanges();
+ assertNotNull(changeSet);
+ assertEquals(2, changeSet.size());
+ assertTrue(changeSet.contains(txTable1.getChangeKey(row1, null, null)));
+ assertTrue(changeSet.contains(txTable1.getChangeKey(row2, null, null)));
+ txContext1.finish();
+ }
+
+ @Test
+ public void testNoneLevelConflictDetection() throws Exception { // NONE: overlapping writes never conflict
+ InMemoryTxSystemClient txClient = new InMemoryTxSystemClient(txManager);
+ TransactionAwareHTable txTable1 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
+ TxConstants.ConflictDetection.NONE);
+ TransactionContext txContext1 = new TransactionContext(txClient, txTable1);
+
+ TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
+ TxConstants.ConflictDetection.NONE);
+ TransactionContext txContext2 = new TransactionContext(txClient, txTable2);
+
+ // part 1: overlapping writes to the same row + column should not conflict
+
+ txContext1.start();
+ txTable1.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
+
+ // txContext1 has not committed yet, so its write should not be visible to txContext2
+ txContext2.start();
+ Result row = txTable2.get(new Get(TestBytes.row));
+ assertTrue(row.isEmpty());
+
+ txTable2.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
+
+ // both commits should succeed even though they wrote the same cell
+ txContext1.finish();
+ txContext2.finish();
+
+ txContext1.start();
+ row = txTable1.get(new Get(TestBytes.row));
+ assertFalse(row.isEmpty());
+ assertArrayEquals(TestBytes.value2, row.getValue(TestBytes.family, TestBytes.qualifier));
+ txContext1.finish();
+
+ // part 2: transaction abort should still roll back changes
+
+ txContext1.start();
+ txTable1.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
+ txContext1.abort();
+
+ // changes to row2 should be rolled back
+ txContext2.start();
+ Result row2 = txTable2.get(new Get(TestBytes.row2));
+ assertTrue(row2.isEmpty());
+ txContext2.finish();
+
+ // part 3: transaction invalidate should still make changes invisible
+
+ txContext1.start();
+ Transaction tx1 = txContext1.getCurrentTransaction();
+ txTable1.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
+ assertNotNull(tx1);
+ txClient.invalidate(tx1.getWritePointer());
+
+ // changes to row3 should not be visible, because the transaction that wrote them was invalidated
+ txContext2.start();
+ Result row3 = txTable2.get(new Get(TestBytes.row3));
+ assertTrue(row3.isEmpty());
+ txContext2.finish();
+ }
+
+ @Test
+ public void testCheckpoint() throws Exception {
+ // start a transaction and checkpoint between writes; the transaction id must stay, the write pointer must change
+ transactionContext.start();
+ transactionAwareHTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
+ Transaction origTx = transactionContext.getCurrentTransaction();
+ transactionContext.checkpoint();
+ Transaction postCheckpointTx = transactionContext.getCurrentTransaction();
+
+ assertEquals(origTx.getTransactionId(), postCheckpointTx.getTransactionId());
+ assertNotEquals(origTx.getWritePointer(), postCheckpointTx.getWritePointer());
+ long[] checkpointPtrs = postCheckpointTx.getCheckpointWritePointers();
+ assertEquals(1, checkpointPtrs.length);
+ assertEquals(postCheckpointTx.getWritePointer(), checkpointPtrs[0]);
+
+ transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
+ transactionContext.checkpoint();
+ Transaction postCheckpointTx2 = transactionContext.getCurrentTransaction();
+
+ assertEquals(origTx.getTransactionId(), postCheckpointTx2.getTransactionId());
+ assertNotEquals(postCheckpointTx.getWritePointer(), postCheckpointTx2.getWritePointer());
+ long[] checkpointPtrs2 = postCheckpointTx2.getCheckpointWritePointers();
+ assertEquals(2, checkpointPtrs2.length);
+ assertEquals(postCheckpointTx.getWritePointer(), checkpointPtrs2[0]);
+ assertEquals(postCheckpointTx2.getWritePointer(), checkpointPtrs2[1]);
+
+ transactionAwareHTable.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
+
+ // by default (read-your-writes), all three rows written by this transaction should be visible
+ verifyRow(transactionAwareHTable, TestBytes.row, TestBytes.value);
+ verifyRow(transactionAwareHTable, TestBytes.row2, TestBytes.value2);
+ verifyRow(transactionAwareHTable, TestBytes.row3, TestBytes.value);
+
+ // when excluding the current write pointer, only writes made before the last checkpoint should be visible
+ transactionContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
+ Get get = new Get(TestBytes.row);
+ verifyRow(transactionAwareHTable, get, TestBytes.value);
+ get = new Get(TestBytes.row2);
+ verifyRow(transactionAwareHTable, get, TestBytes.value2);
+ get = new Get(TestBytes.row3);
+ verifyRow(transactionAwareHTable, get, null);
+
+ // a scan excluding the current write pointer should return only row and row2
+ Scan scan = new Scan();
+ ResultScanner scanner = transactionAwareHTable.getScanner(scan);
+
+ Result row = scanner.next();
+ assertNotNull(row);
+ assertArrayEquals(TestBytes.row, row.getRow());
+ assertEquals(1, row.size());
+ assertArrayEquals(TestBytes.value, row.getValue(TestBytes.family, TestBytes.qualifier));
+
+ row = scanner.next();
+ assertNotNull(row);
+ assertArrayEquals(TestBytes.row2, row.getRow());
+ assertEquals(1, row.size());
+ assertArrayEquals(TestBytes.value2, row.getValue(TestBytes.family, TestBytes.qualifier));
+
+ row = scanner.next();
+ assertNull(row);
+ scanner.close();
+ transactionContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT);
+
+ // commit the transaction, then verify from a new transaction that all three writes are visible
+ transactionContext.finish();
+
+ transactionContext.start();
+ verifyRow(transactionAwareHTable, TestBytes.row, TestBytes.value);
+ verifyRow(transactionAwareHTable, TestBytes.row2, TestBytes.value2);
+ verifyRow(transactionAwareHTable, TestBytes.row3, TestBytes.value);
+ transactionContext.finish();
+ }
+
+ @Test
+ public void testInProgressCheckpoint() throws Exception {
+ // start a transaction, write, checkpoint and write again, but do not commit yet
+ transactionContext.start();
+ transactionAwareHTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
+ transactionContext.checkpoint();
+ transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
+
+ // neither the pre- nor the post-checkpoint write should be visible to another client's transaction
+ TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table));
+ TransactionContext txContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable2);
+
+ txContext2.start();
+ verifyRow(txTable2, TestBytes.row, null);
+ verifyRow(txTable2, TestBytes.row2, null);
+ txContext2.finish();
+ txTable2.close();
+
+ transactionContext.finish();
+
+ // after the commit above, both writes should be visible to a new transaction
+ transactionContext.start();
+ verifyRow(transactionAwareHTable, TestBytes.row, TestBytes.value);
+ verifyRow(transactionAwareHTable, TestBytes.row2, TestBytes.value2);
+ transactionContext.finish();
+ }
+
+ @Test
+ public void testCheckpointRollback() throws Exception {
+ // start a transaction and write three rows, with a checkpoint between consecutive writes
+ transactionContext.start();
+ transactionAwareHTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
+ transactionContext.checkpoint();
+ transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
+ transactionContext.checkpoint();
+ transactionAwareHTable.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
+
+ transactionContext.abort(); // expected to undo the writes made before and after each checkpoint
+
+ transactionContext.start();
+ verifyRow(transactionAwareHTable, TestBytes.row, null);
+ verifyRow(transactionAwareHTable, TestBytes.row2, null);
+ verifyRow(transactionAwareHTable, TestBytes.row3, null);
+
+ Scan scan = new Scan();
+ ResultScanner scanner = transactionAwareHTable.getScanner(scan);
+ assertNull(scanner.next()); // a full scan should return no rows either
+ scanner.close();
+ transactionContext.finish();
+ }
+
+ @Test
+ public void testCheckpointInvalidate() throws Exception {
+ // start a transaction, capturing the Transaction (and thus its write pointer) before and after each checkpoint
+ transactionContext.start();
+ Transaction origTx = transactionContext.getCurrentTransaction();
+ transactionAwareHTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
+ transactionContext.checkpoint();
+ Transaction checkpointTx1 = transactionContext.getCurrentTransaction();
+ transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
+ transactionContext.checkpoint();
+ Transaction checkpointTx2 = transactionContext.getCurrentTransaction();
+ transactionAwareHTable.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
+
+ TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager);
+ txClient.invalidate(transactionContext.getCurrentTransaction().getTransactionId());
+
+ // check from a second client that none of the invalidated transaction's writes are visible
+ TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table));
+ TransactionContext txContext2 = new TransactionContext(txClient, txTable2);
+ txContext2.start();
+ Transaction newTx = txContext2.getCurrentTransaction();
+
+ // all 3 write pointers from the invalidated transaction should now be excluded
+ assertTrue(newTx.isExcluded(origTx.getWritePointer()));
+ assertTrue(newTx.isExcluded(checkpointTx1.getWritePointer()));
+ assertTrue(newTx.isExcluded(checkpointTx2.getWritePointer()));
+
+ verifyRow(txTable2, TestBytes.row, null);
+ verifyRow(txTable2, TestBytes.row2, null);
+ verifyRow(txTable2, TestBytes.row3, null);
+
+ Scan scan = new Scan();
+ ResultScanner scanner = txTable2.getScanner(scan);
+ assertNull(scanner.next()); // a full scan should return no rows either
+ scanner.close();
+ txContext2.finish();
+ }
+
+ @Test
+ public void testExistingData() throws Exception { // transactional reads/writes over non-transactional data
+ byte[] val11 = Bytes.toBytes("val11");
+ byte[] val12 = Bytes.toBytes("val12");
+ byte[] val21 = Bytes.toBytes("val21");
+ byte[] val22 = Bytes.toBytes("val22");
+ byte[] val31 = Bytes.toBytes("val31");
+ byte[] val111 = Bytes.toBytes("val111");
+
+ TransactionAwareHTable txTable =
+ new TransactionAwareHTable(createTable(Bytes.toBytes("testExistingData"), new byte[][]{TestBytes.family}, true,
+ Collections.<String>emptyList()));
+ TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
+
+ // Add some pre-existing, non-transactional data; row4 holds empty values, one under the family-delete qualifier
+ HTable nonTxTable = new HTable(testUtil.getConfiguration(), txTable.getTableName());
+ nonTxTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, val11));
+ nonTxTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier2, val12));
+ nonTxTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, val21));
+ nonTxTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier2, val22));
+ nonTxTable.put(new Put(TestBytes.row4).add(TestBytes.family, TxConstants.FAMILY_DELETE_QUALIFIER,
+ HConstants.EMPTY_BYTE_ARRAY));
+ nonTxTable.put(new Put(TestBytes.row4).add(TestBytes.family, TestBytes.qualifier, HConstants.EMPTY_BYTE_ARRAY));
+ nonTxTable.close(); // no longer needed below; close() also flushes pending changes
+
+ // Add transactional data
+ txContext.start();
+ txTable.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, val31));
+ txContext.finish();
+
+ txContext.start();
+ // test get: both the pre-existing and the transactional cells should be readable
+ verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier), val11);
+ verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2), val12);
+ verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier), val21);
+ verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier2), val22);
+ verifyRow(txTable, new Get(TestBytes.row3).addColumn(TestBytes.family, TestBytes.qualifier), val31);
+ verifyRow(txTable, new Get(TestBytes.row4).addColumn(TestBytes.family, TxConstants.FAMILY_DELETE_QUALIFIER),
+ HConstants.EMPTY_BYTE_ARRAY);
+ verifyRow(txTable, new Get(TestBytes.row4).addColumn(TestBytes.family, TestBytes.qualifier),
+ HConstants.EMPTY_BYTE_ARRAY);
+
+ // test scan: rows are expected in order row, row2, row3, row4 with the same values as the gets above
+ try (ResultScanner scanner = txTable.getScanner(new Scan())) {
+ Result result = scanner.next();
+ assertNotNull(result);
+ assertArrayEquals(TestBytes.row, result.getRow());
+ assertArrayEquals(val11, result.getValue(TestBytes.family, TestBytes.qualifier));
+ assertArrayEquals(val12, result.getValue(TestBytes.family, TestBytes.qualifier2));
+ result = scanner.next();
+ assertNotNull(result);
+ assertArrayEquals(TestBytes.row2, result.getRow());
+ assertArrayEquals(val21, result.getValue(TestBytes.family, TestBytes.qualifier));
+ assertArrayEquals(val22, result.getValue(TestBytes.family, TestBytes.qualifier2));
+ result = scanner.next();
+ assertNotNull(result);
+ assertArrayEquals(TestBytes.row3, result.getRow());
+ assertArrayEquals(val31, result.getValue(TestBytes.family, TestBytes.qualifier));
+ result = scanner.next();
+ assertNotNull(result);
+ assertArrayEquals(TestBytes.row4, result.getRow());
+ assertArrayEquals(HConstants.EMPTY_BYTE_ARRAY, result.getValue(TestBytes.family,
+ TxConstants.FAMILY_DELETE_QUALIFIER));
+ assertArrayEquals(HConstants.EMPTY_BYTE_ARRAY, result.getValue(TestBytes.family, TestBytes.qualifier));
+ assertNull(scanner.next());
+ }
+ txContext.finish();
+
+ // test update and delete: transactionally overwrite one pre-existing cell and delete another
+ txContext.start();
+ txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, val111));
+ txTable.delete(new Delete(TestBytes.row2).deleteColumns(TestBytes.family, TestBytes.qualifier));
+ txContext.finish();
+
+ txContext.start();
+ verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier), val111);
+ verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2), val12);
+ verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier), null);
+ verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier2), val22);
+ verifyRow(txTable, new Get(TestBytes.row3).addColumn(TestBytes.family, TestBytes.qualifier), val31);
+ verifyRow(txTable, new Get(TestBytes.row4).addColumn(TestBytes.family, TxConstants.FAMILY_DELETE_QUALIFIER),
+ HConstants.EMPTY_BYTE_ARRAY);
+ verifyRow(txTable, new Get(TestBytes.row4).addColumn(TestBytes.family, TestBytes.qualifier),
+ HConstants.EMPTY_BYTE_ARRAY);
+ txContext.finish();
+
+ // test scan again: it should reflect the update on row and the deleted column on row2
+ txContext.start();
+ try (ResultScanner scanner = txTable.getScanner(new Scan())) {
+ Result result = scanner.next();
+ assertNotNull(result);
+ assertArrayEquals(TestBytes.row, result.getRow());
+ assertArrayEquals(val111, result.getValue(TestBytes.family, TestBytes.qualifier));
+ assertArrayEquals(val12, result.getValue(TestBytes.family, TestBytes.qualifier2));
+ result = scanner.next();
+ assertNotNull(result);
+ assertArrayEquals(TestBytes.row2, result.getRow());
+ assertNull(result.getValue(TestBytes.family, TestBytes.qualifier));
+ assertArrayEquals(val22, result.getValue(TestBytes.family, TestBytes.qualifier2));
+ result = scanner.next();
+ assertNotNull(result);
+ assertArrayEquals(TestBytes.row3, result.getRow());
+ assertArrayEquals(val31, result.getValue(TestBytes.family, TestBytes.qualifier));
+ result = scanner.next();
+ assertNotNull(result);
+ assertArrayEquals(TestBytes.row4, result.getRow());
+ assertArrayEquals(HConstants.EMPTY_BYTE_ARRAY, result.getValue(TestBytes.family,
+ TxConstants.FAMILY_DELETE_QUALIFIER));
+ assertArrayEquals(HConstants.EMPTY_BYTE_ARRAY, result.getValue(TestBytes.family, TestBytes.qualifier));
+ assertNull(scanner.next());
+ }
+ txContext.finish();
+ }
+
+ private void verifyRow(HTableInterface table, byte[] rowkey, byte[] expectedValue) throws Exception {
+ verifyRows(table, new Get(rowkey), expectedValue == null ? null : ImmutableList.of(expectedValue));
+ }
+
+ private void verifyRow(HTableInterface table, Get get, byte[] expectedValue) throws Exception {
+ verifyRows(table, get, expectedValue != null ? ImmutableList.of(expectedValue) : null); // null => no result
+ }
+
+ private void verifyRows(HTableInterface table, Get get, List<byte[]> expectedValues) throws Exception {
+ Result fetched = table.get(get);
+ if (expectedValues == null) {
+ // a null expectation means the Get must come back empty
+ assertTrue(fetched.isEmpty());
+ return;
+ }
+ assertFalse(fetched.isEmpty());
+ // check the first column named by the Get, or the default test column if it names none
+ boolean explicitColumn = get.hasFamilies();
+ byte[] family = explicitColumn ? get.getFamilyMap().keySet().iterator().next() : TestBytes.family;
+ byte[] col = explicitColumn ? get.getFamilyMap().get(family).first() : TestBytes.qualifier;
+ Iterator<Cell> cells = fetched.getColumnCells(family, col).iterator();
+ // expected values are matched against the cells in the order the Result returns them
+ for (byte[] expectedValue : expectedValues) {
+ Assert.assertTrue(cells.hasNext());
+ assertArrayEquals(expectedValue, CellUtil.cloneValue(cells.next()));
+ }
+ }
+
+ private Cell[] getRow(HTableInterface table, Get get) throws Exception {
+ // hand back the raw cells of the Get result without any further processing
+ return table.get(get).rawCells();
+ }
+
+ private void verifyScan(HTableInterface table, Scan scan, List<KeyValue> expectedCells) throws Exception {
+ List<Cell> scannedCells = new ArrayList<>();
+ try (ResultScanner resultScanner = table.getScanner(scan)) {
+ // request one result more than the number of expected cells, then flatten all results into one cell list
+ for (Result scannedRow : resultScanner.next(expectedCells.size() + 1)) {
+ scannedCells.addAll(Lists.newArrayList(scannedRow.rawCells()));
+ }
+ Assert.assertEquals(expectedCells, scannedCells);
+ }
+ }
+
+ @Test
+ public void testVisibilityAll() throws Exception { // compares scans and gets under each VisibilityLevel
+ HTable nonTxTable = createTable(Bytes.toBytes("testVisibilityAll"),
+ new byte[][]{TestBytes.family, TestBytes.family2}, true, Collections.<String>emptyList());
+ TransactionAwareHTable txTable =
+ new TransactionAwareHTable(nonTxTable,
+ TxConstants.ConflictDetection.ROW); // ROW conflict detection to verify family deletes
+ TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
+
+ // start a transaction and create a delete marker, remembering its write pointer as txWp0
+ txContext.start();
+ //noinspection ConstantConditions
+ long txWp0 = txContext.getCurrentTransaction().getWritePointer();
+ txTable.delete(new Delete(TestBytes.row).deleteColumn(TestBytes.family, TestBytes.qualifier2));
+ txContext.finish();
+
+ // start a new transaction (write pointer txWp1) and write some values
+ txContext.start();
+ @SuppressWarnings("ConstantConditions")
+ long txWp1 = txContext.getCurrentTransaction().getWritePointer();
+ txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
+ txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2));
+ txTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
+ txTable.put(new Put(TestBytes.row).add(TestBytes.family2, TestBytes.qualifier, TestBytes.value));
+ txTable.put(new Put(TestBytes.row).add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
+
+ // verify the data just written is readable within the same transaction
+ verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier),
+ TestBytes.value);
+ verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2),
+ TestBytes.value2);
+ verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier),
+ TestBytes.value);
+ verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier),
+ TestBytes.value);
+ verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier2),
+ TestBytes.value2);
+
+ // checkpoint (new write pointer txWp2) and make changes to the written data now
+ txContext.checkpoint();
+ long txWp2 = txContext.getCurrentTransaction().getWritePointer();
+ // delete a column
+ txTable.delete(new Delete(TestBytes.row).deleteColumn(TestBytes.family, TestBytes.qualifier));
+ // rewrite a column with the same value (no change to the visible value)
+ txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2));
+ // update a column
+ txTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value3));
+ // delete column family
+ txTable.delete(new Delete(TestBytes.row).deleteFamily(TestBytes.family2));
+
+ // verify changed values
+ verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier),
+ null);
+ verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2),
+ TestBytes.value2);
+ verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier),
+ TestBytes.value3);
+ verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier),
+ null);
+ verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier2),
+ null);
+
+ // run a scan with VisibilityLevel.SNAPSHOT_ALL, this should return all raw changes by this transaction,
+ // and the raw change by the prior transaction
+ //noinspection ConstantConditions
+ txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL);
+ List<KeyValue> expected = ImmutableList.of(
+ new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier, txWp2, KeyValue.Type.DeleteColumn),
+ new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier, txWp1, TestBytes.value),
+ new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2),
+ new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp1, TestBytes.value2),
+ new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp0, KeyValue.Type.DeleteColumn),
+ new KeyValue(TestBytes.row, TestBytes.family2, null, txWp2, KeyValue.Type.DeleteFamily),
+ new KeyValue(TestBytes.row, TestBytes.family2, TestBytes.qualifier, txWp1, TestBytes.value),
+ new KeyValue(TestBytes.row, TestBytes.family2, TestBytes.qualifier2, txWp1, TestBytes.value2),
+ new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3),
+ new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp1, TestBytes.value)
+ );
+ verifyScan(txTable, new Scan(), expected);
+
+ // verify a Get is also able to return all snapshot versions
+ Get get = new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier);
+ Cell[] cells = getRow(txTable, get);
+ Assert.assertEquals(2, cells.length);
+ Assert.assertTrue(CellUtil.isDelete(cells[0]));
+ Assert.assertArrayEquals(TestBytes.value, CellUtil.cloneValue(cells[1]));
+
+ get = new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2);
+ cells = getRow(txTable, get);
+ Assert.assertEquals(3, cells.length);
+ Assert.assertArrayEquals(TestBytes.value2, CellUtil.cloneValue(cells[0]));
+ Assert.assertArrayEquals(TestBytes.value2, CellUtil.cloneValue(cells[1]));
+ Assert.assertTrue(CellUtil.isDeleteColumns(cells[2]));
+
+ verifyRows(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier),
+ ImmutableList.of(TestBytes.value3, TestBytes.value));
+
+ get = new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier);
+ cells = getRow(txTable, get);
+ Assert.assertEquals(2, cells.length);
+ Assert.assertTrue(CellUtil.isDelete(cells[0]));
+ Assert.assertArrayEquals(TestBytes.value, CellUtil.cloneValue(cells[1]));
+
+ get = new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier2);
+ cells = getRow(txTable, get);
+ Assert.assertEquals(2, cells.length);
+ Assert.assertTrue(CellUtil.isDelete(cells[0]));
+ Assert.assertArrayEquals(TestBytes.value2, CellUtil.cloneValue(cells[1]));
+
+ // Verify VisibilityLevel.SNAPSHOT: only the latest values of this transaction, without the deleted cells
+ txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT);
+ expected = ImmutableList.of(
+ new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2),
+ new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3)
+ );
+ verifyScan(txTable, new Scan(), expected);
+
+ // Verify VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT: only the values written before the checkpoint (txWp1)
+ txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
+ expected = ImmutableList.of(
+ new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier, txWp1, TestBytes.value),
+ new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp1, TestBytes.value2),
+ new KeyValue(TestBytes.row, TestBytes.family2, TestBytes.qualifier, txWp1, TestBytes.value),
+ new KeyValue(TestBytes.row, TestBytes.family2, TestBytes.qualifier2, txWp1, TestBytes.value2),
+ new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp1, TestBytes.value)
+ );
+ verifyScan(txTable, new Scan(), expected);
+ txContext.finish();
+
+ // finally verify values once more after commit, this time we should get only committed raw values for
+ // all visibility levels
+ txContext.start();
+ txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL);
+ expected = ImmutableList.of(
+ new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier, txWp2, KeyValue.Type.DeleteColumn),
+ new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2),
+ new KeyValue(TestBytes.row, TestBytes.family2, null, txWp2, KeyValue.Type.DeleteFamily),
+ new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3)
+ );
+ verifyScan(txTable, new Scan(), expected);
+
+ txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT);
+ expected = ImmutableList.of(
+ new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2),
+ new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3)
+ );
+ verifyScan(txTable, new Scan(), expected);
+
+ txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
+ expected = ImmutableList.of(
+ new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2),
+ new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3)
+ );
+ verifyScan(txTable, new Scan(), expected);
+
+ verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier),
+ null);
+ verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2),
+ TestBytes.value2);
+ verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier),
+ TestBytes.value3);
+ txContext.finish();
+
+ // Test with regular HBase deletes in pre-existing data
+ long now = System.currentTimeMillis();
+ Delete deleteColumn = new Delete(TestBytes.row3).deleteColumn(TestBytes.family, TestBytes.qualifier, now - 1);
+ // to prevent Tephra from replacing delete with delete marker
+ deleteColumn.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+ nonTxTable.delete(deleteColumn);
+ Delete deleteFamily = new Delete(TestBytes.row3).deleteFamily(TestBytes.family2, now);
+ // to prevent Tephra from replacing delete with delete marker
+ deleteFamily.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+ nonTxTable.delete(deleteFamily);
+ nonTxTable.flushCommits();
+
+ txContext.start();
+ txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL);
+ expected = ImmutableList.of(
+ new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier, txWp2, KeyValue.Type.DeleteColumn),
+ new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2),
+ new KeyValue(TestBytes.row, TestBytes.family2, null, txWp2, KeyValue.Type.DeleteFamily),
+ new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3),
+ new KeyValue(TestBytes.row3, TestBytes.family, TestBytes.qualifier, now - 1, KeyValue.Type.Delete),
+ new KeyValue(TestBytes.row3, TestBytes.family2, null, now, KeyValue.Type.DeleteFamily)
+ );
+ // a raw scan should additionally return the two plain HBase delete markers written to row3
+ Scan scan = new Scan();
+ scan.setRaw(true);
+ verifyScan(txTable, scan, expected);
+ txContext.finish();
+ }
+
+ @Test
+ public void testFilters() throws Exception {
+ // Checks value filters on transactional scans and gets. First, add the long values 1..4 to four rows
+ transactionContext.start();
+ Put put = new Put(TestBytes.row);
+ byte[] val1 = Bytes.toBytes(1L);
+ put.add(TestBytes.family, TestBytes.qualifier, val1);
+ transactionAwareHTable.put(put);
+ put = new Put(TestBytes.row2);
+ byte[] val2 = Bytes.toBytes(2L);
+ put.add(TestBytes.family, TestBytes.qualifier, val2);
+ transactionAwareHTable.put(put);
+ put = new Put(TestBytes.row3);
+ byte[] val3 = Bytes.toBytes(3L);
+ put.add(TestBytes.family, TestBytes.qualifier, val3);
+ transactionAwareHTable.put(put);
+ put = new Put(TestBytes.row4);
+ byte[] val4 = Bytes.toBytes(4L);
+ put.add(TestBytes.family, TestBytes.qualifier, val4);
+ transactionAwareHTable.put(put);
+ transactionContext.finish();
+
+ // Delete cell with value 2
+ transactionContext.start();
+ Delete delete = new Delete(TestBytes.row2);
+ delete.addColumn(TestBytes.family, TestBytes.qualifier);
+ transactionAwareHTable.delete(delete);
+ transactionContext.finish();
+
+ // Scan for values less than 4, should get only values 1 and 3 (value 2 was deleted above)
+ transactionContext.start();
+ Scan scan = new Scan(TestBytes.row, new ValueFilter(CompareFilter.CompareOp.LESS, new LongComparator(4)));
+ try (ResultScanner scanner = transactionAwareHTable.getScanner(scan)) {
+ Result result = scanner.next();
+ assertNotNull(result);
+ assertArrayEquals(TestBytes.row, result.getRow());
+ assertArrayEquals(val1, result.getValue(TestBytes.family, TestBytes.qualifier));
+ result = scanner.next();
+ assertNotNull(result);
+ assertArrayEquals(TestBytes.row3, result.getRow());
+ assertArrayEquals(val3, result.getValue(TestBytes.family, TestBytes.qualifier));
+ result = scanner.next();
+ assertNull(result);
+ }
+ transactionContext.finish();
+
+ // Run a Get with a filter for less than 10 on row4, should get value 4
+ transactionContext.start();
+ Get get = new Get(TestBytes.row4);
+ get.setFilter(new ValueFilter(CompareFilter.CompareOp.LESS, new LongComparator(10)));
+ Result result = transactionAwareHTable.get(get);
+ assertFalse(result.isEmpty());
+ assertArrayEquals(val4, result.getValue(TestBytes.family, TestBytes.qualifier));
+ transactionContext.finish();
+
+ // Change value of row4 to 40
+ transactionContext.start();
+ put = new Put(TestBytes.row4);
+ byte[] val40 = Bytes.toBytes(40L);
+ put.add(TestBytes.family, TestBytes.qualifier, val40);
+ transactionAwareHTable.put(put);
+ transactionContext.finish();
+
+ // Scan for values less than 10, should get only values 1 and 3 (the older value 4 of row4 must not match)
+ transactionContext.start();
+ scan = new Scan(TestBytes.row, new ValueFilter(CompareFilter.CompareOp.LESS, new LongComparator(10)));
+ try (ResultScanner scanner = transactionAwareHTable.getScanner(scan)) {
+ result = scanner.next();
+ assertNotNull(result);
+ assertArrayEquals(TestBytes.row, result.getRow());
+ assertArrayEquals(val1, result.getValue(TestBytes.family, TestBytes.qualifier));
+ result = scanner.next();
+ assertNotNull(result);
+ assertArrayEquals(TestBytes.row3, result.getRow());
+ assertArrayEquals(val3, result.getValue(TestBytes.family, TestBytes.qualifier));
+ result = scanner.next();
+ assertNull(result);
+ }
+ transactionContext.finish();
+
+ // Run the Get again with a filter for less than 10 on row4, this time should not get any results
+ transactionContext.start();
+ result = transactionAwareHTable.get(get);
+ assertTrue(result.isEmpty());
+ transactionContext.finish();
+ }
+
+ /**
+ * Tests that the transaction co-processor works with older clients, covering commit, invalidate and abort.
+ *
+ * @throws Exception if any table or transaction operation fails
+ */
+ @Test
+ public void testOlderClientOperations() throws Exception {
+ // Use a transaction-aware table that sets the old attribute keys on operations, as an older client would
+ TransactionAwareHTable oldTxAware = new OldTransactionAwareHTable(hTable);
+ transactionContext.addTransactionAware(oldTxAware);
+
+ transactionContext.start();
+ Put put = new Put(TestBytes.row);
+ put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+ oldTxAware.put(put);
+ transactionContext.finish();
+
+ transactionContext.start();
+ long txId = transactionContext.getCurrentTransaction().getTransactionId();
+ put = new Put(TestBytes.row);
+ put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value2);
+ oldTxAware.put(put);
+ // Invalidate the second Put
+ TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager);
+ txClient.invalidate(txId);
+
+ transactionContext.start();
+ put = new Put(TestBytes.row);
+ put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value3);
+ oldTxAware.put(put);
+ // Abort the third Put
+ transactionContext.abort();
+
+ // Get should now return the first value, since the second Put was invalidated and the third aborted
+ transactionContext.start();
+ Result result = oldTxAware.get(new Get(TestBytes.row));
+ transactionContext.finish();
+
+ byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
+ assertArrayEquals(TestBytes.value, value);
+ }
+
+ /**
+ * Stand-in for an older transaction client: it tags operations with the OLD_* transaction attribute keys.
+ */
+ private static class OldTransactionAwareHTable extends TransactionAwareHTable {
+ public OldTransactionAwareHTable(HTableInterface table) {
+ super(table);
+ }
+
+ @Override
+ public void addToOperation(OperationWithAttributes operation, Transaction transaction) throws IOException {
+ operation.setAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(transaction));
+ }
+
+ @Override
+ protected void makeRollbackOperation(Delete rollbackDelete) {
+ rollbackDelete.setAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2246abff/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/CellSkipFilterTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/CellSkipFilterTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/CellSkipFilterTest.java
new file mode 100644
index 0000000..428d3b0
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/CellSkipFilterTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * HBase 1.1 specific test for testing {@link CellSkipFilter}.
+ */
+public class CellSkipFilterTest {
+
+ private static final String ROW1KEY = "row1";
+ private static final String ROW2KEY = "row2";
+ private static final String FAM1KEY = "fam1";
+ private static final String COL1KEY = "col1";
+ private static final String FAM2KEY = "fam2";
+ private static final String COL2KEY = "col2";
+ private static final String VALUE = "value";
+
+ @Test
+ public void testSkipFiltering() throws Exception {
+ long timestamp = System.currentTimeMillis();
+ // Test to check that we get NEXT_COL once the INCLUDE_AND_NEXT_COL is returned for the same key
+ Filter filter = new CellSkipFilter(new MyFilter(0));
+ assertEquals(Filter.ReturnCode.INCLUDE, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, VALUE,
+ timestamp)));
+ assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY,
+ VALUE, timestamp - 1)));
+
+ // Next call should get NEXT_COL instead of SKIP, as it would be returned by CellSkipFilter
+ assertEquals(Filter.ReturnCode.NEXT_COL, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, VALUE,
+ timestamp - 2)));
+
+ // Next call with the same key should return the NEXT_COL again, as it would be returned by CellSkipFilter
+ assertEquals(Filter.ReturnCode.NEXT_COL, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, VALUE,
+ timestamp - 3)));
+
+ // Since MyFilter counter is not incremented in the previous call, filtering for the different keyvalue should
+ // give SKIP from MyFilter
+ assertEquals(Filter.ReturnCode.SKIP, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM2KEY, COL1KEY, VALUE,
+ timestamp - 4)));
+
+ // Test to check that we get NEXT_COL once the NEXT_COL is returned for the same key
+ filter = new CellSkipFilter(new MyFilter(2));
+ assertEquals(Filter.ReturnCode.SKIP, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, VALUE,
+ timestamp)));
+ assertEquals(Filter.ReturnCode.NEXT_COL, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, VALUE,
+ timestamp - 1)));
+
+ // Next call should get NEXT_COL instead of NEXT_ROW, as it would be returned by CellSkipFilter
+ assertEquals(Filter.ReturnCode.NEXT_COL, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, VALUE,
+ timestamp - 2)));
+
+ // Next call with the same key should return the NEXT_COL again, as it would be returned by CellSkipFilter
+ assertEquals(Filter.ReturnCode.NEXT_COL, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, VALUE,
+ timestamp - 3)));
+
+ // Since MyFilter counter is not incremented in the previous call, filtering for the different keyvalue should
+ // give NEXT_ROW from MyFilter
+ assertEquals(Filter.ReturnCode.NEXT_ROW, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL2KEY, VALUE,
+ timestamp - 4)));
+
+ // Next call with the new key should returned the SEEK_NEXT_USING_HINT
+ assertEquals(Filter.ReturnCode.SEEK_NEXT_USING_HINT, filter.filterKeyValue(newKeyValue(ROW2KEY, FAM1KEY, COL1KEY,
+ VALUE, timestamp - 5)));
+ }
+
+ private KeyValue newKeyValue(String rowkey, String family, String column, String value, long timestamp) {
+ return new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes(family), Bytes.toBytes(column),
+ timestamp, Bytes.toBytes(value));
+ }
+
+ /**
+ * Sample filter for testing. This filter maintains the {@link List} of {@link ReturnCode}s. It accepts the
+ * start index in the list and start serving the return codes corresponding that that index. Every time the
+ * return code is served, index is incremented.
+ */
+ class MyFilter extends FilterBase {
+
+ private final List<ReturnCode> returnCodes;
+ private int counter;
+
+ public MyFilter(int startIndex) {
+ returnCodes = Arrays.asList(ReturnCode.INCLUDE, ReturnCode.INCLUDE_AND_NEXT_COL, ReturnCode.SKIP,
+ ReturnCode.NEXT_COL, ReturnCode.NEXT_ROW, ReturnCode.SEEK_NEXT_USING_HINT);
+ counter = startIndex;
+ }
+
+ @Override
+ public ReturnCode filterKeyValue(Cell cell) throws IOException {
+ ReturnCode code = returnCodes.get(counter % returnCodes.size());
+ counter++;
+ return code;
+ }
+ }
+}