You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/06 23:02:13 UTC
[07/51] [partial] incubator-tephra git commit: Rename package to
org.apache.tephra
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase10cdh/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase10cdh/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase10cdh/coprocessor/TransactionProcessorTest.java
new file mode 100644
index 0000000..c4efbae
--- /dev/null
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase10cdh/coprocessor/TransactionProcessorTest.java
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase10cdh.coprocessor;
+
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MockRegionServerServices;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.tephra.ChangeId;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionType;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.TransactionStateCache;
+import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.HDFSTransactionStateStorage;
+import org.apache.tephra.persist.TransactionSnapshot;
+import org.apache.tephra.persist.TransactionVisibilityState;
+import org.apache.tephra.snapshot.DefaultSnapshotCodec;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+import org.apache.tephra.util.TxUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests filtering of invalid transaction data by the {@link TransactionProcessor} coprocessor.
+ */
+public class TransactionProcessorTest {
+ private static final Logger LOG = LoggerFactory.getLogger(TransactionProcessorTest.class);
+
+ // Nine timestamps (V[0]..V[8]), one hour apart, each scaled by TxConstants.MAX_TX_PER_MS.
+ // V[0] is eight hours before the time this class is initialized and V[8] is that time itself.
+ // The tests visible in this class only use V[1]..V[8] as cell versions.
+ private static final long[] V;
+
+ static {
+ long now = System.currentTimeMillis();
+ V = new long[9];
+ for (int i = 0; i < V.length; i++) {
+ V[i] = (now - TimeUnit.HOURS.toMillis(8 - i)) * TxConstants.MAX_TX_PER_MS;
+ }
+ }
+
+ // Scratch directory shared by all tests in this class; used for the mini DFS base dir and the snapshot dir.
+ @ClassRule
+ public static TemporaryFolder tmpFolder = new TemporaryFolder();
+ // Mini DFS cluster built in setupBeforeClass() and shut down in shutdownAfterClass().
+ private static MiniDFSCluster dfsCluster;
+ // Configuration created in setupBeforeClass() from the mini DFS cluster's file system configuration.
+ private static Configuration conf;
+ // Versions passed as the invalid list of the snapshot written in setupBeforeClass().
+ // Must stay declared after the static initializer above, because it reads V.
+ private static LongArrayList invalidSet = new LongArrayList(new long[]{V[3], V[5], V[7]});
+ // Visibility state derived in setupBeforeClass() from the written snapshot; the tests build dummy transactions from it.
+ private static TransactionVisibilityState txVisibilityState;
+
+ /**
+  * Starts a single-datanode mini DFS cluster, builds the shared {@link #conf} on top of it and
+  * persists an initial transaction snapshot (invalid list {@link #invalidSet}, one in-progress
+  * transaction at V[6]) through a temporary {@link HDFSTransactionStateStorage}.
+  *
+  * @throws Exception if the cluster cannot be started or the snapshot cannot be written
+  */
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+   Configuration hConf = new Configuration();
+   String rootDir = tmpFolder.newFolder().getAbsolutePath();
+   hConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, rootDir);
+   hConf.set(HConstants.HBASE_DIR, rootDir + "/hbase");
+
+   dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build();
+   dfsCluster.waitActive();
+   conf = HBaseConfiguration.create(dfsCluster.getFileSystem().getConf());
+
+   conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
+   conf.unset(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES);
+   String localTestDir = tmpFolder.newFolder().getAbsolutePath();
+   conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, localTestDir);
+   conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
+
+   // write an initial transaction snapshot
+   TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom(
+     System.currentTimeMillis(), V[6] - 1, V[7], invalidSet,
+     // this will set visibility upper bound to V[6]
+     Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(V[6] - 1, Long.MAX_VALUE,
+                                                                                      TransactionType.SHORT))),
+     new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>());
+   txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(),
+                                               txSnapshot.getWritePointer(), txSnapshot.getInvalid(),
+                                               txSnapshot.getInProgress());
+   HDFSTransactionStateStorage tmpStorage =
+     new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
+   tmpStorage.startAndWait();
+   try {
+     tmpStorage.writeSnapshot(txSnapshot);
+   } finally {
+     // always stop the temporary storage service, even if writing the snapshot fails
+     tmpStorage.stopAndWait();
+   }
+ }
+
+ /**
+  * Shuts down the mini DFS cluster started in {@link #setupBeforeClass()}.
+  *
+  * @throws Exception if the cluster fails to shut down
+  */
+ @AfterClass
+ public static void shutdownAfterClass() throws Exception {
+   // the cluster is still null when setupBeforeClass() failed before building it;
+   // guard so that this method does not add a NullPointerException on top of the original failure
+   if (dfsCluster != null) {
+     dfsCluster.shutdown();
+   }
+ }
+
+ /**
+  * Writes rows 1..8 (row {@code i} holds the versions V[1]..V[i]), flushes the region and then
+  * checks, with an unfiltered scan, which versions are still present after the flush.
+  */
+ @Test
+ public void testDataJanitorRegionScanner() throws Exception {
+   String tableName = "TestRegionScanner";
+   byte[] familyBytes = Bytes.toBytes("f");
+   byte[] columnBytes = Bytes.toBytes("c");
+   HRegion region = createRegion(tableName, familyBytes, TimeUnit.HOURS.toMillis(3));
+   try {
+     region.initialize();
+     TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
+     LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+
+     for (int i = 1; i <= 8; i++) {
+       for (int k = 1; k <= i; k++) {
+         Put p = new Put(Bytes.toBytes(i));
+         p.add(familyBytes, columnBytes, V[k], Bytes.toBytes(V[k]));
+         region.put(p);
+       }
+     }
+
+     // force a flush to clear the data
+     // during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
+     LOG.info("Flushing region " + region.getRegionNameAsString());
+     region.flushcache();
+
+     // now a normal scan should only return the valid rows
+     // do not use a filter here to test that cleanup works on flush
+     Scan scan = new Scan();
+     scan.setMaxVersions(10);
+     // try-with-resources so the scanner is released even when one of the assertions fails
+     try (RegionScanner regionScanner = region.getScanner(scan)) {
+       List<Cell> results = Lists.newArrayList();
+
+       // first returned value should be "4" with version "4"
+       assertTrue(regionScanner.next(results));
+       assertKeyValueMatches(results, 4, new long[] {V[4]});
+
+       results.clear();
+       assertTrue(regionScanner.next(results));
+       assertKeyValueMatches(results, 5, new long[] {V[4]});
+
+       results.clear();
+       assertTrue(regionScanner.next(results));
+       assertKeyValueMatches(results, 6, new long[] {V[6], V[4]});
+
+       results.clear();
+       assertTrue(regionScanner.next(results));
+       assertKeyValueMatches(results, 7, new long[] {V[6], V[4]});
+
+       // row 8 is the last row, so the scanner reports that nothing follows it
+       results.clear();
+       assertFalse(regionScanner.next(results));
+       assertKeyValueMatches(results, 8, new long[] {V[8], V[6], V[4]});
+     }
+   } finally {
+     region.close();
+   }
+ }
+
+ /**
+  * Writes versions V[4]..V[8] of a single cell, issues a column delete stamped just after V[5],
+  * flushes the region and checks which versions an unfiltered scan still returns.
+  */
+ @Test
+ public void testDeleteFiltering() throws Exception {
+   String tableName = "TestDeleteFiltering";
+   byte[] familyBytes = Bytes.toBytes("f");
+   byte[] columnBytes = Bytes.toBytes("c");
+   HRegion region = createRegion(tableName, familyBytes, 0);
+   try {
+     region.initialize();
+     TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
+     LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+
+     byte[] row = Bytes.toBytes(1);
+     for (int i = 4; i < V.length; i++) {
+       Put p = new Put(row);
+       p.add(familyBytes, columnBytes, V[i], Bytes.toBytes(V[i]));
+       region.put(p);
+     }
+
+     // delete the column as of the V[5] version
+     // take that cell's timestamp + 1 to simulate a delete in a new tx
+     long deleteTs = V[5] + 1;
+     Delete d = new Delete(row, deleteTs);
+     LOG.info("Issuing delete at timestamp " + deleteTs);
+     // row deletes are not yet supported (TransactionAwareHTable normally handles this)
+     d.deleteColumns(familyBytes, columnBytes);
+     region.delete(d);
+
+     // force a flush to clear the data
+     // during flush, we should drop the deleted version, but not the others
+     LOG.info("Flushing region " + region.getRegionNameAsString());
+     region.flushcache();
+
+     // now a normal scan should return row with versions at: V[8], V[6].
+     // V[7] is invalid and V[5] and prior are deleted.
+     // The delete itself is expected as a third cell at deleteTs with an empty value.
+     Scan scan = new Scan();
+     scan.setMaxVersions(10);
+     // try-with-resources so the scanner is released even when one of the assertions fails
+     try (RegionScanner regionScanner = region.getScanner(scan)) {
+       List<Cell> results = Lists.newArrayList();
+       // should be only one row
+       assertFalse(regionScanner.next(results));
+       assertKeyValueMatches(results, 1,
+                             new long[]{V[8], V[6], deleteTs},
+                             new byte[][]{Bytes.toBytes(V[8]), Bytes.toBytes(V[6]), new byte[0]});
+     }
+   } finally {
+     region.close();
+   }
+ }
+
+ /**
+  * Writes five columns, marks the odd ones as deleted (puts with empty values) and verifies that
+  * only the even columns are visible before a flush, after a flush, and - through an unfiltered
+  * scan - after a major compaction.
+  */
+ @Test
+ public void testDeleteMarkerCleanup() throws Exception {
+   String tableName = "TestDeleteMarkerCleanup";
+   byte[] familyBytes = Bytes.toBytes("f");
+   HRegion region = createRegion(tableName, familyBytes, 0);
+   try {
+     region.initialize();
+
+     // all puts use a timestamp before the tx snapshot's visibility upper bound, making them eligible for removal
+     long writeTs = txVisibilityState.getVisibilityUpperBound() - 10;
+     // deletes are performed after the writes, but still before the visibility upper bound
+     long deleteTs = writeTs + 1;
+     // write separate columns to confirm that delete markers survive across flushes
+     byte[] row = Bytes.toBytes(100);
+     Put p = new Put(row);
+
+     LOG.info("Writing columns at timestamp " + writeTs);
+     for (int i = 0; i < 5; i++) {
+       byte[] iBytes = Bytes.toBytes(i);
+       p.add(familyBytes, iBytes, writeTs, iBytes);
+     }
+     region.put(p);
+     // read all back
+     List<Cell> results = scanSingleRow(region, new Scan(row));
+     for (int i = 0; i < 5; i++) {
+       Cell cell = results.get(i);
+       assertArrayEquals(row, cell.getRow());
+       byte[] idxBytes = Bytes.toBytes(i);
+       assertArrayEquals(idxBytes, cell.getQualifier());
+       assertArrayEquals(idxBytes, cell.getValue());
+     }
+
+     // force a flush to clear the memstore
+     LOG.info("Before delete, flushing region " + region.getRegionNameAsString());
+     region.flushcache();
+
+     // delete the odd entries
+     for (int i = 0; i < 5; i++) {
+       if (i % 2 == 1) {
+         // deletes are performed as puts with empty values
+         Put deletePut = new Put(row);
+         deletePut.add(familyBytes, Bytes.toBytes(i), deleteTs, new byte[0]);
+         region.put(deletePut);
+       }
+     }
+
+     // read all back through the transaction visibility filter
+     Scan scan = new Scan(row);
+     scan.setFilter(TransactionFilters.getVisibilityFilter(TxUtils.createDummyTransaction(txVisibilityState),
+                                                           new TreeMap<byte[], Long>(), false, ScanType.USER_SCAN));
+     results = scanSingleRow(region, scan);
+     for (Cell cell : results) {
+       LOG.info("Got cell " + cell);
+     }
+     assertOnlyEvenColumns(row, results);
+
+     // force another flush on the delete markers
+     // during flush, we should retain the delete markers, since they can only safely be dropped by a major compaction
+     LOG.info("After delete, flushing region " + region.getRegionNameAsString());
+     region.flushcache();
+
+     scan = new Scan(row);
+     scan.setFilter(TransactionFilters.getVisibilityFilter(TxUtils.createDummyTransaction(txVisibilityState),
+                                                           new TreeMap<byte[], Long>(), false, ScanType.USER_SCAN));
+     assertOnlyEvenColumns(row, scanSingleRow(region, scan));
+
+     // force a major compaction
+     LOG.info("Forcing major compaction of region " + region.getRegionNameAsString());
+     region.compactStores(true);
+
+     // perform a raw scan (no filter) to confirm that the delete markers are now gone
+     assertOnlyEvenColumns(row, scanSingleRow(region, new Scan(row)));
+   } finally {
+     region.close();
+   }
+ }
+
+ /**
+  * Runs {@code scan} against {@code region}, asserts that the scanner reports no further rows after
+  * the first call to {@code next}, and returns the cells collected by that call. The scanner is
+  * always closed before returning.
+  */
+ private List<Cell> scanSingleRow(HRegion region, Scan scan) throws IOException {
+   try (RegionScanner regionScanner = region.getScanner(scan)) {
+     List<Cell> cells = Lists.newArrayList();
+     assertFalse(regionScanner.next(cells));
+     return cells;
+   }
+ }
+
+ /**
+  * Asserts that {@code results} holds exactly three cells of {@code row} whose qualifiers and values
+  * are the int-encoded column indexes 0, 2 and 4, in that order.
+  */
+ private void assertOnlyEvenColumns(byte[] row, List<Cell> results) {
+   assertEquals(3, results.size());
+   for (int i = 0; i < 3; i++) {
+     Cell cell = results.get(i);
+     assertArrayEquals(row, cell.getRow());
+     byte[] idxBytes = Bytes.toBytes(i * 2);
+     assertArrayEquals(idxBytes, cell.getQualifier());
+     assertArrayEquals(idxBytes, cell.getValue());
+   }
+ }
+
+ /**
+  * Test that we correctly preserve the timestamp set for column family delete markers. This is not
+  * directly required for the TransactionAwareHTable usage, but is the right thing to do and ensures
+  * that we make it easy to interoperate with other systems.
+  */
+ @Test
+ public void testFamilyDeleteTimestamp() throws Exception {
+   String tableName = "TestFamilyDeleteTimestamp";
+   byte[] family1Bytes = Bytes.toBytes("f1");
+   byte[] columnBytes = Bytes.toBytes("c");
+   byte[] rowBytes = Bytes.toBytes("row");
+   byte[] valBytes = Bytes.toBytes("val");
+   HRegion region = createRegion(tableName, family1Bytes, 0);
+   try {
+     region.initialize();
+
+     long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS;
+     Put p = new Put(rowBytes);
+     p.add(family1Bytes, columnBytes, now - 10, valBytes);
+     region.put(p);
+
+     // issue a family delete with an explicit timestamp
+     Delete delete = new Delete(rowBytes, now);
+     delete.deleteFamily(family1Bytes, now - 5);
+     region.delete(delete);
+
+     // test that the delete marker preserved the timestamp
+     Scan scan = new Scan();
+     scan.setMaxVersions();
+     List<Cell> results = Lists.newArrayList();
+     // try-with-resources so the scanner is closed even if the scan itself throws
+     try (RegionScanner scanner = region.getScanner(scan)) {
+       scanner.next(results);
+     }
+     assertEquals(2, results.size());
+     // delete marker should appear first
+     Cell cell = results.get(0);
+     assertArrayEquals(new byte[0], cell.getQualifier());
+     assertArrayEquals(new byte[0], cell.getValue());
+     assertEquals(now - 5, cell.getTimestamp());
+     // since this is an unfiltered scan against the region, the original put should be next
+     cell = results.get(1);
+     assertArrayEquals(valBytes, cell.getValue());
+     assertEquals(now - 10, cell.getTimestamp());
+
+     // with a filtered scan the original put should disappear
+     scan = new Scan();
+     scan.setMaxVersions();
+     scan.setFilter(TransactionFilters.getVisibilityFilter(TxUtils.createDummyTransaction(txVisibilityState),
+                                                           new TreeMap<byte[], Long>(), false, ScanType.USER_SCAN));
+     results = Lists.newArrayList();
+     try (RegionScanner scanner = region.getScanner(scan)) {
+       scanner.next(results);
+     }
+     assertEquals(0, results.size());
+   } finally {
+     region.close();
+   }
+ }
+
+ /**
+  * Writes cells with plain millisecond timestamps (rows r1, r2) and cells with timestamps scaled by
+  * TxConstants.MAX_TX_PER_MS (row r3) into a region with a 14 day TTL, then checks what raw and
+  * transaction-filtered scans return as the TTL is reduced and the region is flushed and compacted.
+  */
+ @Test
+ public void testPreExistingData() throws Exception {
+ String tableName = "TestPreExistingData";
+ byte[] familyBytes = Bytes.toBytes("f");
+ long ttlMillis = TimeUnit.DAYS.toMillis(14);
+ HRegion region = createRegion(tableName, familyBytes, ttlMillis);
+ try {
+ region.initialize();
+
+ // timestamps for pre-existing, non-transactional data
+ long now = txVisibilityState.getVisibilityUpperBound() / TxConstants.MAX_TX_PER_MS;
+ long older = now - ttlMillis / 2;
+ long newer = now - ttlMillis / 3;
+ // timestamps for transactional data
+ long nowTx = txVisibilityState.getVisibilityUpperBound();
+ long olderTx = nowTx - (ttlMillis / 2) * TxConstants.MAX_TX_PER_MS;
+ long newerTx = nowTx - (ttlMillis / 3) * TxConstants.MAX_TX_PER_MS;
+
+ // per-family TTLs handed to the transaction visibility filter; updated below whenever the TTL is reduced
+ Map<byte[], Long> ttls = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+ ttls.put(familyBytes, ttlMillis);
+
+ // even list indexes (0, 2, 4) carry the "older" timestamps, odd indexes (1, 3, 5) the "newer" ones
+ List<Cell> cells = new ArrayList<>();
+ cells.add(new KeyValue(Bytes.toBytes("r1"), familyBytes, Bytes.toBytes("c1"), older, Bytes.toBytes("v11")));
+ cells.add(new KeyValue(Bytes.toBytes("r1"), familyBytes, Bytes.toBytes("c2"), newer, Bytes.toBytes("v12")));
+ cells.add(new KeyValue(Bytes.toBytes("r2"), familyBytes, Bytes.toBytes("c1"), older, Bytes.toBytes("v21")));
+ cells.add(new KeyValue(Bytes.toBytes("r2"), familyBytes, Bytes.toBytes("c2"), newer, Bytes.toBytes("v22")));
+ cells.add(new KeyValue(Bytes.toBytes("r3"), familyBytes, Bytes.toBytes("c1"), olderTx, Bytes.toBytes("v31")));
+ cells.add(new KeyValue(Bytes.toBytes("r3"), familyBytes, Bytes.toBytes("c2"), newerTx, Bytes.toBytes("v32")));
+
+ // Write non-transactional and transactional data
+ for (Cell c : cells) {
+ region.put(new Put(c.getRow()).add(c.getFamily(), c.getQualifier(), c.getTimestamp(), c.getValue()));
+ }
+
+ Scan rawScan = new Scan();
+ rawScan.setMaxVersions();
+
+ Transaction dummyTransaction = TxUtils.createDummyTransaction(txVisibilityState);
+ Scan txScan = new Scan();
+ txScan.setMaxVersions();
+ txScan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttls, dummyTransaction, true),
+ TxUtils.getMaxVisibleTimestamp(dummyTransaction));
+ txScan.setFilter(TransactionFilters.getVisibilityFilter(dummyTransaction, ttls, false, ScanType.USER_SCAN));
+
+ // read all back with raw scanner
+ scanAndAssert(region, cells, rawScan);
+
+ // read all back with transaction filter
+ scanAndAssert(region, cells, txScan);
+
+ // force a flush to clear the memstore
+ region.flushcache();
+ scanAndAssert(region, cells, txScan);
+
+ // force a major compaction to remove any expired cells
+ region.compactStores(true);
+ scanAndAssert(region, cells, txScan);
+
+ // Reduce TTL, this should make cells with timestamps older and olderTx expire
+ long newTtl = ttlMillis / 2 - 1;
+ // updateTtl closes the region and returns a re-opened one, so the local reference must be replaced
+ region = updateTtl(region, familyBytes, newTtl);
+ ttls.put(familyBytes, newTtl);
+ txScan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttls, dummyTransaction, true),
+ TxUtils.getMaxVisibleTimestamp(dummyTransaction));
+ txScan.setFilter(TransactionFilters.getVisibilityFilter(dummyTransaction, ttls, false, ScanType.USER_SCAN));
+
+ // Raw scan should still give all cells
+ scanAndAssert(region, cells, rawScan);
+ // However, tx scan should not return expired cells
+ scanAndAssert(region, select(cells, 1, 3, 5), txScan);
+
+ region.flushcache();
+ scanAndAssert(region, cells, rawScan);
+
+ // force a major compaction to remove any expired cells
+ region.compactStores(true);
+ // This time raw scan too should not return expired cells, as they would be dropped during major compaction
+ scanAndAssert(region, select(cells, 1, 3, 5), rawScan);
+
+ // Reduce TTL again to 1 ms, this should expire all cells
+ newTtl = 1;
+ region = updateTtl(region, familyBytes, newTtl);
+ ttls.put(familyBytes, newTtl);
+ // NOTE(review): txScan is reconfigured here but no later statement in this method scans with it
+ txScan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttls, dummyTransaction, true),
+ TxUtils.getMaxVisibleTimestamp(dummyTransaction));
+ txScan.setFilter(TransactionFilters.getVisibilityFilter(dummyTransaction, ttls, false, ScanType.USER_SCAN));
+
+ // force a major compaction to remove expired cells
+ region.compactStores(true);
+ // This time raw scan should not return any cells, as all cells have expired.
+ scanAndAssert(region, Collections.<Cell>emptyList(), rawScan);
+ } finally {
+ // closes whichever region instance is current (the original one or one re-opened by updateTtl)
+ region.close();
+ }
+ }
+
+ /**
+  * Returns a new list holding the elements of {@code cells} found at the given positions,
+  * in the order in which the positions are listed.
+  */
+ private List<Cell> select(List<Cell> cells, int... indexes) {
+   List<Cell> picked = new ArrayList<>();
+   for (int pos = 0; pos < indexes.length; pos++) {
+     picked.add(cells.get(indexes[pos]));
+   }
+   return picked;
+ }
+
+ /**
+  * Drains a scanner opened on {@code region} for {@code scan} into a single list and asserts that
+  * the collected cells equal {@code expected}. The scanner is closed before returning.
+  */
+ private void scanAndAssert(HRegion region, List<Cell> expected, Scan scan) throws Exception {
+   try (RegionScanner scanner = region.getScanner(scan)) {
+     List<Cell> actual = Lists.newArrayList();
+     boolean moreRows;
+     do {
+       // every call appends the cells of the next row to the same list
+       moreRows = scanner.next(actual);
+     } while (moreRows);
+     assertEquals(expected, actual);
+   }
+ }
+
+ /**
+  * Closes {@code region}, sets the TTL property on the given column family of its table descriptor
+  * (only when {@code ttl} is positive) and re-opens the region with that descriptor.
+  *
+  * @return the re-opened region; the instance passed in has been closed
+  */
+ private HRegion updateTtl(HRegion region, byte[] family, long ttl) throws Exception {
+   region.close();
+
+   HTableDescriptor tableDescriptor = region.getTableDesc();
+   HColumnDescriptor familyDescriptor = tableDescriptor.getFamily(family);
+   if (ttl > 0) {
+     familyDescriptor.setValue(TxConstants.PROPERTY_TTL, String.valueOf(ttl));
+   }
+   familyDescriptor.setMaxVersions(10);
+
+   ServerName localServer =
+     ServerName.valueOf(InetAddress.getLocalHost().getHostName(), 0, System.currentTimeMillis());
+   LocalRegionServerServices services = new LocalRegionServerServices(conf, localServer);
+   return HRegion.openHRegion(region.getRegionInfo(), tableDescriptor, region.getWAL(), conf, services, null);
+ }
+
+ /**
+  * Builds a region for a table with a single column family (max versions 10, TTL property set only
+  * when {@code ttl} is positive) and the {@link TransactionProcessor} coprocessor attached. The
+  * table directory, WAL and region directory are created on the file system from {@link #conf}.
+  * The returned region has not been initialized; callers invoke {@code initialize()} themselves.
+  */
+ private HRegion createRegion(String tableName, byte[] family, long ttl) throws IOException {
+   HColumnDescriptor familyDescriptor = new HColumnDescriptor(family);
+   if (ttl > 0) {
+     familyDescriptor.setValue(TxConstants.PROPERTY_TTL, String.valueOf(ttl));
+   }
+   familyDescriptor.setMaxVersions(10);
+
+   HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
+   tableDescriptor.addFamily(familyDescriptor);
+   tableDescriptor.addCoprocessor(TransactionProcessor.class.getName());
+
+   Path tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
+   FileSystem fileSystem = FileSystem.get(conf);
+   assertTrue(fileSystem.mkdirs(tableDir));
+
+   WAL wal = new WALFactory(conf, null, tableName + ".hlog").getWAL(new byte[]{1});
+   HRegionInfo info = new HRegionInfo(TableName.valueOf(tableName));
+   HRegionFileSystem regionFileSystem =
+     HRegionFileSystem.createRegionOnFileSystem(conf, fileSystem, tableDir, info);
+
+   ServerName localServer =
+     ServerName.valueOf(InetAddress.getLocalHost().getHostName(), 0, System.currentTimeMillis());
+   return new HRegion(regionFileSystem, wal, conf, tableDescriptor,
+                      new LocalRegionServerServices(conf, localServer));
+ }
+
+ /**
+  * Convenience overload: expects each cell's value to be the byte encoding of its own version.
+  */
+ private void assertKeyValueMatches(List<Cell> results, int index, long[] versions) {
+   byte[][] expectedValues = new byte[versions.length][];
+   int pos = 0;
+   for (long version : versions) {
+     expectedValues[pos++] = Bytes.toBytes(version);
+   }
+   assertKeyValueMatches(results, index, versions, expectedValues);
+ }
+
+ /**
+  * Asserts that {@code results} has exactly one cell per expected version, and that the cell at each
+  * position has the int-encoded {@code index} as its row, the expected timestamp and the expected value.
+  */
+ private void assertKeyValueMatches(List<Cell> results, int index, long[] versions, byte[][] values) {
+   int cellCount = results.size();
+   assertEquals(versions.length, cellCount);
+   assertEquals(values.length, cellCount);
+
+   int pos = 0;
+   for (Cell cell : results) {
+     assertArrayEquals(Bytes.toBytes(index), cell.getRow());
+     assertEquals(versions[pos], cell.getTimestamp());
+     assertArrayEquals(values[pos], cell.getValue());
+     pos++;
+   }
+ }
+
+ /**
+  * Starts a {@link TransactionStateCache} on the shared configuration and checks that the state it
+  * loads carries the invalid list written in {@link #setupBeforeClass()}.
+  */
+ @Test
+ public void testTransactionStateCache() throws Exception {
+   TransactionStateCache cache = new TransactionStateCache();
+   cache.setConf(conf);
+   cache.startAndWait();
+   try {
+     // verify that the transaction snapshot read matches what we wrote in setupBeforeClass()
+     TransactionVisibilityState cachedSnapshot = cache.getLatestState();
+     assertNotNull(cachedSnapshot);
+     assertEquals(invalidSet, cachedSnapshot.getInvalid());
+   } finally {
+     // stop the cache service even when an assertion fails, so it does not keep running
+     cache.stopAndWait();
+   }
+ }
+
+ /**
+  * {@link MockRegionServerServices} variant that reports the server name supplied at construction.
+  */
+ private static class LocalRegionServerServices extends MockRegionServerServices {
+   private final ServerName serverName;
+
+   /**
+    * @param conf configuration handed to the mock base class
+    * @param serverName name returned by {@link #getServerName()}
+    */
+   public LocalRegionServerServices(Configuration conf, ServerName serverName) {
+     super(conf);
+     this.serverName = serverName;
+   }
+
+   /** Returns the server name given to the constructor. */
+   @Override
+   public ServerName getServerName() {
+     return this.serverName;
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase10cdh/coprocessor/TransactionVisibilityFilterTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase10cdh/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase10cdh/coprocessor/TransactionVisibilityFilterTest.java
new file mode 100644
index 0000000..562a8f9
--- /dev/null
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase10cdh/coprocessor/TransactionVisibilityFilterTest.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase10cdh.coprocessor;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.AbstractTransactionVisibilityFilterTest;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * HBase 1.0 (CDH) specific test for filtering logic applied when reading data transactionally.
+ */
+public class TransactionVisibilityFilterTest extends AbstractTransactionVisibilityFilterTest {
+ /**
+  * Test filtering of KeyValues for in-progress and invalid transactions, using a
+  * {@link TransactionVisibilityFilter} without any sub-filter.
+  *
+  * @throws Exception if the shared filtering scenario fails
+  */
+ @Test
+ public void testFiltering() throws Exception {
+   TxFilterFactory plainFilterFactory = new TxFilterFactory() {
+     @Override
+     public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
+       return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN);
+     }
+   };
+   List<Filter.ReturnCode> expectedCodes = ImmutableList.of(
+     Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+     Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+     Filter.ReturnCode.SKIP,
+     Filter.ReturnCode.SKIP,
+     Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+     Filter.ReturnCode.INCLUDE_AND_NEXT_COL);
+   runFilteringTest(plainFilterFactory, expectedCodes);
+ }
+
+ /**
+  * Runs the shared filtering scenario with a {@link TransactionVisibilityFilter} wrapping, in turn,
+  * sub-filters that always answer INCLUDE, SKIP, INCLUDE_AND_NEXT_COL and NEXT_COL.
+  */
+ @Test
+ public void testSubFilter() throws Exception {
+   // sub-filter answers INCLUDE
+   runFilteringTest(visibilityFilterFactory(fixedCodeFilter(Filter.ReturnCode.INCLUDE)),
+                    ImmutableList.of(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+                                     Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+                                     Filter.ReturnCode.SKIP,
+                                     Filter.ReturnCode.SKIP,
+                                     Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+                                     Filter.ReturnCode.INCLUDE_AND_NEXT_COL));
+
+   // sub-filter answers SKIP
+   runFilteringTest(visibilityFilterFactory(fixedCodeFilter(Filter.ReturnCode.SKIP)),
+                    ImmutableList.of(Filter.ReturnCode.NEXT_COL,
+                                     Filter.ReturnCode.NEXT_COL,
+                                     Filter.ReturnCode.SKIP,
+                                     Filter.ReturnCode.SKIP,
+                                     Filter.ReturnCode.NEXT_COL,
+                                     Filter.ReturnCode.NEXT_COL));
+
+   // sub-filter answers INCLUDE_AND_NEXT_COL
+   runFilteringTest(visibilityFilterFactory(fixedCodeFilter(Filter.ReturnCode.INCLUDE_AND_NEXT_COL)),
+                    ImmutableList.of(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+                                     Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+                                     Filter.ReturnCode.SKIP,
+                                     Filter.ReturnCode.SKIP,
+                                     Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+                                     Filter.ReturnCode.INCLUDE_AND_NEXT_COL));
+
+   // sub-filter answers NEXT_COL
+   runFilteringTest(visibilityFilterFactory(fixedCodeFilter(Filter.ReturnCode.NEXT_COL)),
+                    ImmutableList.of(Filter.ReturnCode.NEXT_COL,
+                                     Filter.ReturnCode.NEXT_COL,
+                                     Filter.ReturnCode.SKIP,
+                                     Filter.ReturnCode.SKIP,
+                                     Filter.ReturnCode.NEXT_COL,
+                                     Filter.ReturnCode.NEXT_COL));
+ }
+
+ /** Returns a filter whose {@code filterKeyValue} answers {@code code} for every cell. */
+ private Filter fixedCodeFilter(final Filter.ReturnCode code) {
+   return new FilterBase() {
+     @Override
+     public ReturnCode filterKeyValue(Cell ignored) throws IOException {
+       return code;
+     }
+   };
+ }
+
+ /** Returns a factory producing {@link TransactionVisibilityFilter}s that wrap {@code subFilter}. */
+ private TxFilterFactory visibilityFilterFactory(final Filter subFilter) {
+   return new TxFilterFactory() {
+     @Override
+     public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
+       return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, subFilter);
+     }
+   };
+ }
+
+ /**
+  * Runs the shared filtering scenario with a {@code CustomTxFilter} wrapping, in turn, sub-filters
+  * that always answer INCLUDE, SKIP, INCLUDE_AND_NEXT_COL and NEXT_COL.
+  */
+ @Test
+ public void testSubFilterOverride() throws Exception {
+   // sub-filter answers INCLUDE
+   runFilteringTest(customFilterFactory(constantCodeFilter(Filter.ReturnCode.INCLUDE)),
+                    ImmutableList.of(Filter.ReturnCode.INCLUDE,
+                                     Filter.ReturnCode.INCLUDE,
+                                     Filter.ReturnCode.SKIP,
+                                     Filter.ReturnCode.SKIP,
+                                     Filter.ReturnCode.INCLUDE,
+                                     Filter.ReturnCode.INCLUDE));
+
+   // sub-filter answers SKIP
+   runFilteringTest(customFilterFactory(constantCodeFilter(Filter.ReturnCode.SKIP)),
+                    ImmutableList.of(Filter.ReturnCode.NEXT_COL,
+                                     Filter.ReturnCode.NEXT_COL,
+                                     Filter.ReturnCode.SKIP,
+                                     Filter.ReturnCode.SKIP,
+                                     Filter.ReturnCode.NEXT_COL,
+                                     Filter.ReturnCode.NEXT_COL));
+
+   // sub-filter answers INCLUDE_AND_NEXT_COL
+   runFilteringTest(customFilterFactory(constantCodeFilter(Filter.ReturnCode.INCLUDE_AND_NEXT_COL)),
+                    ImmutableList.of(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+                                     Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+                                     Filter.ReturnCode.SKIP,
+                                     Filter.ReturnCode.SKIP,
+                                     Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+                                     Filter.ReturnCode.INCLUDE_AND_NEXT_COL));
+
+   // sub-filter answers NEXT_COL
+   runFilteringTest(customFilterFactory(constantCodeFilter(Filter.ReturnCode.NEXT_COL)),
+                    ImmutableList.of(Filter.ReturnCode.NEXT_COL,
+                                     Filter.ReturnCode.NEXT_COL,
+                                     Filter.ReturnCode.SKIP,
+                                     Filter.ReturnCode.SKIP,
+                                     Filter.ReturnCode.NEXT_COL,
+                                     Filter.ReturnCode.NEXT_COL));
+ }
+
+ /** Returns a filter whose {@code filterKeyValue} answers {@code code} for every cell. */
+ private Filter constantCodeFilter(final Filter.ReturnCode code) {
+   return new FilterBase() {
+     @Override
+     public ReturnCode filterKeyValue(Cell ignored) throws IOException {
+       return code;
+     }
+   };
+ }
+
+ /** Returns a factory producing {@code CustomTxFilter}s that wrap {@code subFilter}. */
+ private TxFilterFactory customFilterFactory(final Filter subFilter) {
+   return new TxFilterFactory() {
+     @Override
+     public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
+       return new CustomTxFilter(tx, familyTTLs, false, ScanType.USER_SCAN, subFilter);
+     }
+   };
+ }
+
+ /**
+  * Creates a fixed mix of transactions, asks {@code txFilterFactory} for a filter bound to the newest one,
+  * and checks the code that filter returns for a cell written by each transaction.
+  *
+  * @param txFilterFactory supplies the filter under test; invoked once with the last transaction started
+  *                        and an empty TTL map
+  * @param assertCodes expected return codes; element {@code i} is the expectation for transaction
+  *                    {@code i + 1} in the table below, and elements 0 through 5 are read
+  * @throws Exception if a transaction manager call or the filter fails
+  */
+ private void runFilteringTest(TxFilterFactory txFilterFactory,
+                               List<Filter.ReturnCode> assertCodes) throws Exception {
+   /*
+    * Start and stop some transactions. This yields a transaction state like the following
+    * (numbers only reflect ordering, not actual transaction IDs):
+    *   6 - in-progress (the transaction the filter is created for)
+    *   5 - committed
+    *   4 - invalid
+    *   3 - in-progress
+    *   2 - committed
+    *   1 - committed
+    *
+    * read ptr = 5
+    * write ptr = 6
+    */
+   Transaction firstCommitted = txManager.startShort();
+   assertTrue(txManager.canCommit(firstCommitted, EMPTY_CHANGESET));
+   assertTrue(txManager.commit(firstCommitted));
+
+   Transaction secondCommitted = txManager.startShort();
+   assertTrue(txManager.canCommit(secondCommitted, EMPTY_CHANGESET));
+   assertTrue(txManager.commit(secondCommitted));
+
+   Transaction inProgress = txManager.startShort();
+   Transaction invalidated = txManager.startShort();
+   txManager.invalidate(invalidated.getTransactionId());
+
+   Transaction thirdCommitted = txManager.startShort();
+   assertTrue(txManager.canCommit(thirdCommitted, EMPTY_CHANGESET));
+   assertTrue(txManager.commit(thirdCommitted));
+
+   Transaction reader = txManager.startShort();
+
+   Map<byte[], Long> noTtls = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+   Filter filter = txFilterFactory.getTxFilter(reader, noTtls);
+
+   // Index i holds transaction i + 1, matching the layout of assertCodes.
+   Transaction[] writers = {
+     firstCommitted, secondCommitted, inProgress, invalidated, thirdCommitted, reader
+   };
+   // Probe from the newest transaction down to the oldest.
+   for (int i = writers.length - 1; i >= 0; i--) {
+     assertEquals(assertCodes.get(i),
+                  filter.filterKeyValue(newKeyValue("row1", "val1", writers[i].getTransactionId())));
+   }
+ }
+
+ /**
+  * Test filtering for TTL settings.
+  *
+  * <p>Three families are configured with TTLs of 10, 30 and 0. Cells are probed at timestamps derived from
+  * the transaction's visibility upper bound, first as transactional timestamps (offsets are multiples of
+  * {@code TxConstants.MAX_TX_PER_MS}) and then as pre-existing, non-transactional timestamps (the upper
+  * bound divided by {@code TxConstants.MAX_TX_PER_MS}). The assertions expect cells older than the family's
+  * TTL to yield {@code NEXT_COL}, and expect the family with TTL 0 to never expire.
+  *
+  * @throws Exception if starting the transaction or evaluating the filter fails
+  */
+ @Test
+ public void testTTLFiltering() throws Exception {
+   Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+   ttlByFamily.put(FAM, 10L);
+   ttlByFamily.put(FAM2, 30L);
+   ttlByFamily.put(FAM3, 0L);
+
+   Transaction tx = txManager.startShort();
+   long now = tx.getVisibilityUpperBound();
+   Filter filter = new TransactionVisibilityFilter(tx, ttlByFamily, false, ScanType.USER_SCAN);
+
+   // Short aliases for the two outcomes and for the timestamp scale used below.
+   final Filter.ReturnCode kept = Filter.ReturnCode.INCLUDE_AND_NEXT_COL;
+   final Filter.ReturnCode expired = Filter.ReturnCode.NEXT_COL;
+   final long unit = TxConstants.MAX_TX_PER_MS;
+
+   // FAM (TTL 10): offsets 0 and 1 are kept, offset 11 is expired.
+   assertEquals(kept, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now)));
+   assertEquals(kept, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now - unit)));
+   assertEquals(expired, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now - 11 * unit)));
+
+   // FAM2 (TTL 30): offsets 11 and 21 are kept, offset 31 is expired.
+   assertEquals(kept, filter.filterKeyValue(newKeyValue("row1", FAM2, "val1", now - 11 * unit)));
+   assertEquals(kept, filter.filterKeyValue(newKeyValue("row1", FAM2, "val1", now - 21 * unit)));
+   assertEquals(expired, filter.filterKeyValue(newKeyValue("row1", FAM2, "val1", now - 31 * unit)));
+
+   // FAM3 (TTL 0): kept no matter how old the cell is.
+   assertEquals(kept, filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", now - 31 * unit)));
+   assertEquals(kept, filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", now - 1001 * unit)));
+
+   // A different row in FAM behaves like the first one.
+   assertEquals(kept, filter.filterKeyValue(newKeyValue("row2", FAM, "val1", now)));
+   assertEquals(kept, filter.filterKeyValue(newKeyValue("row2", FAM, "val1", now - unit)));
+
+   // Verify ttl for pre-existing, non-transactional data
+   long preNow = now / TxConstants.MAX_TX_PER_MS;
+   assertEquals(kept, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", preNow)));
+   assertEquals(kept, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", preNow - 9L)));
+   assertEquals(kept, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", preNow - 10L)));
+   assertEquals(expired, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", preNow - 11L)));
+   assertEquals(kept, filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", preNow)));
+   assertEquals(kept, filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", preNow - 9L)));
+   assertEquals(kept, filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", preNow - 10L)));
+   assertEquals(kept, filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", preNow - 1001L)));
+ }
+
+ /**
+  * Builds a {@code KeyValue} in family {@code FAM}, column {@code COL}.
+  *
+  * @param rowkey row key, converted with {@code Bytes.toBytes}
+  * @param value cell value, converted with {@code Bytes.toBytes}
+  * @param timestamp cell timestamp
+  * @return the new cell
+  */
+ protected KeyValue newKeyValue(String rowkey, String value, long timestamp) {
+   byte[] rowBytes = Bytes.toBytes(rowkey);
+   byte[] valueBytes = Bytes.toBytes(value);
+   return new KeyValue(rowBytes, FAM, COL, timestamp, valueBytes);
+ }
+
+ /**
+  * Builds a {@code KeyValue} in the given family, column {@code COL}.
+  *
+  * @param rowkey row key, converted with {@code Bytes.toBytes}
+  * @param family column family of the cell
+  * @param value cell value, converted with {@code Bytes.toBytes}
+  * @param timestamp cell timestamp
+  * @return the new cell
+  */
+ protected KeyValue newKeyValue(String rowkey, byte[] family, String value, long timestamp) {
+   byte[] rowBytes = Bytes.toBytes(rowkey);
+   byte[] valueBytes = Bytes.toBytes(value);
+   return new KeyValue(rowBytes, family, COL, timestamp, valueBytes);
+ }
+
+ /**
+  * Factory for the filter under test, so that {@code runFilteringTest} can be reused with different
+  * filter implementations and sub-filters.
+  */
+ private interface TxFilterFactory {
+ /**
+  * Creates the filter to evaluate.
+  *
+  * @param tx transaction the filter is created for
+  * @param familyTTLs TTL per column family, keyed by family name bytes
+  * @return the filter to exercise
+  */
+ Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs);
+ }
+
+ /**
+  * {@code TransactionVisibilityFilter} variant that overrides how the transaction filter's return code is
+  * merged with the sub-filter's return code; used by {@code testSubFilterOverride}.
+  *
+  * <p>Declared {@code static} because it never touches the enclosing test instance, so it does not need
+  * the hidden outer-instance reference a non-static inner class carries.
+  */
+ private static class CustomTxFilter extends TransactionVisibilityFilter {
+   /** Passes all arguments straight to the {@code TransactionVisibilityFilter} constructor. */
+   public CustomTxFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues, ScanType scanType,
+                         @Nullable Filter cellFilter) {
+     super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter);
+   }
+
+   /**
+    * Gives the sub-filter the final say, except when it answers {@code SKIP}.
+    *
+    * @param txFilterCode code produced by the transaction filter (semantics defined by the superclass)
+    * @param subFilterCode code produced by the wrapped cell filter; must not be null
+    * @return {@code subFilterCode} unchanged, unless it is {@code SKIP}: then {@code SKIP} when
+    *         {@code txFilterCode} is {@code INCLUDE}, otherwise {@code NEXT_COL}
+    */
+   @Override
+   protected ReturnCode determineReturnCode(ReturnCode txFilterCode, ReturnCode subFilterCode) {
+     switch (subFilterCode) {
+       case SKIP:
+         return txFilterCode == ReturnCode.INCLUDE ? ReturnCode.SKIP : ReturnCode.NEXT_COL;
+       default:
+         // INCLUDE, INCLUDE_AND_NEXT_COL and every other code are returned as-is.
+         return subFilterCode;
+     }
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-1.0/src/main/java/co/cask/tephra/hbase10/HBase10ConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/co/cask/tephra/hbase10/HBase10ConfigurationProvider.java b/tephra-hbase-compat-1.0/src/main/java/co/cask/tephra/hbase10/HBase10ConfigurationProvider.java
deleted file mode 100644
index a80e572..0000000
--- a/tephra-hbase-compat-1.0/src/main/java/co/cask/tephra/hbase10/HBase10ConfigurationProvider.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.hbase10;
-
-import co.cask.tephra.util.ConfigurationProvider;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-
-/**
- * HBase 1.0 version of {@link co.cask.tephra.util.ConfigurationProvider}.
- */
-public class HBase10ConfigurationProvider extends ConfigurationProvider {
- @Override
- public Configuration get() {
- return HBaseConfiguration.create();
- }
-
- @Override
- public Configuration get(Configuration baseConf) {
- return HBaseConfiguration.create(baseConf);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-1.0/src/main/java/co/cask/tephra/hbase10/SecondaryIndexTable.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/co/cask/tephra/hbase10/SecondaryIndexTable.java b/tephra-hbase-compat-1.0/src/main/java/co/cask/tephra/hbase10/SecondaryIndexTable.java
deleted file mode 100644
index 1c9de95..0000000
--- a/tephra-hbase-compat-1.0/src/main/java/co/cask/tephra/hbase10/SecondaryIndexTable.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package co.cask.tephra.hbase10;
-
-import co.cask.tephra.TransactionContext;
-import co.cask.tephra.TransactionFailureException;
-import co.cask.tephra.distributed.TransactionServiceClient;
-import com.google.common.base.Throwables;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A Transactional SecondaryIndexTable.
- */
-public class SecondaryIndexTable {
- private byte[] secondaryIndex;
- private TransactionAwareHTable transactionAwareHTable;
- private TransactionAwareHTable secondaryIndexTable;
- private TransactionContext transactionContext;
- private final TableName secondaryIndexTableName;
- private static final byte[] secondaryIndexFamily = Bytes.toBytes("secondaryIndexFamily");
- private static final byte[] secondaryIndexQualifier = Bytes.toBytes('r');
- private static final byte[] DELIMITER = new byte[] {0};
-
- public SecondaryIndexTable(TransactionServiceClient transactionServiceClient, HTableInterface hTable,
- byte[] secondaryIndex) {
- secondaryIndexTableName = TableName.valueOf(hTable.getName().getNameAsString() + ".idx");
- HTable secondaryIndexHTable = null;
- HBaseAdmin hBaseAdmin = null;
- try {
- hBaseAdmin = new HBaseAdmin(hTable.getConfiguration());
- if (!hBaseAdmin.tableExists(secondaryIndexTableName)) {
- hBaseAdmin.createTable(new HTableDescriptor(secondaryIndexTableName));
- }
- secondaryIndexHTable = new HTable(hTable.getConfiguration(), secondaryIndexTableName);
- } catch (Exception e) {
- Throwables.propagate(e);
- } finally {
- try {
- hBaseAdmin.close();
- } catch (Exception e) {
- Throwables.propagate(e);
- }
- }
-
- this.secondaryIndex = secondaryIndex;
- this.transactionAwareHTable = new TransactionAwareHTable(hTable);
- this.secondaryIndexTable = new TransactionAwareHTable(secondaryIndexHTable);
- this.transactionContext = new TransactionContext(transactionServiceClient, transactionAwareHTable,
- secondaryIndexTable);
- }
-
- public Result get(Get get) throws IOException {
- return get(Collections.singletonList(get))[0];
- }
-
- public Result[] get(List<Get> gets) throws IOException {
- try {
- transactionContext.start();
- Result[] result = transactionAwareHTable.get(gets);
- transactionContext.finish();
- return result;
- } catch (Exception e) {
- try {
- transactionContext.abort();
- } catch (TransactionFailureException e1) {
- throw new IOException("Could not rollback transaction", e1);
- }
- }
- return null;
- }
-
- public Result[] getByIndex(byte[] value) throws IOException {
- try {
- transactionContext.start();
- Scan scan = new Scan(value, Bytes.add(value, new byte[0]));
- scan.addColumn(secondaryIndexFamily, secondaryIndexQualifier);
- ResultScanner indexScanner = secondaryIndexTable.getScanner(scan);
-
- ArrayList<Get> gets = new ArrayList<Get>();
- for (Result result : indexScanner) {
- for (Cell cell : result.listCells()) {
- gets.add(new Get(cell.getValue()));
- }
- }
- Result[] results = transactionAwareHTable.get(gets);
- transactionContext.finish();
- return results;
- } catch (Exception e) {
- try {
- transactionContext.abort();
- } catch (TransactionFailureException e1) {
- throw new IOException("Could not rollback transaction", e1);
- }
- }
- return null;
- }
-
- public void put(Put put) throws IOException {
- put(Collections.singletonList(put));
- }
-
-
- public void put(List<Put> puts) throws IOException {
- try {
- transactionContext.start();
- ArrayList<Put> secondaryIndexPuts = new ArrayList<Put>();
- for (Put put : puts) {
- List<Put> indexPuts = new ArrayList<Put>();
- Set<Map.Entry<byte[], List<KeyValue>>> familyMap = put.getFamilyMap().entrySet();
- for (Map.Entry<byte [], List<KeyValue>> family : familyMap) {
- for (KeyValue value : family.getValue()) {
- if (Bytes.equals(value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(),
- secondaryIndex, 0, secondaryIndex.length)) {
- byte[] secondaryRow = Bytes.add(value.getQualifier(), DELIMITER,
- Bytes.add(value.getValue(), DELIMITER,
- value.getRow()));
- Put indexPut = new Put(secondaryRow);
- indexPut.add(secondaryIndexFamily, secondaryIndexQualifier, put.getRow());
- indexPuts.add(indexPut);
- }
- }
- }
- secondaryIndexPuts.addAll(indexPuts);
- }
- transactionAwareHTable.put(puts);
- secondaryIndexTable.put(secondaryIndexPuts);
- transactionContext.finish();
- } catch (Exception e) {
- try {
- transactionContext.abort();
- } catch (TransactionFailureException e1) {
- throw new IOException("Could not rollback transaction", e1);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-1.0/src/main/java/co/cask/tephra/hbase10/TransactionAwareHTable.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/co/cask/tephra/hbase10/TransactionAwareHTable.java b/tephra-hbase-compat-1.0/src/main/java/co/cask/tephra/hbase10/TransactionAwareHTable.java
deleted file mode 100644
index cf308a6..0000000
--- a/tephra-hbase-compat-1.0/src/main/java/co/cask/tephra/hbase10/TransactionAwareHTable.java
+++ /dev/null
@@ -1,674 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package co.cask.tephra.hbase10;
-
-import co.cask.tephra.AbstractTransactionAwareTable;
-import co.cask.tephra.Transaction;
-import co.cask.tephra.TransactionAware;
-import co.cask.tephra.TxConstants;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Append;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.OperationWithAttributes;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.RowMutations;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
-import org.apache.hadoop.hbase.filter.CompareFilter;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Set;
-
-/**
- * A Transaction Aware HTable implementation for HBase 1.0. Operations are committed as usual,
- * but upon a failed or aborted transaction, they are rolled back to the state before the transaction
- * was started.
- */
-public class TransactionAwareHTable extends AbstractTransactionAwareTable
- implements HTableInterface, TransactionAware {
-
- private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTable.class);
- private final HTableInterface hTable;
-
- /**
- * Create a transactional aware instance of the passed HTable
- *
- * @param hTable underlying HBase table to use
- */
- public TransactionAwareHTable(HTableInterface hTable) {
- this(hTable, false);
- }
-
- /**
- * Create a transactional aware instance of the passed HTable
- *
- * @param hTable underlying HBase table to use
- * @param conflictLevel level of conflict detection to perform (defaults to {@code COLUMN})
- */
- public TransactionAwareHTable(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel) {
- this(hTable, conflictLevel, false);
- }
-
- /**
- * Create a transactional aware instance of the passed HTable, with the option
- * of allowing non-transactional operations.
- * @param hTable underlying HBase table to use
- * @param allowNonTransactional if true, additional operations (checkAndPut, increment, checkAndDelete)
- * will be available, though non-transactional
- */
- public TransactionAwareHTable(HTableInterface hTable, boolean allowNonTransactional) {
- this(hTable, TxConstants.ConflictDetection.COLUMN, allowNonTransactional);
- }
-
- /**
- * Create a transactional aware instance of the passed HTable, with the option
- * of allowing non-transactional operations.
- * @param hTable underlying HBase table to use
- * @param conflictLevel level of conflict detection to perform (defaults to {@code COLUMN})
- * @param allowNonTransactional if true, additional operations (checkAndPut, increment, checkAndDelete)
- * will be available, though non-transactional
- */
- public TransactionAwareHTable(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel,
- boolean allowNonTransactional) {
- super(conflictLevel, allowNonTransactional);
- this.hTable = hTable;
- }
-
- /* AbstractTransactionAwareTable implementation */
-
- @Override
- protected byte[] getTableKey() {
- return getTableName();
- }
-
- @Override
- protected boolean doCommit() throws IOException {
- hTable.flushCommits();
- return true;
- }
-
- @Override
- protected boolean doRollback() throws Exception {
- try {
- // pre-size arraylist of deletes
- int size = 0;
- for (Set<ActionChange> cs : changeSets.values()) {
- size += cs.size();
- }
- List<Delete> rollbackDeletes = new ArrayList<>(size);
- for (Map.Entry<Long, Set<ActionChange>> entry : changeSets.entrySet()) {
- long transactionTimestamp = entry.getKey();
- for (ActionChange change : entry.getValue()) {
- byte[] row = change.getRow();
- byte[] family = change.getFamily();
- byte[] qualifier = change.getQualifier();
- Delete rollbackDelete = new Delete(row);
- rollbackDelete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
- switch (conflictLevel) {
- case ROW:
- case NONE:
- // issue family delete for the tx write pointer
- rollbackDelete.deleteFamilyVersion(change.getFamily(), transactionTimestamp);
- break;
- case COLUMN:
- if (family != null && qualifier == null) {
- rollbackDelete.deleteFamilyVersion(family, transactionTimestamp);
- } else if (family != null && qualifier != null) {
- rollbackDelete.deleteColumn(family, qualifier, transactionTimestamp);
- }
- break;
- default:
- throw new IllegalStateException("Unknown conflict detection level: " + conflictLevel);
- }
- rollbackDeletes.add(rollbackDelete);
- }
- }
- hTable.delete(rollbackDeletes);
- return true;
- } finally {
- try {
- hTable.flushCommits();
- } catch (Exception e) {
- LOG.error("Could not flush HTable commits", e);
- }
- tx = null;
- changeSets.clear();
- }
- }
-
- /* HTableInterface implementation */
-
- @Override
- public byte[] getTableName() {
- return hTable.getTableName();
- }
-
- @Override
- public TableName getName() {
- return hTable.getName();
- }
-
- @Override
- public Configuration getConfiguration() {
- return hTable.getConfiguration();
- }
-
- @Override
- public HTableDescriptor getTableDescriptor() throws IOException {
- return hTable.getTableDescriptor();
- }
-
- @Override
- public boolean exists(Get get) throws IOException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- return hTable.exists(transactionalizeAction(get));
- }
-
- @Override
- public Boolean[] exists(List<Get> gets) throws IOException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- List<Get> transactionalizedGets = new ArrayList<Get>(gets.size());
- for (Get get : gets) {
- transactionalizedGets.add(transactionalizeAction(get));
- }
- return hTable.exists(transactionalizedGets);
- }
-
- @Override
- public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- hTable.batch(transactionalizeActions(actions), results);
- }
-
- @Override
- public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- return hTable.batch(transactionalizeActions(actions));
- }
-
- @Override
- public <R> void batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) throws
- IOException, InterruptedException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- hTable.batchCallback(transactionalizeActions(actions), results, callback);
- }
-
- @Override
- public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback) throws IOException,
- InterruptedException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- return hTable.batchCallback(transactionalizeActions(actions), callback);
- }
-
- @Override
- public Result get(Get get) throws IOException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- return hTable.get(transactionalizeAction(get));
- }
-
- @Override
- public Result[] get(List<Get> gets) throws IOException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- ArrayList<Get> transactionalizedGets = new ArrayList<Get>();
- for (Get get : gets) {
- transactionalizedGets.add(transactionalizeAction(get));
- }
- return hTable.get(transactionalizedGets);
- }
-
- @Override
- public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
- if (allowNonTransactional) {
- return hTable.getRowOrBefore(row, family);
- } else {
- throw new UnsupportedOperationException("Operation is not supported transactionally");
- }
- }
-
- @Override
- public ResultScanner getScanner(Scan scan) throws IOException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- return hTable.getScanner(transactionalizeAction(scan));
- }
-
- @Override
- public ResultScanner getScanner(byte[] family) throws IOException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- Scan scan = new Scan();
- scan.addFamily(family);
- return hTable.getScanner(transactionalizeAction(scan));
- }
-
- @Override
- public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- Scan scan = new Scan();
- scan.addColumn(family, qualifier);
- return hTable.getScanner(transactionalizeAction(scan));
- }
-
- @Override
- public void put(Put put) throws IOException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- Put txPut = transactionalizeAction(put);
- hTable.put(txPut);
- }
-
- @Override
- public void put(List<Put> puts) throws IOException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- List<Put> transactionalizedPuts = new ArrayList<Put>(puts.size());
- for (Put put : puts) {
- Put txPut = transactionalizeAction(put);
- transactionalizedPuts.add(txPut);
- }
- hTable.put(transactionalizedPuts);
- }
-
- @Override
- public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException {
- if (allowNonTransactional) {
- return hTable.checkAndPut(row, family, qualifier, value, put);
- } else {
- throw new UnsupportedOperationException("Operation is not supported transactionally");
- }
- }
-
- @Override
- public void delete(Delete delete) throws IOException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- hTable.delete(transactionalizeAction(delete));
- }
-
- @Override
- public void delete(List<Delete> deletes) throws IOException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- List<Delete> transactionalizedDeletes = new ArrayList<Delete>(deletes.size());
- for (Delete delete : deletes) {
- Delete txDelete = transactionalizeAction(delete);
- transactionalizedDeletes.add(txDelete);
- }
- hTable.delete(transactionalizedDeletes);
- }
-
- @Override
- public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete)
- throws IOException {
- if (allowNonTransactional) {
- return hTable.checkAndDelete(row, family, qualifier, value, delete);
- } else {
- throw new UnsupportedOperationException("Operation is not supported transactionally");
- }
- }
-
- @Override
- public boolean checkAndDelete(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp,
- byte[] bytes3, Delete delete) throws IOException {
- if (allowNonTransactional) {
- return hTable.checkAndDelete(bytes, bytes1, bytes2, compareOp, bytes3, delete);
- } else {
- throw new UnsupportedOperationException("Operation is not supported transactionally");
- }
- }
-
- @Override
- public boolean checkAndPut(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp,
- byte[] bytes3, Put put) throws IOException {
- if (allowNonTransactional) {
- return hTable.checkAndPut(bytes, bytes1, bytes2, compareOp, bytes3, put);
- } else {
- throw new UnsupportedOperationException("Operation is not supported transactionally");
- }
- }
-
- @Override
- public boolean[] existsAll(List<Get> gets) throws IOException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- List<Get> transactionalizedGets = new ArrayList<Get>(gets.size());
- for (Get get : gets) {
- transactionalizedGets.add(transactionalizeAction(get));
- }
- return hTable.existsAll(transactionalizedGets);
- }
-
- @Override
- public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
- CompareFilter.CompareOp compareOp, byte[] value, RowMutations rowMutations)
- throws IOException {
- if (allowNonTransactional) {
- return hTable.checkAndMutate(row, family, qualifier, compareOp, value, rowMutations);
- }
-
- throw new UnsupportedOperationException("checkAndMutate operation is not supported transactionally");
- }
-
- @Override
- public void mutateRow(RowMutations rm) throws IOException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- RowMutations transactionalMutations = new RowMutations();
- for (Mutation mutation : rm.getMutations()) {
- if (mutation instanceof Put) {
- transactionalMutations.add(transactionalizeAction((Put) mutation));
- } else if (mutation instanceof Delete) {
- transactionalMutations.add(transactionalizeAction((Delete) mutation));
- }
- }
- hTable.mutateRow(transactionalMutations);
- }
-
- @Override
- public Result append(Append append) throws IOException {
- if (allowNonTransactional) {
- return hTable.append(append);
- } else {
- throw new UnsupportedOperationException("Operation is not supported transactionally");
- }
- }
-
- @Override
- public Result increment(Increment increment) throws IOException {
- if (allowNonTransactional) {
- return hTable.increment(increment);
- } else {
- throw new UnsupportedOperationException("Operation is not supported transactionally");
- }
- }
-
- /**
-  * Forwards the column increment to the underlying table when non-transactional operations are
-  * allowed.
-  *
-  * @throws UnsupportedOperationException if non-transactional operations are not allowed
-  */
- @Override
- public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException {
-   if (!allowNonTransactional) {
-     throw new UnsupportedOperationException("Operation is not supported transactionally");
-   }
-   return hTable.incrementColumnValue(row, family, qualifier, amount);
- }
-
- /**
-  * Forwards the column increment, with the given durability, to the underlying table when
-  * non-transactional operations are allowed.
-  *
-  * @throws UnsupportedOperationException if non-transactional operations are not allowed
-  */
- @Override
- public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability)
-     throws IOException {
-   if (!allowNonTransactional) {
-     throw new UnsupportedOperationException("Operation is not supported transactionally");
-   }
-   return hTable.incrementColumnValue(row, family, qualifier, amount, durability);
- }
-
- /**
-  * Forwards the column increment, with the given write-to-WAL flag, to the underlying table when
-  * non-transactional operations are allowed.
-  *
-  * @throws UnsupportedOperationException if non-transactional operations are not allowed
-  */
- @Override
- public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
-     throws IOException {
-   if (!allowNonTransactional) {
-     throw new UnsupportedOperationException("Operation is not supported transactionally");
-   }
-   return hTable.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
- }
-
- /** Returns the auto-flush setting reported by the underlying {@code hTable}. */
- @Override
- public boolean isAutoFlush() {
- return hTable.isAutoFlush();
- }
-
- /** Delegates directly to {@code hTable.flushCommits()}; no transaction check is made here. */
- @Override
- public void flushCommits() throws IOException {
- hTable.flushCommits();
- }
-
- /** Closes the underlying {@code hTable}; nothing else is done by this method. */
- @Override
- public void close() throws IOException {
- hTable.close();
- }
-
- /** Returns the coprocessor RPC channel for {@code row} from the underlying {@code hTable}, unchanged. */
- @Override
- public CoprocessorRpcChannel coprocessorService(byte[] row) {
- return hTable.coprocessorService(row);
- }
-
- /**
-  * Delegates the coprocessor call over the key range to the underlying {@code hTable} and returns
-  * its result map as is. This method does not attach the transaction to the call.
-  */
- @Override
- public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey, byte[] endKey,
- Batch.Call<T, R> callable)
- throws ServiceException, Throwable {
- return hTable.coprocessorService(service, startKey, endKey, callable);
- }
-
- /**
-  * Delegates the callback-based coprocessor call over the key range to the underlying
-  * {@code hTable}. This method does not attach the transaction to the call.
-  */
- @Override
- public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey, byte[] endKey,
- Batch.Call<T, R> callable, Batch.Callback<R> callback)
- throws ServiceException, Throwable {
- hTable.coprocessorService(service, startKey, endKey, callable, callback);
- }
-
- /**
-  * Delegates the batch coprocessor call to the underlying {@code hTable} and returns its result
-  * map as is. This method does not attach the transaction to the request.
-  */
- @Override
- public <R extends Message> Map<byte[], R> batchCoprocessorService(
- MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey,
- R responsePrototype) throws ServiceException, Throwable {
- return hTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype);
- }
-
- /**
-  * Delegates the callback-based batch coprocessor call to the underlying {@code hTable}. This
-  * method does not attach the transaction to the request.
-  */
- @Override
- public <R extends Message> void batchCoprocessorService(MethodDescriptor methodDescriptor,
- Message request, byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
- throws ServiceException, Throwable {
- hTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, callback);
- }
-
- /** Sets auto-flush by routing through this class's own {@link #setAutoFlushTo(boolean)}. */
- @Override
- public void setAutoFlush(boolean autoFlush) {
- setAutoFlushTo(autoFlush);
- }
-
- /** Passes both auto-flush flags straight through to the underlying {@code hTable}. */
- @Override
- public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
- hTable.setAutoFlush(autoFlush, clearBufferOnFail);
- }
-
- /** Passes the auto-flush flag straight through to the underlying {@code hTable}. */
- @Override
- public void setAutoFlushTo(boolean autoFlush) {
- hTable.setAutoFlushTo(autoFlush);
- }
-
- /** Returns the write buffer size reported by the underlying {@code hTable}. */
- @Override
- public long getWriteBufferSize() {
- return hTable.getWriteBufferSize();
- }
-
- /** Passes the requested write buffer size straight through to the underlying {@code hTable}. */
- @Override
- public void setWriteBufferSize(long writeBufferSize) throws IOException {
- hTable.setWriteBufferSize(writeBufferSize);
- }
-
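- // Helpers that bind operations to the current transaction: Gets and Scans are tagged in place with
- // the encoded transaction, while Puts and Deletes are rebuilt at the transaction's write pointer.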
-
- /**
-  * Tags {@code get} with the current transaction. The attribute is set on the caller's object
-  * (no copy is made) and that same instance is returned.
-  */
- private Get transactionalizeAction(Get get) throws IOException {
- addToOperation(get, tx);
- return get;
- }
-
- /**
-  * Tags {@code scan} with the current transaction. The attribute is set on the caller's object
-  * (no copy is made) and that same instance is returned.
-  */
- private Scan transactionalizeAction(Scan scan) throws IOException {
- addToOperation(scan, tx);
- return scan;
- }
-
- /**
-  * Builds a transactional copy of {@code put}: every cell is re-added with the transaction's write
-  * pointer as its timestamp and recorded in the change set, the attributes and durability of the
-  * original are carried over, and the encoded transaction is attached to the copy.
-  *
-  * @param put the client's put; it is read but not modified
-  * @return a new {@link Put} for the same row, stamped with the transaction's write pointer
-  */
- private Put transactionalizeAction(Put put) throws IOException {
-   long writePointer = tx.getWritePointer();
-   Put txPut = new Put(put.getRow(), writePointer);
-   // Iterating an empty family map or an empty cell list is a no-op, so no emptiness checks are needed.
-   for (List<Cell> cells : put.getFamilyCellMap().values()) {
-     for (Cell cell : cells) {
-       txPut.add(cell.getFamily(), cell.getQualifier(), writePointer, cell.getValue());
-       addToChangeSet(txPut.getRow(), cell.getFamily(), cell.getQualifier());
-     }
-   }
-   Map<String, byte[]> attributes = put.getAttributesMap();
-   for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) {
-     txPut.setAttribute(attribute.getKey(), attribute.getValue());
-   }
-   txPut.setDurability(put.getDurability());
-   addToOperation(txPut, tx);
-   return txPut;
- }
-
- /**
-  * Builds a transactional copy of {@code delete}, stamped with the transaction's write pointer, and
-  * records what it removes in the change set.
-  *
-  * <ul>
-  *   <li>Row delete (no families given): with ROW or NONE conflict detection, every column family of
-  *       the table is added to the change set; otherwise the row is read and each existing column is
-  *       deleted and recorded individually.</li>
-  *   <li>Family delete: with ROW or NONE conflict detection, the family is deleted and recorded as a
-  *       whole; otherwise the family is read and each existing column is deleted and recorded.</li>
-  *   <li>Column delete: each listed column is deleted and recorded.</li>
-  * </ul>
-  *
-  * When the row (or family) read comes back empty there are no columns to enumerate, so nothing is
-  * added for it. The attributes and durability of the original are carried over.
-  *
-  * @param delete the client's delete; it is read but not modified
-  * @return a new {@link Delete} for the same row
-  */
- private Delete transactionalizeAction(Delete delete) throws IOException {
-   long transactionTimestamp = tx.getWritePointer();
-
-   byte[] deleteRow = delete.getRow();
-   Delete txDelete = new Delete(deleteRow, transactionTimestamp);
-   boolean coarseConflicts = conflictLevel == TxConstants.ConflictDetection.ROW ||
-     conflictLevel == TxConstants.ConflictDetection.NONE;
-
-   Map<byte[], List<Cell>> familyToDelete = delete.getFamilyCellMap();
-   if (familyToDelete.isEmpty()) {
-     // perform a row delete if we are using row-level conflict detection
-     if (coarseConflicts) {
-       // Row delete leaves delete markers in all column families of the table
-       // Therefore get all the column families of the hTable from the HTableDescriptor and add them to the changeSet
-       for (HColumnDescriptor columnDescriptor : hTable.getTableDescriptor().getColumnFamilies()) {
-         // no need to identify individual columns deleted
-         addToChangeSet(deleteRow, columnDescriptor.getName(), null);
-       }
-     } else {
-       Result result = get(new Get(deleteRow));
-       // Result.getNoVersionMap() returns null for an empty result, so only walk it when the row exists.
-       if (!result.isEmpty()) {
-         // Delete everything
-         NavigableMap<byte[], NavigableMap<byte[], byte[]>> resultMap = result.getNoVersionMap();
-         for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> familyEntry : resultMap.entrySet()) {
-           byte[] family = familyEntry.getKey();
-           // The entry value already maps qualifier to latest value for this family.
-           for (byte[] qualifier : familyEntry.getValue().keySet()) {
-             txDelete.deleteColumns(family, qualifier, transactionTimestamp);
-             addToChangeSet(deleteRow, family, qualifier);
-           }
-         }
-       }
-     }
-   } else {
-     for (Map.Entry<byte [], List<Cell>> familyEntry : familyToDelete.entrySet()) {
-       byte[] family = familyEntry.getKey();
-       List<Cell> entries = familyEntry.getValue();
-       // A family delete is represented by a single delete-family cell for that family.
-       boolean isFamilyDelete = entries.size() == 1 && CellUtil.isDeleteFamily(entries.get(0));
-       if (isFamilyDelete) {
-         if (coarseConflicts) {
-           // no need to identify individual columns deleted
-           txDelete.deleteFamily(family);
-           addToChangeSet(deleteRow, family, null);
-         } else {
-           Result result = get(new Get(deleteRow).addFamily(family));
-           // Delete entire family; Result.getFamilyMap() returns null when the result is empty.
-           NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(family);
-           if (familyColumns != null) {
-             for (byte[] qualifier : familyColumns.keySet()) {
-               txDelete.deleteColumns(family, qualifier, transactionTimestamp);
-               addToChangeSet(deleteRow, family, qualifier);
-             }
-           }
-         }
-       } else {
-         for (Cell value : entries) {
-           txDelete.deleteColumns(value.getFamily(), value.getQualifier(), transactionTimestamp);
-           addToChangeSet(deleteRow, value.getFamily(), value.getQualifier());
-         }
-       }
-     }
-   }
-   for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) {
-     txDelete.setAttribute(entry.getKey(), entry.getValue());
-   }
-   txDelete.setDurability(delete.getDurability());
-   return txDelete;
- }
-
- /**
-  * Converts a batch of actions for transactional execution. Gets, Puts and Deletes go through the
-  * matching {@code transactionalizeAction} overload; any other kind of {@link Row} is passed
-  * through untouched. The order of the input list is preserved.
-  */
- private List<? extends Row> transactionalizeActions(List<? extends Row> actions) throws IOException {
-   List<Row> converted = new ArrayList<>(actions.size());
-   for (Row action : actions) {
-     Row txAction = action;
-     if (action instanceof Get) {
-       txAction = transactionalizeAction((Get) action);
-     } else if (action instanceof Put) {
-       txAction = transactionalizeAction((Put) action);
-     } else if (action instanceof Delete) {
-       txAction = transactionalizeAction((Delete) action);
-     }
-     converted.add(txAction);
-   }
-   return converted;
- }
-
- /**
-  * Attaches {@code tx} to {@code op}: the transaction is encoded with {@code txCodec} and stored
-  * under the {@code TxConstants.TX_OPERATION_ATTRIBUTE_KEY} attribute of the operation.
-  *
-  * @param op the operation to tag; it is modified in place
-  * @param tx the transaction to encode
-  * @throws IOException declared for the {@code txCodec.encode} call
-  */
- public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException {
- op.setAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-1.0/src/main/java/co/cask/tephra/hbase10/coprocessor/CellSkipFilter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/co/cask/tephra/hbase10/coprocessor/CellSkipFilter.java b/tephra-hbase-compat-1.0/src/main/java/co/cask/tephra/hbase10/coprocessor/CellSkipFilter.java
deleted file mode 100644
index ab3418a..0000000
--- a/tephra-hbase-compat-1.0/src/main/java/co/cask/tephra/hbase10/coprocessor/CellSkipFilter.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.hbase10.coprocessor;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterBase;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * {@link Filter} that encapsulates another {@link Filter}. It remembers the last {@link KeyValue}
- * for which the underlying filter returned the {@link ReturnCode#NEXT_COL} or {@link ReturnCode#INCLUDE_AND_NEXT_COL},
- * so that when {@link #filterKeyValue} is called again for the same {@link KeyValue} with different
- * version, it returns {@link ReturnCode#NEXT_COL} directly without consulting the underlying {@link Filter}.
- * Please see TEPHRA-169 for more details.
- */
-public class CellSkipFilter extends FilterBase {
- private final Filter filter;
- // remember the previous keyvalue processed by filter when the return code was NEXT_COL or INCLUDE_AND_NEXT_COL
- private KeyValue skipColumn = null;
-
- public CellSkipFilter(Filter filter) {
- this.filter = filter;
- }
-
- /**
- * Determines whether the current cell should be skipped. The cell will be skipped
- * if the previous keyvalue had the same key as the current cell. This means filter already responded
- * for the previous keyvalue with ReturnCode.NEXT_COL or ReturnCode.INCLUDE_AND_NEXT_COL.
- * @param cell the {@link Cell} to be tested for skipping
- * @return true is current cell should be skipped, false otherwise
- */
- private boolean skipCellVersion(Cell cell) {
- return skipColumn != null
- && CellUtil.matchingRow(cell, skipColumn.getRowArray(), skipColumn.getRowOffset(),
- skipColumn.getRowLength())
- && CellUtil.matchingFamily(cell, skipColumn.getFamilyArray(), skipColumn.getFamilyOffset(),
- skipColumn.getFamilyLength())
- && CellUtil.matchingQualifier(cell, skipColumn.getQualifierArray(), skipColumn.getQualifierOffset(),
- skipColumn.getQualifierLength());
- }
-
- @Override
- public ReturnCode filterKeyValue(Cell cell) throws IOException {
- if (skipCellVersion(cell)) {
- return ReturnCode.NEXT_COL;
- }
-
- ReturnCode code = filter.filterKeyValue(cell);
- if (code == ReturnCode.NEXT_COL || code == ReturnCode.INCLUDE_AND_NEXT_COL) {
- // only store the reference to the keyvalue if we are returning NEXT_COL or INCLUDE_AND_NEXT_COL
- skipColumn = KeyValueUtil.createFirstOnRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
- cell.getFamilyArray(), cell.getFamilyOffset(),
- cell.getFamilyLength(), cell.getQualifierArray(),
- cell.getQualifierOffset(), cell.getQualifierLength());
- } else {
- skipColumn = null;
- }
- return code;
- }
-
- @Override
- public boolean filterRow() throws IOException {
- return filter.filterRow();
- }
-
- @Override
- public Cell transformCell(Cell cell) throws IOException {
- return filter.transformCell(cell);
- }
-
- @Override
- public void reset() throws IOException {
- filter.reset();
- }
-
- @Override
- public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
- return filter.filterRowKey(buffer, offset, length);
- }
-
- @Override
- public boolean filterAllRemaining() throws IOException {
- return filter.filterAllRemaining();
- }
-
- @Override
- public void filterRowCells(List<Cell> kvs) throws IOException {
- filter.filterRowCells(kvs);
- }
-
- @Override
- public boolean hasFilterRow() {
- return filter.hasFilterRow();
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public KeyValue getNextKeyHint(KeyValue currentKV) throws IOException {
- return filter.getNextKeyHint(currentKV);
- }
-
- @Override
- public Cell getNextCellHint(Cell currentKV) throws IOException {
- return filter.getNextCellHint(currentKV);
- }
-
- @Override
- public boolean isFamilyEssential(byte[] name) throws IOException {
- return filter.isFamilyEssential(name);
- }
-
- @Override
- public byte[] toByteArray() throws IOException {
- return filter.toByteArray();
- }
-}