You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/09/08 08:10:01 UTC
[2/7] incubator-tephra git commit: TEPHRA-176,
TEPHRA-177: Adding maven modules for CDH-5.7, 5.8 support,
HBase-1.1 and HBase-1.2 modules
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2246abff/tephra-hbase-compat-1.1/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.1/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
deleted file mode 100644
index de1fa6b..0000000
--- a/tephra-hbase-compat-1.1/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
+++ /dev/null
@@ -1,1606 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tephra.hbase;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.Coprocessor;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.OperationWithAttributes;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.filter.CompareFilter;
-import org.apache.hadoop.hbase.filter.LongComparator;
-import org.apache.hadoop.hbase.filter.ValueFilter;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.tephra.Transaction;
-import org.apache.tephra.TransactionConflictException;
-import org.apache.tephra.TransactionContext;
-import org.apache.tephra.TransactionManager;
-import org.apache.tephra.TransactionSystemClient;
-import org.apache.tephra.TxConstants;
-import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
-import org.apache.tephra.inmemory.InMemoryTxSystemClient;
-import org.apache.tephra.metrics.TxMetricsCollector;
-import org.apache.tephra.persist.InMemoryTransactionStateStorage;
-import org.apache.tephra.persist.TransactionStateStorage;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for TransactionAwareHTables.
- */
-public class TransactionAwareHTableTest {
- private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTableTest.class);
-
- private static HBaseTestingUtility testUtil;
- private static HBaseAdmin hBaseAdmin;
- private static TransactionStateStorage txStateStorage;
- private static TransactionManager txManager;
- private static Configuration conf;
- private TransactionContext transactionContext;
- private TransactionAwareHTable transactionAwareHTable;
- private HTable hTable;
-
- private static final class TestBytes {
- private static final byte[] table = Bytes.toBytes("testtable");
- private static final byte[] family = Bytes.toBytes("f1");
- private static final byte[] family2 = Bytes.toBytes("f2");
- private static final byte[] qualifier = Bytes.toBytes("col1");
- private static final byte[] qualifier2 = Bytes.toBytes("col2");
- private static final byte[] row = Bytes.toBytes("row");
- private static final byte[] row2 = Bytes.toBytes("row2");
- private static final byte[] row3 = Bytes.toBytes("row3");
- private static final byte[] row4 = Bytes.toBytes("row4");
- private static final byte[] value = Bytes.toBytes("value");
- private static final byte[] value2 = Bytes.toBytes("value2");
- private static final byte[] value3 = Bytes.toBytes("value3");
- }
-
- private static final String TEST_ATTRIBUTE = "TEST_ATTRIBUTE";
-
- public static class TestRegionObserver extends BaseRegionObserver {
- @Override
- public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
- final Put put, final WALEdit edit,
- final Durability durability) throws IOException {
- if (put.getAttribute(TEST_ATTRIBUTE) == null) {
- throw new DoNotRetryIOException("Put should preserve attributes");
- }
- if (put.getDurability() != Durability.USE_DEFAULT) {
- throw new DoNotRetryIOException("Durability is not propagated correctly");
- }
- }
-
- @Override
- public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
- final Delete delete, final WALEdit edit,
- final Durability durability) throws IOException {
- if (delete.getAttribute(TEST_ATTRIBUTE) == null) {
- throw new DoNotRetryIOException("Delete should preserve attributes");
- }
- if (delete.getDurability() != Durability.USE_DEFAULT) {
- throw new DoNotRetryIOException("Durability is not propagated correctly");
- }
- }
- }
-
- @BeforeClass
- public static void setupBeforeClass() throws Exception {
- testUtil = new HBaseTestingUtility();
- conf = testUtil.getConfiguration();
-
- // Tune down the connection thread pool size
- conf.setInt("hbase.hconnection.threads.core", 5);
- conf.setInt("hbase.hconnection.threads.max", 10);
- // Tunn down handler threads in regionserver
- conf.setInt("hbase.regionserver.handler.count", 10);
-
- // Set to random port
- conf.setInt("hbase.master.port", 0);
- conf.setInt("hbase.master.info.port", 0);
- conf.setInt("hbase.regionserver.port", 0);
- conf.setInt("hbase.regionserver.info.port", 0);
-
- testUtil.startMiniCluster();
- hBaseAdmin = testUtil.getHBaseAdmin();
- txStateStorage = new InMemoryTransactionStateStorage();
- txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
- txManager.startAndWait();
- }
-
- @AfterClass
- public static void shutdownAfterClass() throws Exception {
- testUtil.shutdownMiniCluster();
- hBaseAdmin.close();
- }
-
- @Before
- public void setupBeforeTest() throws Exception {
- hTable = createTable(TestBytes.table, new byte[][]{TestBytes.family});
- transactionAwareHTable = new TransactionAwareHTable(hTable);
- transactionContext = new TransactionContext(new InMemoryTxSystemClient(txManager), transactionAwareHTable);
- }
-
- @After
- public void shutdownAfterTest() throws IOException {
- hBaseAdmin.disableTable(TestBytes.table);
- hBaseAdmin.deleteTable(TestBytes.table);
- }
-
- private HTable createTable(byte[] tableName, byte[][] columnFamilies) throws Exception {
- return createTable(tableName, columnFamilies, false, Collections.<String>emptyList());
- }
-
- private HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData,
- List<String> coprocessors) throws Exception {
- HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
- for (byte[] family : columnFamilies) {
- HColumnDescriptor columnDesc = new HColumnDescriptor(family);
- columnDesc.setMaxVersions(Integer.MAX_VALUE);
- columnDesc.setValue(TxConstants.PROPERTY_TTL, String.valueOf(100000)); // in millis
- desc.addFamily(columnDesc);
- }
- if (existingData) {
- desc.setValue(TxConstants.READ_NON_TX_DATA, "true");
- }
- // Divide individually to prevent any overflow
- int priority = Coprocessor.PRIORITY_USER;
- desc.addCoprocessor(TransactionProcessor.class.getName(), null, priority, null);
- // order in list is the same order that coprocessors will be invoked
- for (String coprocessor : coprocessors) {
- desc.addCoprocessor(coprocessor, null, ++priority, null);
- }
- hBaseAdmin.createTable(desc);
- testUtil.waitTableAvailable(tableName, 5000);
- return new HTable(testUtil.getConfiguration(), tableName);
- }
-
- /**
- * Test transactional put and get requests.
- *
- * @throws Exception
- */
- @Test
- public void testValidTransactionalPutAndGet() throws Exception {
- transactionContext.start();
- Put put = new Put(TestBytes.row);
- put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
- transactionAwareHTable.put(put);
- transactionContext.finish();
-
- transactionContext.start();
- Result result = transactionAwareHTable.get(new Get(TestBytes.row));
- transactionContext.finish();
-
- byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
- assertArrayEquals(TestBytes.value, value);
- }
-
- /**
- * Test aborted put requests, that must be rolled back.
- *
- * @throws Exception
- */
- @Test
- public void testAbortedTransactionPutAndGet() throws Exception {
- transactionContext.start();
- Put put = new Put(TestBytes.row);
- put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
- transactionAwareHTable.put(put);
-
- transactionContext.abort();
-
- transactionContext.start();
- Result result = transactionAwareHTable.get(new Get(TestBytes.row));
- transactionContext.finish();
- byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
- assertArrayEquals(value, null);
- }
-
- /**
- * Test transactional delete operations.
- *
- * @throws Exception
- */
- @Test
- public void testValidTransactionalDelete() throws Exception {
- try (HTable hTable = createTable(Bytes.toBytes("TestValidTransactionalDelete"),
- new byte[][]{TestBytes.family, TestBytes.family2})) {
- TransactionAwareHTable txTable = new TransactionAwareHTable(hTable);
- TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
-
- txContext.start();
- Put put = new Put(TestBytes.row);
- put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
- put.add(TestBytes.family2, TestBytes.qualifier, TestBytes.value2);
- txTable.put(put);
- txContext.finish();
-
- txContext.start();
- Result result = txTable.get(new Get(TestBytes.row));
- txContext.finish();
- byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
- assertArrayEquals(TestBytes.value, value);
- value = result.getValue(TestBytes.family2, TestBytes.qualifier);
- assertArrayEquals(TestBytes.value2, value);
-
- // test full row delete
- txContext.start();
- Delete delete = new Delete(TestBytes.row);
- txTable.delete(delete);
- txContext.finish();
-
- txContext.start();
- result = txTable.get(new Get(TestBytes.row));
- txContext.finish();
- assertTrue(result.isEmpty());
-
- // test column delete
- // load 10 rows
- txContext.start();
- int rowCount = 10;
- for (int i = 0; i < rowCount; i++) {
- Put p = new Put(Bytes.toBytes("row" + i));
- for (int j = 0; j < 10; j++) {
- p.add(TestBytes.family, Bytes.toBytes(j), TestBytes.value);
- }
- txTable.put(p);
- }
- txContext.finish();
-
- // verify loaded rows
- txContext.start();
- for (int i = 0; i < rowCount; i++) {
- Get g = new Get(Bytes.toBytes("row" + i));
- Result r = txTable.get(g);
- assertFalse(r.isEmpty());
- for (int j = 0; j < 10; j++) {
- assertArrayEquals(TestBytes.value, r.getValue(TestBytes.family, Bytes.toBytes(j)));
- }
- }
- txContext.finish();
-
- // delete odds columns from odd rows and even columns from even rows
- txContext.start();
- for (int i = 0; i < rowCount; i++) {
- Delete d = new Delete(Bytes.toBytes("row" + i));
- for (int j = 0; j < 10; j++) {
- if (i % 2 == j % 2) {
- d.deleteColumns(TestBytes.family, Bytes.toBytes(j));
- }
- }
- txTable.delete(d);
- }
- txContext.finish();
-
- // verify deleted columns
- txContext.start();
- for (int i = 0; i < rowCount; i++) {
- Get g = new Get(Bytes.toBytes("row" + i));
- Result r = txTable.get(g);
- assertEquals(5, r.size());
- for (Map.Entry<byte[], byte[]> entry : r.getFamilyMap(TestBytes.family).entrySet()) {
- int col = Bytes.toInt(entry.getKey());
- // each row should only have the opposite mod (odd=even, even=odd)
- assertNotEquals(i % 2, col % 2);
- assertArrayEquals(TestBytes.value, entry.getValue());
- }
- }
- txContext.finish();
-
- // test family delete
- // load 10 rows
- txContext.start();
- for (int i = 0; i < rowCount; i++) {
- Put p = new Put(Bytes.toBytes("famrow" + i));
- p.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
- p.add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2);
- txTable.put(p);
- }
- txContext.finish();
-
- // verify all loaded rows
- txContext.start();
- for (int i = 0; i < rowCount; i++) {
- Get g = new Get(Bytes.toBytes("famrow" + i));
- Result r = txTable.get(g);
- assertEquals(2, r.size());
- assertArrayEquals(TestBytes.value, r.getValue(TestBytes.family, TestBytes.qualifier));
- assertArrayEquals(TestBytes.value2, r.getValue(TestBytes.family2, TestBytes.qualifier2));
- }
- txContext.finish();
-
- // delete family1 for even rows, family2 for odd rows
- txContext.start();
- for (int i = 0; i < rowCount; i++) {
- Delete d = new Delete(Bytes.toBytes("famrow" + i));
- d.deleteFamily((i % 2 == 0) ? TestBytes.family : TestBytes.family2);
- txTable.delete(d);
- }
- txContext.finish();
-
- // verify deleted families
- txContext.start();
- for (int i = 0; i < rowCount; i++) {
- Get g = new Get(Bytes.toBytes("famrow" + i));
- Result r = txTable.get(g);
- assertEquals(1, r.size());
- if (i % 2 == 0) {
- assertNull(r.getValue(TestBytes.family, TestBytes.qualifier));
- assertArrayEquals(TestBytes.value2, r.getValue(TestBytes.family2, TestBytes.qualifier2));
- } else {
- assertArrayEquals(TestBytes.value, r.getValue(TestBytes.family, TestBytes.qualifier));
- assertNull(r.getValue(TestBytes.family2, TestBytes.qualifier2));
- }
- }
- txContext.finish();
- }
- }
-
- /**
- * Test that put and delete attributes are preserved
- *
- * @throws Exception
- */
- @Test
- public void testAttributesPreserved() throws Exception {
- HTable hTable = createTable(Bytes.toBytes("TestAttributesPreserved"),
- new byte[][]{TestBytes.family, TestBytes.family2}, false,
- Lists.newArrayList(TestRegionObserver.class.getName()));
- try {
- TransactionAwareHTable txTable = new TransactionAwareHTable(hTable);
- TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
-
- txContext.start();
- Put put = new Put(TestBytes.row);
- put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
- put.add(TestBytes.family2, TestBytes.qualifier, TestBytes.value2);
- // set an attribute on the put, TestRegionObserver will verify it still exists
- put.setAttribute(TEST_ATTRIBUTE, new byte[]{});
- txTable.put(put);
- txContext.finish();
-
- txContext.start();
- Result result = txTable.get(new Get(TestBytes.row));
- txContext.finish();
- byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
- assertArrayEquals(TestBytes.value, value);
- value = result.getValue(TestBytes.family2, TestBytes.qualifier);
- assertArrayEquals(TestBytes.value2, value);
-
- // test full row delete, TestRegionObserver will verify it still exists
- txContext.start();
- Delete delete = new Delete(TestBytes.row);
- delete.setAttribute(TEST_ATTRIBUTE, new byte[]{});
- txTable.delete(delete);
- txContext.finish();
-
- txContext.start();
- result = txTable.get(new Get(TestBytes.row));
- txContext.finish();
- assertTrue(result.isEmpty());
- } finally {
- hTable.close();
- }
- }
-
- /**
- * Test aborted transactional delete requests, that must be rolled back.
- *
- * @throws Exception
- */
- @Test
- public void testAbortedTransactionalDelete() throws Exception {
- transactionContext.start();
- Put put = new Put(TestBytes.row);
- put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
- transactionAwareHTable.put(put);
- transactionContext.finish();
-
- transactionContext.start();
- Result result = transactionAwareHTable.get(new Get(TestBytes.row));
- transactionContext.finish();
- byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
- assertArrayEquals(TestBytes.value, value);
-
- transactionContext.start();
- Delete delete = new Delete(TestBytes.row);
- transactionAwareHTable.delete(delete);
- transactionContext.abort();
-
- transactionContext.start();
- result = transactionAwareHTable.get(new Get(TestBytes.row));
- transactionContext.finish();
- value = result.getValue(TestBytes.family, TestBytes.qualifier);
- assertArrayEquals(TestBytes.value, value);
- }
-
- private void testDeleteRollback(TxConstants.ConflictDetection conflictDetection) throws Exception {
- String tableName = String.format("%s%s", "TestColFamilyDelete", conflictDetection);
- HTable hTable = createTable(Bytes.toBytes(tableName), new byte[][]{TestBytes.family});
- try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, conflictDetection)) {
- TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
- txContext.start();
- txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
- txContext.finish();
-
- // Start a tx, delete the row and then abort the tx
- txContext.start();
- txTable.delete(new Delete(TestBytes.row));
- txContext.abort();
-
- // Start a tx, delete a column family and then abort the tx
- txContext.start();
- txTable.delete(new Delete(TestBytes.row).deleteFamily(TestBytes.family));
- txContext.abort();
-
- // Above operations should have no effect on the row, since they were aborted
- txContext.start();
- Get get = new Get(TestBytes.row);
- Result result = txTable.get(get);
- assertFalse(result.isEmpty());
- assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family, TestBytes.qualifier));
- txContext.finish();
- }
- }
-
- @Test
- public void testDeleteRollback() throws Exception {
- testDeleteRollback(TxConstants.ConflictDetection.ROW);
- testDeleteRollback(TxConstants.ConflictDetection.COLUMN);
- testDeleteRollback(TxConstants.ConflictDetection.NONE);
- }
-
- @Test
- public void testMultiColumnFamilyRowDeleteRollback() throws Exception {
- HTable hTable = createTable(Bytes.toBytes("TestMultColFam"), new byte[][] {TestBytes.family, TestBytes.family2});
- try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
- TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
- txContext.start();
- txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
- txContext.finish();
-
- txContext.start();
- //noinspection ConstantConditions
- txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL);
- Result result = txTable.get(new Get(TestBytes.row));
- Assert.assertEquals(1, result.getFamilyMap(TestBytes.family).size());
- Assert.assertEquals(0, result.getFamilyMap(TestBytes.family2).size());
- txContext.finish();
-
- //Start a tx, delete the row and then abort the tx
- txContext.start();
- txTable.delete(new Delete(TestBytes.row));
- txContext.abort();
-
- //Start a tx and scan all the col families to make sure none of them have delete markers
- txContext.start();
- txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL);
- result = txTable.get(new Get(TestBytes.row));
- Assert.assertEquals(1, result.getFamilyMap(TestBytes.family).size());
- Assert.assertEquals(0, result.getFamilyMap(TestBytes.family2).size());
- txContext.finish();
- }
- }
-
- @Test
- public void testRowDelete() throws Exception {
- HTable hTable = createTable(Bytes.toBytes("TestRowDelete"), new byte[][]{TestBytes.family, TestBytes.family2});
- try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
- TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
-
- // Test 1: full row delete
- txContext.start();
- txTable.put(new Put(TestBytes.row)
- .add(TestBytes.family, TestBytes.qualifier, TestBytes.value)
- .add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2)
- .add(TestBytes.family2, TestBytes.qualifier, TestBytes.value)
- .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
- txContext.finish();
-
- txContext.start();
- Get get = new Get(TestBytes.row);
- Result result = txTable.get(get);
- assertFalse(result.isEmpty());
- assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family, TestBytes.qualifier));
- assertArrayEquals(TestBytes.value2, result.getValue(TestBytes.family, TestBytes.qualifier2));
- assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family2, TestBytes.qualifier));
- assertArrayEquals(TestBytes.value2, result.getValue(TestBytes.family2, TestBytes.qualifier2));
- txContext.finish();
-
- // delete entire row
- txContext.start();
- txTable.delete(new Delete(TestBytes.row));
- txContext.finish();
-
- // verify row is now empty
- txContext.start();
- result = txTable.get(new Get(TestBytes.row));
- assertTrue(result.isEmpty());
-
- // verify row is empty for explicit column retrieval
- result = txTable.get(new Get(TestBytes.row)
- .addColumn(TestBytes.family, TestBytes.qualifier)
- .addFamily(TestBytes.family2));
- assertTrue(result.isEmpty());
-
- // verify row is empty for scan
- ResultScanner scanner = txTable.getScanner(new Scan(TestBytes.row));
- assertNull(scanner.next());
- scanner.close();
-
- // verify row is empty for scan with explicit column
- scanner = txTable.getScanner(new Scan(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier2));
- assertNull(scanner.next());
- scanner.close();
- txContext.finish();
-
- // write swapped values to one column per family
- txContext.start();
- txTable.put(new Put(TestBytes.row)
- .add(TestBytes.family, TestBytes.qualifier, TestBytes.value2)
- .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value));
- txContext.finish();
-
- // verify new values appear
- txContext.start();
- result = txTable.get(new Get(TestBytes.row));
- assertFalse(result.isEmpty());
- assertEquals(2, result.size());
- assertArrayEquals(TestBytes.value2, result.getValue(TestBytes.family, TestBytes.qualifier));
- assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family2, TestBytes.qualifier2));
-
- scanner = txTable.getScanner(new Scan(TestBytes.row));
- Result result1 = scanner.next();
- assertNotNull(result1);
- assertFalse(result1.isEmpty());
- assertEquals(2, result1.size());
- assertArrayEquals(TestBytes.value2, result.getValue(TestBytes.family, TestBytes.qualifier));
- assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family2, TestBytes.qualifier2));
- scanner.close();
- txContext.finish();
-
- // Test 2: delete of first column family
- txContext.start();
- txTable.put(new Put(TestBytes.row2)
- .add(TestBytes.family, TestBytes.qualifier, TestBytes.value)
- .add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2)
- .add(TestBytes.family2, TestBytes.qualifier, TestBytes.value)
- .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
- txContext.finish();
-
- txContext.start();
- txTable.delete(new Delete(TestBytes.row2).deleteFamily(TestBytes.family));
- txContext.finish();
-
- txContext.start();
- Result fam1Result = txTable.get(new Get(TestBytes.row2));
- assertFalse(fam1Result.isEmpty());
- assertEquals(2, fam1Result.size());
- assertArrayEquals(TestBytes.value, fam1Result.getValue(TestBytes.family2, TestBytes.qualifier));
- assertArrayEquals(TestBytes.value2, fam1Result.getValue(TestBytes.family2, TestBytes.qualifier2));
- txContext.finish();
-
- // Test 3: delete of second column family
- txContext.start();
- txTable.put(new Put(TestBytes.row3)
- .add(TestBytes.family, TestBytes.qualifier, TestBytes.value)
- .add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2)
- .add(TestBytes.family2, TestBytes.qualifier, TestBytes.value)
- .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
- txContext.finish();
-
- txContext.start();
- txTable.delete(new Delete(TestBytes.row3).deleteFamily(TestBytes.family2));
- txContext.finish();
-
- txContext.start();
- Result fam2Result = txTable.get(new Get(TestBytes.row3));
- assertFalse(fam2Result.isEmpty());
- assertEquals(2, fam2Result.size());
- assertArrayEquals(TestBytes.value, fam2Result.getValue(TestBytes.family, TestBytes.qualifier));
- assertArrayEquals(TestBytes.value2, fam2Result.getValue(TestBytes.family, TestBytes.qualifier2));
- txContext.finish();
-
- // Test 4: delete specific rows in a range
- txContext.start();
- for (int i = 0; i < 10; i++) {
- txTable.put(new Put(Bytes.toBytes("z" + i))
- .add(TestBytes.family, TestBytes.qualifier, Bytes.toBytes(i))
- .add(TestBytes.family2, TestBytes.qualifier2, Bytes.toBytes(i)));
- }
- txContext.finish();
-
- txContext.start();
- // delete odd rows
- for (int i = 1; i < 10; i += 2) {
- txTable.delete(new Delete(Bytes.toBytes("z" + i)));
- }
- txContext.finish();
-
- txContext.start();
- int cnt = 0;
- ResultScanner zScanner = txTable.getScanner(new Scan(Bytes.toBytes("z0")));
- Result res;
- while ((res = zScanner.next()) != null) {
- assertFalse(res.isEmpty());
- assertArrayEquals(Bytes.toBytes("z" + cnt), res.getRow());
- assertArrayEquals(Bytes.toBytes(cnt), res.getValue(TestBytes.family, TestBytes.qualifier));
- assertArrayEquals(Bytes.toBytes(cnt), res.getValue(TestBytes.family2, TestBytes.qualifier2));
- cnt += 2;
- }
-
- // Test 5: delete prior writes in the same transaction
- txContext.start();
- txTable.put(new Put(TestBytes.row4)
- .add(TestBytes.family, TestBytes.qualifier, TestBytes.value)
- .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
- txTable.delete(new Delete(TestBytes.row4));
- txContext.finish();
-
- txContext.start();
- Result row4Result = txTable.get(new Get(TestBytes.row4));
- assertTrue(row4Result.isEmpty());
- txContext.finish();
- }
- }
-
- /**
- * Expect an exception since a transaction hasn't been started.
- *
- * @throws Exception
- */
- @Test(expected = IOException.class)
- public void testTransactionlessFailure() throws Exception {
- transactionAwareHTable.get(new Get(TestBytes.row));
- }
-
- /**
- * Tests that each transaction can see its own persisted writes, while not seeing writes from other
- * in-progress transactions.
- */
- @Test
- public void testReadYourWrites() throws Exception {
- // In-progress tx1: started before our main transaction
- HTable hTable1 = new HTable(testUtil.getConfiguration(), TestBytes.table);
- TransactionAwareHTable txHTable1 = new TransactionAwareHTable(hTable1);
- TransactionContext inprogressTxContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), txHTable1);
-
- // In-progress tx2: started while our main transaction is running
- HTable hTable2 = new HTable(testUtil.getConfiguration(), TestBytes.table);
- TransactionAwareHTable txHTable2 = new TransactionAwareHTable(hTable2);
- TransactionContext inprogressTxContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txHTable2);
-
- // create an in-progress write that should be ignored
- byte[] col2 = Bytes.toBytes("col2");
- inprogressTxContext1.start();
- Put putCol2 = new Put(TestBytes.row);
- byte[] valueCol2 = Bytes.toBytes("writing in progress");
- putCol2.add(TestBytes.family, col2, valueCol2);
- txHTable1.put(putCol2);
-
- // start a tx and write a value to test reading in same tx
- transactionContext.start();
- Put put = new Put(TestBytes.row);
- byte[] value = Bytes.toBytes("writing");
- put.add(TestBytes.family, TestBytes.qualifier, value);
- transactionAwareHTable.put(put);
-
- // test that a write from a tx started after the first is not visible
- inprogressTxContext2.start();
- Put put2 = new Put(TestBytes.row);
- byte[] value2 = Bytes.toBytes("writing2");
- put2.add(TestBytes.family, TestBytes.qualifier, value2);
- txHTable2.put(put2);
-
- Get get = new Get(TestBytes.row);
- Result row = transactionAwareHTable.get(get);
- assertFalse(row.isEmpty());
- byte[] col1Value = row.getValue(TestBytes.family, TestBytes.qualifier);
- Assert.assertNotNull(col1Value);
- Assert.assertArrayEquals(value, col1Value);
- // write from in-progress transaction should not be visible
- byte[] col2Value = row.getValue(TestBytes.family, col2);
- assertNull(col2Value);
-
- // commit in-progress transaction, should still not be visible
- inprogressTxContext1.finish();
-
- get = new Get(TestBytes.row);
- row = transactionAwareHTable.get(get);
- assertFalse(row.isEmpty());
- col2Value = row.getValue(TestBytes.family, col2);
- assertNull(col2Value);
-
- transactionContext.finish();
-
- inprogressTxContext2.abort();
- }
-
- @Test
- public void testRowLevelConflictDetection() throws Exception {
- TransactionAwareHTable txTable1 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
- TxConstants.ConflictDetection.ROW);
- TransactionContext txContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable1);
-
- TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
- TxConstants.ConflictDetection.ROW);
- TransactionContext txContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable2);
-
- byte[] row1 = Bytes.toBytes("row1");
- byte[] row2 = Bytes.toBytes("row2");
- byte[] col1 = Bytes.toBytes("c1");
- byte[] col2 = Bytes.toBytes("c2");
- byte[] val1 = Bytes.toBytes("val1");
- byte[] val2 = Bytes.toBytes("val2");
-
- // test that concurrent writing to different rows succeeds
- txContext1.start();
- txTable1.put(new Put(row1).add(TestBytes.family, col1, val1));
-
- txContext2.start();
- txTable2.put(new Put(row2).add(TestBytes.family, col1, val2));
-
- // should be no conflicts
- txContext1.finish();
- txContext2.finish();
-
- transactionContext.start();
- Result res = transactionAwareHTable.get(new Get(row1));
- assertFalse(res.isEmpty());
- Cell cell = res.getColumnLatestCell(TestBytes.family, col1);
- assertNotNull(cell);
- assertArrayEquals(val1, CellUtil.cloneValue(cell));
-
- res = transactionAwareHTable.get(new Get(row2));
- assertFalse(res.isEmpty());
- cell = res.getColumnLatestCell(TestBytes.family, col1);
- assertNotNull(cell);
- assertArrayEquals(val2, CellUtil.cloneValue(cell));
- transactionContext.finish();
-
- // test that writing to different columns in the same row fails
- txContext1.start();
- txTable1.put(new Put(row1).add(TestBytes.family, col1, val2));
-
- txContext2.start();
- txTable2.put(new Put(row1).add(TestBytes.family, col2, val2));
-
- txContext1.finish();
- try {
- txContext2.finish();
- fail("txContext2 should have encountered a row-level conflict during commit");
- } catch (TransactionConflictException tce) {
- txContext2.abort();
- }
-
- transactionContext.start();
- res = transactionAwareHTable.get(new Get(row1));
- assertFalse(res.isEmpty());
- cell = res.getColumnLatestCell(TestBytes.family, col1);
- assertNotNull(cell);
- // should now be val2
- assertArrayEquals(val2, CellUtil.cloneValue(cell));
-
- cell = res.getColumnLatestCell(TestBytes.family, col2);
- // col2 should not be visible due to conflict
- assertNull(cell);
- transactionContext.finish();
-
- // test that writing to the same column in the same row fails
- txContext1.start();
- txTable1.put(new Put(row2).add(TestBytes.family, col2, val1));
-
- txContext2.start();
- txTable2.put(new Put(row2).add(TestBytes.family, col2, val2));
-
- txContext1.finish();
- try {
- txContext2.finish();
- fail("txContext2 should have encountered a row and column level conflict during commit");
- } catch (TransactionConflictException tce) {
- txContext2.abort();
- }
-
- transactionContext.start();
- res = transactionAwareHTable.get(new Get(row2));
- assertFalse(res.isEmpty());
- cell = res.getColumnLatestCell(TestBytes.family, col2);
- assertNotNull(cell);
- // should now be val1
- assertArrayEquals(val1, CellUtil.cloneValue(cell));
- transactionContext.finish();
-
- // verify change set that is being reported only on rows
- txContext1.start();
- txTable1.put(new Put(row1).add(TestBytes.family, col1, val1));
- txTable1.put(new Put(row2).add(TestBytes.family, col2, val2));
-
- Collection<byte[]> changeSet = txTable1.getTxChanges();
- assertNotNull(changeSet);
- assertEquals(2, changeSet.size());
- assertTrue(changeSet.contains(txTable1.getChangeKey(row1, null, null)));
- assertTrue(changeSet.contains(txTable1.getChangeKey(row2, null, null)));
- txContext1.finish();
- }
-
- @Test
- public void testNoneLevelConflictDetection() throws Exception {
- InMemoryTxSystemClient txClient = new InMemoryTxSystemClient(txManager);
- TransactionAwareHTable txTable1 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
- TxConstants.ConflictDetection.NONE);
- TransactionContext txContext1 = new TransactionContext(txClient, txTable1);
-
- TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
- TxConstants.ConflictDetection.NONE);
- TransactionContext txContext2 = new TransactionContext(txClient, txTable2);
-
- // overlapping writes to the same row + column should not conflict
-
- txContext1.start();
- txTable1.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
-
- // changes should not be visible yet
- txContext2.start();
- Result row = txTable2.get(new Get(TestBytes.row));
- assertTrue(row.isEmpty());
-
- txTable2.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
-
- // both commits should succeed
- txContext1.finish();
- txContext2.finish();
-
- txContext1.start();
- row = txTable1.get(new Get(TestBytes.row));
- assertFalse(row.isEmpty());
- assertArrayEquals(TestBytes.value2, row.getValue(TestBytes.family, TestBytes.qualifier));
- txContext1.finish();
-
- // transaction abort should still rollback changes
-
- txContext1.start();
- txTable1.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
- txContext1.abort();
-
- // changes to row2 should be rolled back
- txContext2.start();
- Result row2 = txTable2.get(new Get(TestBytes.row2));
- assertTrue(row2.isEmpty());
- txContext2.finish();
-
- // transaction invalidate should still make changes invisible
-
- txContext1.start();
- Transaction tx1 = txContext1.getCurrentTransaction();
- txTable1.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
- assertNotNull(tx1);
- txClient.invalidate(tx1.getWritePointer());
-
- // changes to row2 should be rolled back
- txContext2.start();
- Result row3 = txTable2.get(new Get(TestBytes.row3));
- assertTrue(row3.isEmpty());
- txContext2.finish();
- }
-
- @Test
- public void testCheckpoint() throws Exception {
- // start a transaction, using checkpoints between writes
- transactionContext.start();
- transactionAwareHTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
- Transaction origTx = transactionContext.getCurrentTransaction();
- transactionContext.checkpoint();
- Transaction postCheckpointTx = transactionContext.getCurrentTransaction();
-
- assertEquals(origTx.getTransactionId(), postCheckpointTx.getTransactionId());
- assertNotEquals(origTx.getWritePointer(), postCheckpointTx.getWritePointer());
- long[] checkpointPtrs = postCheckpointTx.getCheckpointWritePointers();
- assertEquals(1, checkpointPtrs.length);
- assertEquals(postCheckpointTx.getWritePointer(), checkpointPtrs[0]);
-
- transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
- transactionContext.checkpoint();
- Transaction postCheckpointTx2 = transactionContext.getCurrentTransaction();
-
- assertEquals(origTx.getTransactionId(), postCheckpointTx2.getTransactionId());
- assertNotEquals(postCheckpointTx.getWritePointer(), postCheckpointTx2.getWritePointer());
- long[] checkpointPtrs2 = postCheckpointTx2.getCheckpointWritePointers();
- assertEquals(2, checkpointPtrs2.length);
- assertEquals(postCheckpointTx.getWritePointer(), checkpointPtrs2[0]);
- assertEquals(postCheckpointTx2.getWritePointer(), checkpointPtrs2[1]);
-
- transactionAwareHTable.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
-
- // by default, all rows should be visible with Read-Your-Writes
- verifyRow(transactionAwareHTable, TestBytes.row, TestBytes.value);
- verifyRow(transactionAwareHTable, TestBytes.row2, TestBytes.value2);
- verifyRow(transactionAwareHTable, TestBytes.row3, TestBytes.value);
-
- // when disabling current write pointer, only the previous checkpoints should be visible
- transactionContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
- Get get = new Get(TestBytes.row);
- verifyRow(transactionAwareHTable, get, TestBytes.value);
- get = new Get(TestBytes.row2);
- verifyRow(transactionAwareHTable, get, TestBytes.value2);
- get = new Get(TestBytes.row3);
- verifyRow(transactionAwareHTable, get, null);
-
- // test scan results excluding current write pointer
- Scan scan = new Scan();
- ResultScanner scanner = transactionAwareHTable.getScanner(scan);
-
- Result row = scanner.next();
- assertNotNull(row);
- assertArrayEquals(TestBytes.row, row.getRow());
- assertEquals(1, row.size());
- assertArrayEquals(TestBytes.value, row.getValue(TestBytes.family, TestBytes.qualifier));
-
- row = scanner.next();
- assertNotNull(row);
- assertArrayEquals(TestBytes.row2, row.getRow());
- assertEquals(1, row.size());
- assertArrayEquals(TestBytes.value2, row.getValue(TestBytes.family, TestBytes.qualifier));
-
- row = scanner.next();
- assertNull(row);
- scanner.close();
- transactionContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT);
-
- // commit transaction, verify writes are visible
- transactionContext.finish();
-
- transactionContext.start();
- verifyRow(transactionAwareHTable, TestBytes.row, TestBytes.value);
- verifyRow(transactionAwareHTable, TestBytes.row2, TestBytes.value2);
- verifyRow(transactionAwareHTable, TestBytes.row3, TestBytes.value);
- transactionContext.finish();
- }
-
- @Test
- public void testInProgressCheckpoint() throws Exception {
- // start a transaction, using checkpoints between writes
- transactionContext.start();
- transactionAwareHTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
- transactionContext.checkpoint();
- transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
-
- // check that writes are still not visible to other clients
- TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table));
- TransactionContext txContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable2);
-
- txContext2.start();
- verifyRow(txTable2, TestBytes.row, null);
- verifyRow(txTable2, TestBytes.row2, null);
- txContext2.finish();
- txTable2.close();
-
- transactionContext.finish();
-
- // verify writes are visible after commit
- transactionContext.start();
- verifyRow(transactionAwareHTable, TestBytes.row, TestBytes.value);
- verifyRow(transactionAwareHTable, TestBytes.row2, TestBytes.value2);
- transactionContext.finish();
- }
-
- @Test
- public void testCheckpointRollback() throws Exception {
- // start a transaction, using checkpoints between writes
- transactionContext.start();
- transactionAwareHTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
- transactionContext.checkpoint();
- transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
- transactionContext.checkpoint();
- transactionAwareHTable.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
-
- transactionContext.abort();
-
- transactionContext.start();
- verifyRow(transactionAwareHTable, TestBytes.row, null);
- verifyRow(transactionAwareHTable, TestBytes.row2, null);
- verifyRow(transactionAwareHTable, TestBytes.row3, null);
-
- Scan scan = new Scan();
- ResultScanner scanner = transactionAwareHTable.getScanner(scan);
- assertNull(scanner.next());
- scanner.close();
- transactionContext.finish();
- }
-
- @Test
- public void testCheckpointInvalidate() throws Exception {
- // start a transaction, using checkpoints between writes
- transactionContext.start();
- Transaction origTx = transactionContext.getCurrentTransaction();
- transactionAwareHTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
- transactionContext.checkpoint();
- Transaction checkpointTx1 = transactionContext.getCurrentTransaction();
- transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
- transactionContext.checkpoint();
- Transaction checkpointTx2 = transactionContext.getCurrentTransaction();
- transactionAwareHTable.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
-
- TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager);
- txClient.invalidate(transactionContext.getCurrentTransaction().getTransactionId());
-
- // check that writes are not visible
- TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table));
- TransactionContext txContext2 = new TransactionContext(txClient, txTable2);
- txContext2.start();
- Transaction newTx = txContext2.getCurrentTransaction();
-
- // all 3 writes pointers from the previous transaction should now be excluded
- assertTrue(newTx.isExcluded(origTx.getWritePointer()));
- assertTrue(newTx.isExcluded(checkpointTx1.getWritePointer()));
- assertTrue(newTx.isExcluded(checkpointTx2.getWritePointer()));
-
- verifyRow(txTable2, TestBytes.row, null);
- verifyRow(txTable2, TestBytes.row2, null);
- verifyRow(txTable2, TestBytes.row3, null);
-
- Scan scan = new Scan();
- ResultScanner scanner = txTable2.getScanner(scan);
- assertNull(scanner.next());
- scanner.close();
- txContext2.finish();
- }
-
- @Test
- public void testExistingData() throws Exception {
- byte[] val11 = Bytes.toBytes("val11");
- byte[] val12 = Bytes.toBytes("val12");
- byte[] val21 = Bytes.toBytes("val21");
- byte[] val22 = Bytes.toBytes("val22");
- byte[] val31 = Bytes.toBytes("val31");
- byte[] val111 = Bytes.toBytes("val111");
-
- TransactionAwareHTable txTable =
- new TransactionAwareHTable(createTable(Bytes.toBytes("testExistingData"), new byte[][]{TestBytes.family}, true,
- Collections.<String>emptyList()));
- TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
-
- // Add some pre-existing, non-transactional data
- HTable nonTxTable = new HTable(testUtil.getConfiguration(), txTable.getTableName());
- nonTxTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, val11));
- nonTxTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier2, val12));
- nonTxTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, val21));
- nonTxTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier2, val22));
- nonTxTable.put(new Put(TestBytes.row4).add(TestBytes.family, TxConstants.FAMILY_DELETE_QUALIFIER,
- HConstants.EMPTY_BYTE_ARRAY));
- nonTxTable.put(new Put(TestBytes.row4).add(TestBytes.family, TestBytes.qualifier, HConstants.EMPTY_BYTE_ARRAY));
- nonTxTable.flushCommits();
-
- // Add transactional data
- txContext.start();
- txTable.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, val31));
- txContext.finish();
-
- txContext.start();
- // test get
- verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier), val11);
- verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2), val12);
- verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier), val21);
- verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier2), val22);
- verifyRow(txTable, new Get(TestBytes.row3).addColumn(TestBytes.family, TestBytes.qualifier), val31);
- verifyRow(txTable, new Get(TestBytes.row4).addColumn(TestBytes.family, TxConstants.FAMILY_DELETE_QUALIFIER),
- HConstants.EMPTY_BYTE_ARRAY);
- verifyRow(txTable, new Get(TestBytes.row4).addColumn(TestBytes.family, TestBytes.qualifier),
- HConstants.EMPTY_BYTE_ARRAY);
-
- // test scan
- try (ResultScanner scanner = txTable.getScanner(new Scan())) {
- Result result = scanner.next();
- assertNotNull(result);
- assertArrayEquals(TestBytes.row, result.getRow());
- assertArrayEquals(val11, result.getValue(TestBytes.family, TestBytes.qualifier));
- assertArrayEquals(val12, result.getValue(TestBytes.family, TestBytes.qualifier2));
- result = scanner.next();
- assertNotNull(result);
- assertArrayEquals(TestBytes.row2, result.getRow());
- assertArrayEquals(val21, result.getValue(TestBytes.family, TestBytes.qualifier));
- assertArrayEquals(val22, result.getValue(TestBytes.family, TestBytes.qualifier2));
- result = scanner.next();
- assertNotNull(result);
- assertArrayEquals(TestBytes.row3, result.getRow());
- assertArrayEquals(val31, result.getValue(TestBytes.family, TestBytes.qualifier));
- result = scanner.next();
- assertNotNull(result);
- assertArrayEquals(TestBytes.row4, result.getRow());
- assertArrayEquals(HConstants.EMPTY_BYTE_ARRAY, result.getValue(TestBytes.family,
- TxConstants.FAMILY_DELETE_QUALIFIER));
- assertArrayEquals(HConstants.EMPTY_BYTE_ARRAY, result.getValue(TestBytes.family, TestBytes.qualifier));
- assertNull(scanner.next());
- }
- txContext.finish();
-
- // test update and delete
- txContext.start();
- txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, val111));
- txTable.delete(new Delete(TestBytes.row2).deleteColumns(TestBytes.family, TestBytes.qualifier));
- txContext.finish();
-
- txContext.start();
- verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier), val111);
- verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2), val12);
- verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier), null);
- verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier2), val22);
- verifyRow(txTable, new Get(TestBytes.row3).addColumn(TestBytes.family, TestBytes.qualifier), val31);
- verifyRow(txTable, new Get(TestBytes.row4).addColumn(TestBytes.family, TxConstants.FAMILY_DELETE_QUALIFIER),
- HConstants.EMPTY_BYTE_ARRAY);
- verifyRow(txTable, new Get(TestBytes.row4).addColumn(TestBytes.family, TestBytes.qualifier),
- HConstants.EMPTY_BYTE_ARRAY);
- txContext.finish();
-
- // test scan
- txContext.start();
- try (ResultScanner scanner = txTable.getScanner(new Scan())) {
- Result result = scanner.next();
- assertNotNull(result);
- assertArrayEquals(TestBytes.row, result.getRow());
- assertArrayEquals(val111, result.getValue(TestBytes.family, TestBytes.qualifier));
- assertArrayEquals(val12, result.getValue(TestBytes.family, TestBytes.qualifier2));
- result = scanner.next();
- assertNotNull(result);
- assertArrayEquals(TestBytes.row2, result.getRow());
- assertArrayEquals(null, result.getValue(TestBytes.family, TestBytes.qualifier));
- assertArrayEquals(val22, result.getValue(TestBytes.family, TestBytes.qualifier2));
- result = scanner.next();
- assertNotNull(result);
- assertArrayEquals(TestBytes.row3, result.getRow());
- assertArrayEquals(val31, result.getValue(TestBytes.family, TestBytes.qualifier));
- result = scanner.next();
- assertNotNull(result);
- assertArrayEquals(TestBytes.row4, result.getRow());
- assertArrayEquals(HConstants.EMPTY_BYTE_ARRAY, result.getValue(TestBytes.family,
- TxConstants.FAMILY_DELETE_QUALIFIER));
- assertArrayEquals(HConstants.EMPTY_BYTE_ARRAY, result.getValue(TestBytes.family, TestBytes.qualifier));
- assertNull(scanner.next());
- }
- txContext.finish();
- }
-
- private void verifyRow(HTableInterface table, byte[] rowkey, byte[] expectedValue) throws Exception {
- verifyRow(table, new Get(rowkey), expectedValue);
- }
-
- private void verifyRow(HTableInterface table, Get get, byte[] expectedValue) throws Exception {
- verifyRows(table, get, expectedValue == null ? null : ImmutableList.of(expectedValue));
- }
-
- private void verifyRows(HTableInterface table, Get get, List<byte[]> expectedValues) throws Exception {
- Result result = table.get(get);
- if (expectedValues == null) {
- assertTrue(result.isEmpty());
- } else {
- assertFalse(result.isEmpty());
- byte[] family = TestBytes.family;
- byte[] col = TestBytes.qualifier;
- if (get.hasFamilies()) {
- family = get.getFamilyMap().keySet().iterator().next();
- col = get.getFamilyMap().get(family).first();
- }
- Iterator<Cell> it = result.getColumnCells(family, col).iterator();
- for (byte[] expectedValue : expectedValues) {
- Assert.assertTrue(it.hasNext());
- assertArrayEquals(expectedValue, CellUtil.cloneValue(it.next()));
- }
- }
- }
-
- private Cell[] getRow(HTableInterface table, Get get) throws Exception {
- Result result = table.get(get);
- return result.rawCells();
- }
-
- private void verifyScan(HTableInterface table, Scan scan, List<KeyValue> expectedCells) throws Exception {
- List<Cell> actualCells = new ArrayList<>();
- try (ResultScanner scanner = table.getScanner(scan)) {
- Result[] results = scanner.next(expectedCells.size() + 1);
- for (Result result : results) {
- actualCells.addAll(Lists.newArrayList(result.rawCells()));
- }
- Assert.assertEquals(expectedCells, actualCells);
- }
- }
-
- @Test
- public void testVisibilityAll() throws Exception {
- HTable nonTxTable = createTable(Bytes.toBytes("testVisibilityAll"),
- new byte[][]{TestBytes.family, TestBytes.family2}, true, Collections.<String>emptyList());
- TransactionAwareHTable txTable =
- new TransactionAwareHTable(nonTxTable,
- TxConstants.ConflictDetection.ROW); // ROW conflict detection to verify family deletes
- TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
-
- // start a transaction and create a delete marker
- txContext.start();
- //noinspection ConstantConditions
- long txWp0 = txContext.getCurrentTransaction().getWritePointer();
- txTable.delete(new Delete(TestBytes.row).deleteColumn(TestBytes.family, TestBytes.qualifier2));
- txContext.finish();
-
- // start a new transaction and write some values
- txContext.start();
- @SuppressWarnings("ConstantConditions")
- long txWp1 = txContext.getCurrentTransaction().getWritePointer();
- txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
- txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2));
- txTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
- txTable.put(new Put(TestBytes.row).add(TestBytes.family2, TestBytes.qualifier, TestBytes.value));
- txTable.put(new Put(TestBytes.row).add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
-
- // verify written data
- verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier),
- TestBytes.value);
- verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2),
- TestBytes.value2);
- verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier),
- TestBytes.value);
- verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier),
- TestBytes.value);
- verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier2),
- TestBytes.value2);
-
- // checkpoint and make changes to written data now
- txContext.checkpoint();
- long txWp2 = txContext.getCurrentTransaction().getWritePointer();
- // delete a column
- txTable.delete(new Delete(TestBytes.row).deleteColumn(TestBytes.family, TestBytes.qualifier));
- // no change to a column
- txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2));
- // update a column
- txTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value3));
- // delete column family
- txTable.delete(new Delete(TestBytes.row).deleteFamily(TestBytes.family2));
-
- // verify changed values
- verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier),
- null);
- verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2),
- TestBytes.value2);
- verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier),
- TestBytes.value3);
- verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier),
- null);
- verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier2),
- null);
-
- // run a scan with VisibilityLevel.ALL, this should return all raw changes by this transaction,
- // and the raw change by prior transaction
- //noinspection ConstantConditions
- txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL);
- List<KeyValue> expected = ImmutableList.of(
- new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier, txWp2, KeyValue.Type.DeleteColumn),
- new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier, txWp1, TestBytes.value),
- new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2),
- new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp1, TestBytes.value2),
- new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp0, KeyValue.Type.DeleteColumn),
- new KeyValue(TestBytes.row, TestBytes.family2, null, txWp2, KeyValue.Type.DeleteFamily),
- new KeyValue(TestBytes.row, TestBytes.family2, TestBytes.qualifier, txWp1, TestBytes.value),
- new KeyValue(TestBytes.row, TestBytes.family2, TestBytes.qualifier2, txWp1, TestBytes.value2),
- new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3),
- new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp1, TestBytes.value)
- );
- verifyScan(txTable, new Scan(), expected);
-
- // verify a Get is also able to return all snapshot versions
- Get get = new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier);
- Cell[] cells = getRow(txTable, get);
- Assert.assertEquals(2, cells.length);
- Assert.assertTrue(CellUtil.isDelete(cells[0]));
- Assert.assertArrayEquals(TestBytes.value, CellUtil.cloneValue(cells[1]));
-
- get = new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2);
- cells = getRow(txTable, get);
- Assert.assertEquals(3, cells.length);
- Assert.assertArrayEquals(TestBytes.value2, CellUtil.cloneValue(cells[0]));
- Assert.assertArrayEquals(TestBytes.value2, CellUtil.cloneValue(cells[1]));
- Assert.assertTrue(CellUtil.isDeleteColumns(cells[2]));
-
- verifyRows(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier),
- ImmutableList.of(TestBytes.value3, TestBytes.value));
-
- get = new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier);
- cells = getRow(txTable, get);
- Assert.assertEquals(2, cells.length);
- Assert.assertTrue(CellUtil.isDelete(cells[0]));
- Assert.assertArrayEquals(TestBytes.value, CellUtil.cloneValue(cells[1]));
-
- get = new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier2);
- cells = getRow(txTable, get);
- Assert.assertEquals(2, cells.length);
- Assert.assertTrue(CellUtil.isDelete(cells[0]));
- Assert.assertArrayEquals(TestBytes.value2, CellUtil.cloneValue(cells[1]));
-
- // Verify VisibilityLevel.SNAPSHOT
- txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT);
- expected = ImmutableList.of(
- new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2),
- new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3)
- );
- verifyScan(txTable, new Scan(), expected);
-
- // Verify VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT
- txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
- expected = ImmutableList.of(
- new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier, txWp1, TestBytes.value),
- new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp1, TestBytes.value2),
- new KeyValue(TestBytes.row, TestBytes.family2, TestBytes.qualifier, txWp1, TestBytes.value),
- new KeyValue(TestBytes.row, TestBytes.family2, TestBytes.qualifier2, txWp1, TestBytes.value2),
- new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp1, TestBytes.value)
- );
- verifyScan(txTable, new Scan(), expected);
- txContext.finish();
-
- // finally verify values once more after commit, this time we should get only committed raw values for
- // all visibility levels
- txContext.start();
- txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL);
- expected = ImmutableList.of(
- new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier, txWp2, KeyValue.Type.DeleteColumn),
- new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2),
- new KeyValue(TestBytes.row, TestBytes.family2, null, txWp2, KeyValue.Type.DeleteFamily),
- new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3)
- );
- verifyScan(txTable, new Scan(), expected);
-
- txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT);
- expected = ImmutableList.of(
- new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2),
- new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3)
- );
- verifyScan(txTable, new Scan(), expected);
-
- txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
- expected = ImmutableList.of(
- new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2),
- new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3)
- );
- verifyScan(txTable, new Scan(), expected);
-
- verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier),
- null);
- verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2),
- TestBytes.value2);
- verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier),
- TestBytes.value3);
- txContext.finish();
-
- // Test with regular HBase deletes in pre-existing data
- long now = System.currentTimeMillis();
- Delete deleteColumn = new Delete(TestBytes.row3).deleteColumn(TestBytes.family, TestBytes.qualifier, now - 1);
- // to prevent Tephra from replacing delete with delete marker
- deleteColumn.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
- nonTxTable.delete(deleteColumn);
- Delete deleteFamily = new Delete(TestBytes.row3).deleteFamily(TestBytes.family2, now);
- // to prevent Tephra from replacing delete with delete marker
- deleteFamily.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
- nonTxTable.delete(deleteFamily);
- nonTxTable.flushCommits();
-
- txContext.start();
- txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL);
- expected = ImmutableList.of(
- new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier, txWp2, KeyValue.Type.DeleteColumn),
- new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2),
- new KeyValue(TestBytes.row, TestBytes.family2, null, txWp2, KeyValue.Type.DeleteFamily),
- new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3),
- new KeyValue(TestBytes.row3, TestBytes.family, TestBytes.qualifier, now - 1, KeyValue.Type.Delete),
- new KeyValue(TestBytes.row3, TestBytes.family2, null, now, KeyValue.Type.DeleteFamily)
- );
- // test scan
- Scan scan = new Scan();
- scan.setRaw(true);
- verifyScan(txTable, scan, expected);
- txContext.finish();
- }
-
- @Test
- public void testFilters() throws Exception {
- // Add some values to table
- transactionContext.start();
- Put put = new Put(TestBytes.row);
- byte[] val1 = Bytes.toBytes(1L);
- put.add(TestBytes.family, TestBytes.qualifier, val1);
- transactionAwareHTable.put(put);
- put = new Put(TestBytes.row2);
- byte[] val2 = Bytes.toBytes(2L);
- put.add(TestBytes.family, TestBytes.qualifier, val2);
- transactionAwareHTable.put(put);
- put = new Put(TestBytes.row3);
- byte[] val3 = Bytes.toBytes(3L);
- put.add(TestBytes.family, TestBytes.qualifier, val3);
- transactionAwareHTable.put(put);
- put = new Put(TestBytes.row4);
- byte[] val4 = Bytes.toBytes(4L);
- put.add(TestBytes.family, TestBytes.qualifier, val4);
- transactionAwareHTable.put(put);
- transactionContext.finish();
-
- // Delete cell with value 2
- transactionContext.start();
- Delete delete = new Delete(TestBytes.row2);
- delete.addColumn(TestBytes.family, TestBytes.qualifier);
- transactionAwareHTable.delete(delete);
- transactionContext.finish();
-
- // Scan for values less than 4, should get only values 1 and 3
- transactionContext.start();
- Scan scan = new Scan(TestBytes.row, new ValueFilter(CompareFilter.CompareOp.LESS, new LongComparator(4)));
- try (ResultScanner scanner = transactionAwareHTable.getScanner(scan)) {
- Result result = scanner.next();
- assertNotNull(result);
- assertArrayEquals(TestBytes.row, result.getRow());
- assertArrayEquals(val1, result.getValue(TestBytes.family, TestBytes.qualifier));
- result = scanner.next();
- assertNotNull(result);
- assertArrayEquals(TestBytes.row3, result.getRow());
- assertArrayEquals(val3, result.getValue(TestBytes.family, TestBytes.qualifier));
- result = scanner.next();
- assertNull(result);
- }
- transactionContext.finish();
-
- // Run a Get with a filter for less than 10 on row4, should get value 4
- transactionContext.start();
- Get get = new Get(TestBytes.row4);
- get.setFilter(new ValueFilter(CompareFilter.CompareOp.LESS, new LongComparator(10)));
- Result result = transactionAwareHTable.get(get);
- assertFalse(result.isEmpty());
- assertArrayEquals(val4, result.getValue(TestBytes.family, TestBytes.qualifier));
- transactionContext.finish();
-
- // Change value of row4 to 40
- transactionContext.start();
- put = new Put(TestBytes.row4);
- byte[] val40 = Bytes.toBytes(40L);
- put.add(TestBytes.family, TestBytes.qualifier, val40);
- transactionAwareHTable.put(put);
- transactionContext.finish();
-
- // Scan for values less than 10, should get only values 1 and 3
- transactionContext.start();
- scan = new Scan(TestBytes.row, new ValueFilter(CompareFilter.CompareOp.LESS, new LongComparator(10)));
- try (ResultScanner scanner = transactionAwareHTable.getScanner(scan)) {
- result = scanner.next();
- assertNotNull(result);
- assertArrayEquals(TestBytes.row, result.getRow());
- assertArrayEquals(val1, result.getValue(TestBytes.family, TestBytes.qualifier));
- result = scanner.next();
- assertNotNull(result);
- assertArrayEquals(TestBytes.row3, result.getRow());
- assertArrayEquals(val3, result.getValue(TestBytes.family, TestBytes.qualifier));
- result = scanner.next();
- assertNull(result);
- }
- transactionContext.finish();
-
- // Run the Get again with a filter for less than 10 on row4, this time should not get any results
- transactionContext.start();
- result = transactionAwareHTable.get(get);
- assertTrue(result.isEmpty());
- transactionContext.finish();
- }
-
- /**
- * Tests that transaction co-processor works with older clients
- *
- * @throws Exception
- */
- @Test
- public void testOlderClientOperations() throws Exception {
- // Use old HTable to test
- TransactionAwareHTable oldTxAware = new OldTransactionAwareHTable(hTable);
- transactionContext.addTransactionAware(oldTxAware);
-
- transactionContext.start();
- Put put = new Put(TestBytes.row);
- put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
- oldTxAware.put(put);
- transactionContext.finish();
-
- transactionContext.start();
- long txId = transactionContext.getCurrentTransaction().getTransactionId();
- put = new Put(TestBytes.row);
- put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value2);
- oldTxAware.put(put);
- // Invalidate the second Put
- TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager);
- txClient.invalidate(txId);
-
- transactionContext.start();
- put = new Put(TestBytes.row);
- put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value3);
- oldTxAware.put(put);
- // Abort the third Put
- transactionContext.abort();
-
- // Get should now return the first value
- transactionContext.start();
- Result result = oldTxAware.get(new Get(TestBytes.row));
- transactionContext.finish();
-
- byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
- assertArrayEquals(TestBytes.value, value);
- }
-
- /**
- * Represents older transaction clients
- */
- private static class OldTransactionAwareHTable extends TransactionAwareHTable {
- public OldTransactionAwareHTable(HTableInterface hTable) {
- super(hTable);
- }
-
- @Override
- public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException {
- op.setAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx));
- }
-
- @Override
- protected void makeRollbackOperation(Delete delete) {
- delete.setAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2246abff/tephra-hbase-compat-1.1/src/test/java/org/apache/tephra/hbase/coprocessor/CellSkipFilterTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1/src/test/java/org/apache/tephra/hbase/coprocessor/CellSkipFilterTest.java b/tephra-hbase-compat-1.1/src/test/java/org/apache/tephra/hbase/coprocessor/CellSkipFilterTest.java
deleted file mode 100644
index 428d3b0..0000000
--- a/tephra-hbase-compat-1.1/src/test/java/org/apache/tephra/hbase/coprocessor/CellSkipFilterTest.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tephra.hbase.coprocessor;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterBase;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * HBase 1.1 specific test for testing {@link CellSkipFilter}.
- */
-public class CellSkipFilterTest {
-
- private static final String ROW1KEY = "row1";
- private static final String ROW2KEY = "row2";
- private static final String FAM1KEY = "fam1";
- private static final String COL1KEY = "col1";
- private static final String FAM2KEY = "fam2";
- private static final String COL2KEY = "col2";
- private static final String VALUE = "value";
-
- @Test
- public void testSkipFiltering() throws Exception {
- long timestamp = System.currentTimeMillis();
- // Test to check that we get NEXT_COL once the INCLUDE_AND_NEXT_COL is returned for the same key
- Filter filter = new CellSkipFilter(new MyFilter(0));
- assertEquals(Filter.ReturnCode.INCLUDE, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, VALUE,
- timestamp)));
- assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY,
- VALUE, timestamp - 1)));
-
- // Next call should get NEXT_COL instead of SKIP, as it would be returned by CellSkipFilter
- assertEquals(Filter.ReturnCode.NEXT_COL, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, VALUE,
- timestamp - 2)));
-
- // Next call with the same key should return the NEXT_COL again, as it would be returned by CellSkipFilter
- assertEquals(Filter.ReturnCode.NEXT_COL, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, VALUE,
- timestamp - 3)));
-
- // Since MyFilter counter is not incremented in the previous call, filtering for the different keyvalue should
- // give SKIP from MyFilter
- assertEquals(Filter.ReturnCode.SKIP, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM2KEY, COL1KEY, VALUE,
- timestamp - 4)));
-
- // Test to check that we get NEXT_COL once the NEXT_COL is returned for the same key
- filter = new CellSkipFilter(new MyFilter(2));
- assertEquals(Filter.ReturnCode.SKIP, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, VALUE,
- timestamp)));
- assertEquals(Filter.ReturnCode.NEXT_COL, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, VALUE,
- timestamp - 1)));
-
- // Next call should get NEXT_COL instead of NEXT_ROW, as it would be returned by CellSkipFilter
- assertEquals(Filter.ReturnCode.NEXT_COL, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, VALUE,
- timestamp - 2)));
-
- // Next call with the same key should return the NEXT_COL again, as it would be returned by CellSkipFilter
- assertEquals(Filter.ReturnCode.NEXT_COL, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, VALUE,
- timestamp - 3)));
-
- // Since MyFilter counter is not incremented in the previous call, filtering for the different keyvalue should
- // give NEXT_ROW from MyFilter
- assertEquals(Filter.ReturnCode.NEXT_ROW, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL2KEY, VALUE,
- timestamp - 4)));
-
- // Next call with the new key should returned the SEEK_NEXT_USING_HINT
- assertEquals(Filter.ReturnCode.SEEK_NEXT_USING_HINT, filter.filterKeyValue(newKeyValue(ROW2KEY, FAM1KEY, COL1KEY,
- VALUE, timestamp - 5)));
- }
-
- private KeyValue newKeyValue(String rowkey, String family, String column, String value, long timestamp) {
- return new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes(family), Bytes.toBytes(column),
- timestamp, Bytes.toBytes(value));
- }
-
- /**
- * Sample filter for testing. This filter maintains the {@link List} of {@link ReturnCode}s. It accepts the
- * start index in the list and start serving the return codes corresponding that that index. Every time the
- * return code is served, index is incremented.
- */
- class MyFilter extends FilterBase {
-
- private final List<ReturnCode> returnCodes;
- private int counter;
-
- public MyFilter(int startIndex) {
- returnCodes = Arrays.asList(ReturnCode.INCLUDE, ReturnCode.INCLUDE_AND_NEXT_COL, ReturnCode.SKIP,
- ReturnCode.NEXT_COL, ReturnCode.NEXT_ROW, ReturnCode.SEEK_NEXT_USING_HINT);
- counter = startIndex;
- }
-
- @Override
- public ReturnCode filterKeyValue(Cell cell) throws IOException {
- ReturnCode code = returnCodes.get(counter % returnCodes.size());
- counter++;
- return code;
- }
- }
-}