You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/11 21:37:22 UTC
[24/31] incubator-tephra git commit: TEPHRA-168 Remove HBase version
from co-processor package names
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/d00d9f6d/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
new file mode 100644
index 0000000..0ca9f9c
--- /dev/null
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.tephra.Transaction;
+
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/**
+ * Factory class for providing {@link Filter} instances.
+ */
+public class TransactionFilters {
+ /**
+ * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions.
+ *
+ * @param tx the current transaction to apply. Only data visible to this transaction will be returned.
+ * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
+ * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
+ * these will be interpreted as "delete" markers and the column will be filtered out
+ * @param scanType the type of scan operation being performed
+ */
+ public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
+ ScanType scanType) {
+ return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null));
+ }
+
+ /**
+ * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions.
+ *
+ * @param tx the current transaction to apply. Only data visible to this transaction will be returned.
+ * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
+ * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
+ * these will be interpreted as "delete" markers and the column will be filtered out
+ * @param scanType the type of scan operation being performed
+ * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
+ * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
+ * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
+ */
+ public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
+ ScanType scanType, @Nullable Filter cellFilter) {
+ return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/d00d9f6d/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
new file mode 100644
index 0000000..0f60910
--- /dev/null
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.OperationWithAttributes;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionCodec;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.TransactionStateCache;
+import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
+import org.apache.tephra.persist.TransactionVisibilityState;
+import org.apache.tephra.util.TxUtils;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+
+/**
+ * {@code org.apache.hadoop.hbase.coprocessor.RegionObserver} coprocessor that handles server-side processing
+ * for transactions:
+ * <ul>
+ * <li>applies filtering to exclude data from invalid and in-progress transactions</li>
+ * <li>overrides the scanner returned for flush and compaction to drop data written by invalidated transactions,
+ * or expired due to TTL.</li>
+ * </ul>
+ *
+ * <p>In order to use this coprocessor for transactions, configure the class on any table involved in transactions,
+ * or on all user tables by adding the following to hbase-site.xml:
+ * {@code
+ * <property>
+ * <name>hbase.coprocessor.region.classes</name>
+ * <value>org.apache.tephra.hbase.coprocessor.TransactionProcessor</value>
+ * </property>
+ * }
+ * </p>
+ *
+ * <p>HBase {@code Get} and {@code Scan} operations should have the current transaction serialized on to the operation
+ * as an attribute:
+ * {@code
+ * Transaction t = ...;
+ * Get get = new Get(...);
+ * TransactionCodec codec = new TransactionCodec();
+ * codec.addToOperation(get, t);
+ * }
+ * </p>
+ */
+public class TransactionProcessor extends BaseRegionObserver {
+ private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
+
+ private TransactionStateCache cache;
+ private final TransactionCodec txCodec;
+ protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+ protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
+ protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA;
+
+ public TransactionProcessor() {
+ this.txCodec = new TransactionCodec();
+ }
+
+ /* RegionObserver implementation */
+
+ @Override
+ public void start(CoprocessorEnvironment e) throws IOException {
+ if (e instanceof RegionCoprocessorEnvironment) {
+ RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
+ Supplier<TransactionStateCache> cacheSupplier = getTransactionStateCacheSupplier(env);
+ this.cache = cacheSupplier.get();
+
+ HTableDescriptor tableDesc = env.getRegion().getTableDesc();
+ for (HColumnDescriptor columnDesc : tableDesc.getFamilies()) {
+ String columnTTL = columnDesc.getValue(TxConstants.PROPERTY_TTL);
+ long ttl = 0;
+ if (columnTTL != null) {
+ try {
+ ttl = Long.parseLong(columnTTL);
+ LOG.info("Family " + columnDesc.getNameAsString() + " has TTL of " + columnTTL);
+ } catch (NumberFormatException nfe) {
+ LOG.warn("Invalid TTL value configured for column family " + columnDesc.getNameAsString() +
+ ", value = " + columnTTL);
+ }
+ }
+ ttlByFamily.put(columnDesc.getName(), ttl);
+ }
+
+ this.allowEmptyValues = env.getConfiguration().getBoolean(TxConstants.ALLOW_EMPTY_VALUES_KEY,
+ TxConstants.ALLOW_EMPTY_VALUES_DEFAULT);
+ this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA));
+ if (readNonTxnData) {
+ LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString());
+ }
+ }
+ }
+
+ protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+ return new TransactionStateCacheSupplier(env.getConfiguration());
+ }
+
+ @Override
+ public void stop(CoprocessorEnvironment e) throws IOException {
+ // nothing to do
+ }
+
+ @Override
+ public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results)
+ throws IOException {
+ Transaction tx = getFromOperation(get);
+ if (tx != null) {
+ projectFamilyDeletes(get);
+ get.setMaxVersions();
+ get.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx, readNonTxnData),
+ TxUtils.getMaxVisibleTimestamp(tx));
+ Filter newFilter = getTransactionFilter(tx, ScanType.USER_SCAN, get.getFilter());
+ get.setFilter(newFilter);
+ }
+ }
+
+ @Override
+ public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit,
+ Durability durability) throws IOException {
+ // Translate deletes into our own delete tombstones
+ // Since HBase deletes cannot be undone, we need to translate deletes into special puts, which allows
+ // us to rollback the changes (by a real delete) if the transaction fails
+
+ // Deletes that are part of a transaction rollback do not need special handling.
+ // They will never be rolled back, so are performed as normal HBase deletes.
+ if (isRollbackOperation(delete)) {
+ return;
+ }
+
+ // Other deletes are client-initiated and need to be translated into our own tombstones
+ // TODO: this should delegate to the DeleteStrategy implementation.
+ Put deleteMarkers = new Put(delete.getRow(), delete.getTimeStamp());
+ for (byte[] family : delete.getFamilyCellMap().keySet()) {
+ List<Cell> familyCells = delete.getFamilyCellMap().get(family);
+ if (isFamilyDelete(familyCells)) {
+ deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, familyCells.get(0).getTimestamp(),
+ HConstants.EMPTY_BYTE_ARRAY);
+ } else {
+ int cellSize = familyCells.size();
+ for (int i = 0; i < cellSize; i++) {
+ Cell cell = familyCells.get(i);
+ deleteMarkers.add(family, CellUtil.cloneQualifier(cell), cell.getTimestamp(),
+ HConstants.EMPTY_BYTE_ARRAY);
+ }
+ }
+ }
+ for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) {
+ deleteMarkers.setAttribute(entry.getKey(), entry.getValue());
+ }
+ e.getEnvironment().getRegion().put(deleteMarkers);
+ // skip normal delete handling
+ e.bypass();
+ }
+
+ private boolean isFamilyDelete(List<Cell> familyCells) {
+ return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0));
+ }
+
+ @Override
+ public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s)
+ throws IOException {
+ Transaction tx = getFromOperation(scan);
+ if (tx != null) {
+ projectFamilyDeletes(scan);
+ scan.setMaxVersions();
+ scan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx, readNonTxnData),
+ TxUtils.getMaxVisibleTimestamp(tx));
+ Filter newFilter = getTransactionFilter(tx, ScanType.USER_SCAN, scan.getFilter());
+ scan.setFilter(newFilter);
+ }
+ return s;
+ }
+
+ /**
+ * Ensures that family delete markers are present in the columns requested for any scan operation.
+ * @param scan The original scan request
+ * @return The modified scan request with the family delete qualifiers represented
+ */
+ private Scan projectFamilyDeletes(Scan scan) {
+ for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
+ NavigableSet<byte[]> columns = entry.getValue();
+ // wildcard scans will automatically include the delete marker, so only need to add it when we have
+ // explicit columns listed
+ if (columns != null && !columns.isEmpty()) {
+ scan.addColumn(entry.getKey(), TxConstants.FAMILY_DELETE_QUALIFIER);
+ }
+ }
+ return scan;
+ }
+
+ /**
+ * Ensures that family delete markers are present in the columns requested for any get operation.
+ * @param get The original get request
+ * @return The modified get request with the family delete qualifiers represented
+ */
+ private Get projectFamilyDeletes(Get get) {
+ for (Map.Entry<byte[], NavigableSet<byte[]>> entry : get.getFamilyMap().entrySet()) {
+ NavigableSet<byte[]> columns = entry.getValue();
+ // wildcard scans will automatically include the delete marker, so only need to add it when we have
+ // explicit columns listed
+ if (columns != null && !columns.isEmpty()) {
+ get.addColumn(entry.getKey(), TxConstants.FAMILY_DELETE_QUALIFIER);
+ }
+ }
+ return get;
+ }
+
+ @Override
+ public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+ KeyValueScanner memstoreScanner, InternalScanner scanner)
+ throws IOException {
+ return createStoreScanner(c.getEnvironment(), "flush", cache.getLatestState(), store,
+ Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
+ HConstants.OLDEST_TIMESTAMP);
+ }
+
+ @Override
+ public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+ List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
+ CompactionRequest request)
+ throws IOException {
+ return createStoreScanner(c.getEnvironment(), "compaction", cache.getLatestState(), store, scanners,
+ scanType, earliestPutTs);
+ }
+
+ protected InternalScanner createStoreScanner(RegionCoprocessorEnvironment env, String action,
+ TransactionVisibilityState snapshot, Store store,
+ List<? extends KeyValueScanner> scanners, ScanType type,
+ long earliestPutTs) throws IOException {
+ if (snapshot == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Region " + env.getRegion().getRegionNameAsString() +
+ ", no current transaction state found, defaulting to normal " + action + " scanner");
+ }
+ return null;
+ }
+
+ // construct a dummy transaction from the latest snapshot
+ Transaction dummyTx = TxUtils.createDummyTransaction(snapshot);
+ Scan scan = new Scan();
+ // need to see all versions, since we filter out excludes and applications may rely on multiple versions
+ scan.setMaxVersions();
+ scan.setFilter(
+ new IncludeInProgressFilter(dummyTx.getVisibilityUpperBound(),
+ snapshot.getInvalid(),
+ getTransactionFilter(dummyTx, type, null)));
+
+ return new StoreScanner(store, store.getScanInfo(), scan, scanners,
+ type, store.getSmallestReadPoint(), earliestPutTs);
+ }
+
+ private Transaction getFromOperation(OperationWithAttributes op) throws IOException {
+ byte[] encoded = op.getAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY);
+ if (encoded == null) {
+ // to support old clients
+ encoded = op.getAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY);
+ }
+ if (encoded != null) {
+ return txCodec.decode(encoded);
+ }
+ return null;
+ }
+
+ private boolean isRollbackOperation(OperationWithAttributes op) throws IOException {
+ return op.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null ||
+ // to support old clients
+ op.getAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY) != null;
+ }
+
+ /**
+ * Derived classes can override this method to customize the filter used to return data visible for the current
+ * transaction.
+ *
+ * @param tx the current transaction to apply
+ * @param type the type of scan being performed
+ */
+ protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) {
+ return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
+ }
+
+ /**
+ * Filter used to include cells visible to in-progress transactions on flush and commit.
+ */
+ static class IncludeInProgressFilter extends FilterBase {
+ private final long visibilityUpperBound;
+ private final Set<Long> invalidIds;
+ private final Filter txFilter;
+
+ public IncludeInProgressFilter(long upperBound, Collection<Long> invalids, Filter transactionFilter) {
+ this.visibilityUpperBound = upperBound;
+ this.invalidIds = Sets.newHashSet(invalids);
+ this.txFilter = transactionFilter;
+ }
+
+ @Override
+ public ReturnCode filterKeyValue(Cell cell) throws IOException {
+ // include all cells visible to in-progress transactions, except for those already marked as invalid
+ long ts = cell.getTimestamp();
+ if (ts > visibilityUpperBound) {
+ // include everything that could still be in-progress except invalids
+ if (invalidIds.contains(ts)) {
+ return ReturnCode.SKIP;
+ }
+ return ReturnCode.INCLUDE;
+ }
+ return txFilter.filterKeyValue(cell);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/d00d9f6d/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
new file mode 100644
index 0000000..a3b902a
--- /dev/null
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.util.TxUtils;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/**
+ * Applies filtering of data based on transactional visibility (HBase 0.98+ specific version).
+ * Note: this is intended for server-side use only, as additional properties need to be set on
+ * any {@code Scan} or {@code Get} operation performed.
+ */
+public class TransactionVisibilityFilter extends FilterBase {
+ private final Transaction tx;
+ // oldest visible timestamp by column family, used to apply TTL when reading
+ private final Map<byte[], Long> oldestTsByFamily;
+ // if false, empty values will be interpreted as deletes
+ private final boolean allowEmptyValues;
+ // whether or not we can remove delete markers
+ // these can only be safely removed when we are traversing all storefiles
+ private final boolean clearDeletes;
+ // optional sub-filter to apply to visible cells
+ private final Filter cellFilter;
+
+ // since we traverse KVs in order, cache the current oldest TS to avoid map lookups per KV
+ private byte[] currentFamily = new byte[0];
+ private long currentOldestTs;
+
+ private DeleteTracker deleteTracker = new DeleteTracker();
+
+ /**
+ * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions.
+ *
+ * @param tx the current transaction to apply. Only data visible to this transaction will be returned.
+ * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
+ * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
+ * these will be interpreted as "delete" markers and the column will be filtered out
+ * @param scanType the type of scan operation being performed
+ */
+ public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
+ ScanType scanType) {
+ this(tx, ttlByFamily, allowEmptyValues, scanType, null);
+ }
+
+ /**
+ * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions.
+ *
+ * @param tx the current transaction to apply. Only data visible to this transaction will be returned.
+ * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
+ * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
+ * these will be interpreted as "delete" markers and the column will be filtered out
+ * @param scanType the type of scan operation being performed
+ * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
+ * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
+ * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
+ */
+ public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
+ ScanType scanType, @Nullable Filter cellFilter) {
+ this.tx = tx;
+ this.oldestTsByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+ for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) {
+ long familyTTL = ttlEntry.getValue();
+ oldestTsByFamily.put(ttlEntry.getKey(),
+ familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS);
+ }
+ this.allowEmptyValues = allowEmptyValues;
+ this.clearDeletes =
+ scanType == ScanType.COMPACT_DROP_DELETES ||
+ (scanType == ScanType.USER_SCAN && tx.getVisibilityLevel() != Transaction.VisibilityLevel.SNAPSHOT_ALL);
+ this.cellFilter = cellFilter;
+ }
+
+ @Override
+ public ReturnCode filterKeyValue(Cell cell) throws IOException {
+ if (!CellUtil.matchingFamily(cell, currentFamily)) {
+ // column family changed
+ currentFamily = CellUtil.cloneFamily(cell);
+ Long familyOldestTs = oldestTsByFamily.get(currentFamily);
+ currentOldestTs = familyOldestTs != null ? familyOldestTs : 0;
+ deleteTracker.reset();
+ }
+ // need to apply TTL for the column family here
+ long kvTimestamp = cell.getTimestamp();
+ if (TxUtils.getTimestampForTTL(kvTimestamp) < currentOldestTs) {
+ // passed TTL for this column, seek to next
+ return ReturnCode.NEXT_COL;
+ } else if (tx.isVisible(kvTimestamp)) {
+ // Return all writes done by current transaction (including deletes) for VisibilityLevel.SNAPSHOT_ALL
+ if (tx.getVisibilityLevel() == Transaction.VisibilityLevel.SNAPSHOT_ALL && tx.isCurrentWrite(kvTimestamp)) {
+ // cell is visible
+ // visibility SNAPSHOT_ALL needs all matches
+ return runSubFilter(ReturnCode.INCLUDE, cell);
+ }
+ if (DeleteTracker.isFamilyDelete(cell)) {
+ deleteTracker.addFamilyDelete(cell);
+ if (clearDeletes) {
+ return ReturnCode.NEXT_COL;
+ } else {
+ // cell is visible
+ // as soon as we find a KV to include we can move to the next column
+ return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell);
+ }
+ }
+ // check if masked by family delete
+ if (deleteTracker.isDeleted(cell)) {
+ return ReturnCode.NEXT_COL;
+ }
+ // check for column delete
+ if (isColumnDelete(cell)) {
+ if (clearDeletes) {
+ // skip "deleted" cell
+ return ReturnCode.NEXT_COL;
+ } else {
+ // keep the marker but skip any remaining versions
+ return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell);
+ }
+ }
+ // cell is visible
+ // as soon as we find a KV to include we can move to the next column
+ return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell);
+ } else {
+ return ReturnCode.SKIP;
+ }
+ }
+
+ private ReturnCode runSubFilter(ReturnCode txFilterCode, Cell cell) throws IOException {
+ if (cellFilter != null) {
+ ReturnCode subFilterCode = cellFilter.filterKeyValue(cell);
+ return determineReturnCode(txFilterCode, subFilterCode);
+ }
+ return txFilterCode;
+ }
+
+ /**
+ * Determines the return code of TransactionVisibilityFilter based on sub-filter's return code.
+ * Sub-filter can only exclude cells included by TransactionVisibilityFilter, i.e., sub-filter's
+ * INCLUDE will be ignored. This behavior makes sure that sub-filter only sees cell versions valid for the
+ * given transaction. If sub-filter needs to see older versions of cell, then this method can be overridden.
+ *
+ * @param txFilterCode return code from TransactionVisibilityFilter
+ * @param subFilterCode return code from sub-filter
+ * @return final return code
+ */
+ protected ReturnCode determineReturnCode(ReturnCode txFilterCode, ReturnCode subFilterCode) {
+ // Return the more restrictive of the two filter responses
+ switch (subFilterCode) {
+ case INCLUDE:
+ return txFilterCode;
+ case INCLUDE_AND_NEXT_COL:
+ return ReturnCode.INCLUDE_AND_NEXT_COL;
+ case SKIP:
+ return txFilterCode == ReturnCode.INCLUDE ? ReturnCode.SKIP : ReturnCode.NEXT_COL;
+ default:
+ return subFilterCode;
+ }
+ }
+
+ @Override
+ public boolean filterRow() throws IOException {
+ if (cellFilter != null) {
+ return cellFilter.filterRow();
+ }
+ return super.filterRow();
+ }
+
+ @Override
+ public Cell transformCell(Cell cell) throws IOException {
+ // Convert Tephra deletes back into HBase deletes
+ if (tx.getVisibilityLevel() == Transaction.VisibilityLevel.SNAPSHOT_ALL) {
+ if (DeleteTracker.isFamilyDelete(cell)) {
+ return new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell), null, cell.getTimestamp(),
+ KeyValue.Type.DeleteFamily);
+ } else if (isColumnDelete(cell)) {
+ // Note: in some cases KeyValue.Type.Delete is used in Delete object,
+ // and in some other cases KeyValue.Type.DeleteColumn is used.
+ // Since Tephra cannot distinguish between the two, we return KeyValue.Type.DeleteColumn.
+ // KeyValue.Type.DeleteColumn makes both CellUtil.isDelete and CellUtil.isDeleteColumns return true, and will
+ // work in both cases.
+ return new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),
+ cell.getTimestamp(), KeyValue.Type.DeleteColumn);
+ }
+ }
+ return cell;
+ }
+
+ @Override
+ public void reset() throws IOException {
+ deleteTracker.reset();
+ if (cellFilter != null) {
+ cellFilter.reset();
+ }
+ }
+
+ @Override
+ public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
+ if (cellFilter != null) {
+ return cellFilter.filterRowKey(buffer, offset, length);
+ }
+ return super.filterRowKey(buffer, offset, length);
+ }
+
+ @Override
+ public boolean filterAllRemaining() throws IOException {
+ if (cellFilter != null) {
+ return cellFilter.filterAllRemaining();
+ }
+ return super.filterAllRemaining();
+ }
+
+ @Override
+ public void filterRowCells(List<Cell> kvs) throws IOException {
+ if (cellFilter != null) {
+ cellFilter.filterRowCells(kvs);
+ } else {
+ super.filterRowCells(kvs);
+ }
+ }
+
+ @Override
+ public boolean hasFilterRow() {
+ if (cellFilter != null) {
+ return cellFilter.hasFilterRow();
+ }
+ return super.hasFilterRow();
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public KeyValue getNextKeyHint(KeyValue currentKV) throws IOException {
+ if (cellFilter != null) {
+ return cellFilter.getNextKeyHint(currentKV);
+ }
+ return super.getNextKeyHint(currentKV);
+ }
+
+ @Override
+ public Cell getNextCellHint(Cell currentKV) throws IOException {
+ if (cellFilter != null) {
+ return cellFilter.getNextCellHint(currentKV);
+ }
+ return super.getNextCellHint(currentKV);
+ }
+
+ @Override
+ public boolean isFamilyEssential(byte[] name) throws IOException {
+ if (cellFilter != null) {
+ return cellFilter.isFamilyEssential(name);
+ }
+ return super.isFamilyEssential(name);
+ }
+
+ @Override
+ public byte[] toByteArray() throws IOException {
+ return super.toByteArray();
+ }
+
+ private boolean isColumnDelete(Cell cell) {
+ return !TxUtils.isPreExistingVersion(cell.getTimestamp()) && cell.getValueLength() == 0 && !allowEmptyValues;
+ }
+
+ private static final class DeleteTracker {
+ private long familyDeleteTs;
+
+ public static boolean isFamilyDelete(Cell cell) {
+ return !TxUtils.isPreExistingVersion(cell.getTimestamp()) &&
+ CellUtil.matchingQualifier(cell, TxConstants.FAMILY_DELETE_QUALIFIER) &&
+ CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY);
+ }
+
+ public void addFamilyDelete(Cell delete) {
+ this.familyDeleteTs = delete.getTimestamp();
+ }
+
+ public boolean isDeleted(Cell cell) {
+ return cell.getTimestamp() <= familyDeleteTs;
+ }
+
+ public void reset() {
+ this.familyDeleteTs = 0;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/d00d9f6d/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/HBase98ConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/HBase98ConfigurationProvider.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/HBase98ConfigurationProvider.java
deleted file mode 100644
index f529783..0000000
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/HBase98ConfigurationProvider.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tephra.hbase98;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.tephra.util.ConfigurationProvider;
-
-/**
- * HBase 0.98 version of {@link ConfigurationProvider}.
- */
-public class HBase98ConfigurationProvider extends ConfigurationProvider {
- @Override
- public Configuration get() {
- return HBaseConfiguration.create();
- }
-
- @Override
- public Configuration get(Configuration baseConf) {
- return HBaseConfiguration.create(baseConf);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/d00d9f6d/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/SecondaryIndexTable.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/SecondaryIndexTable.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/SecondaryIndexTable.java
deleted file mode 100644
index 14a43a9..0000000
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/SecondaryIndexTable.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tephra.hbase98;
-
-import com.google.common.base.Throwables;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.tephra.TransactionContext;
-import org.apache.tephra.TransactionFailureException;
-import org.apache.tephra.distributed.TransactionServiceClient;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A Transactional SecondaryIndexTable.
- */
-public class SecondaryIndexTable {
- private byte[] secondaryIndex;
- private TransactionAwareHTable transactionAwareHTable;
- private TransactionAwareHTable secondaryIndexTable;
- private TransactionContext transactionContext;
- private final TableName secondaryIndexTableName;
- private static final byte[] secondaryIndexFamily = Bytes.toBytes("secondaryIndexFamily");
- private static final byte[] secondaryIndexQualifier = Bytes.toBytes('r');
- private static final byte[] DELIMITER = new byte[] {0};
-
- public SecondaryIndexTable(TransactionServiceClient transactionServiceClient, HTableInterface hTable,
- byte[] secondaryIndex) {
- secondaryIndexTableName = TableName.valueOf(hTable.getName().getNameAsString() + ".idx");
- HTable secondaryIndexHTable = null;
- HBaseAdmin hBaseAdmin = null;
- try {
- hBaseAdmin = new HBaseAdmin(hTable.getConfiguration());
- if (!hBaseAdmin.tableExists(secondaryIndexTableName)) {
- hBaseAdmin.createTable(new HTableDescriptor(secondaryIndexTableName));
- }
- secondaryIndexHTable = new HTable(hTable.getConfiguration(), secondaryIndexTableName);
- } catch (Exception e) {
- Throwables.propagate(e);
- } finally {
- try {
- hBaseAdmin.close();
- } catch (Exception e) {
- Throwables.propagate(e);
- }
- }
-
- this.secondaryIndex = secondaryIndex;
- this.transactionAwareHTable = new TransactionAwareHTable(hTable);
- this.secondaryIndexTable = new TransactionAwareHTable(secondaryIndexHTable);
- this.transactionContext = new TransactionContext(transactionServiceClient, transactionAwareHTable,
- secondaryIndexTable);
- }
-
- public Result get(Get get) throws IOException {
- return get(Collections.singletonList(get))[0];
- }
-
- public Result[] get(List<Get> gets) throws IOException {
- try {
- transactionContext.start();
- Result[] result = transactionAwareHTable.get(gets);
- transactionContext.finish();
- return result;
- } catch (Exception e) {
- try {
- transactionContext.abort();
- } catch (TransactionFailureException e1) {
- throw new IOException("Could not rollback transaction", e1);
- }
- }
- return null;
- }
-
- public Result[] getByIndex(byte[] value) throws IOException {
- try {
- transactionContext.start();
- Scan scan = new Scan(value, Bytes.add(value, new byte[0]));
- scan.addColumn(secondaryIndexFamily, secondaryIndexQualifier);
- ResultScanner indexScanner = secondaryIndexTable.getScanner(scan);
-
- ArrayList<Get> gets = new ArrayList<Get>();
- for (Result result : indexScanner) {
- for (Cell cell : result.listCells()) {
- gets.add(new Get(cell.getValue()));
- }
- }
- Result[] results = transactionAwareHTable.get(gets);
- transactionContext.finish();
- return results;
- } catch (Exception e) {
- try {
- transactionContext.abort();
- } catch (TransactionFailureException e1) {
- throw new IOException("Could not rollback transaction", e1);
- }
- }
- return null;
- }
-
- public void put(Put put) throws IOException {
- put(Collections.singletonList(put));
- }
-
-
- public void put(List<Put> puts) throws IOException {
- try {
- transactionContext.start();
- ArrayList<Put> secondaryIndexPuts = new ArrayList<Put>();
- for (Put put : puts) {
- List<Put> indexPuts = new ArrayList<Put>();
- Set<Map.Entry<byte[], List<KeyValue>>> familyMap = put.getFamilyMap().entrySet();
- for (Map.Entry<byte [], List<KeyValue>> family : familyMap) {
- for (KeyValue value : family.getValue()) {
- if (Bytes.equals(value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(),
- secondaryIndex, 0, secondaryIndex.length)) {
- byte[] secondaryRow = Bytes.add(value.getQualifier(), DELIMITER,
- Bytes.add(value.getValue(), DELIMITER,
- value.getRow()));
- Put indexPut = new Put(secondaryRow);
- indexPut.add(secondaryIndexFamily, secondaryIndexQualifier, put.getRow());
- indexPuts.add(indexPut);
- }
- }
- }
- secondaryIndexPuts.addAll(indexPuts);
- }
- transactionAwareHTable.put(puts);
- secondaryIndexTable.put(secondaryIndexPuts);
- transactionContext.finish();
- } catch (Exception e) {
- try {
- transactionContext.abort();
- } catch (TransactionFailureException e1) {
- throw new IOException("Could not rollback transaction", e1);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/d00d9f6d/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/TransactionAwareHTable.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/TransactionAwareHTable.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/TransactionAwareHTable.java
deleted file mode 100644
index 9c04d8f..0000000
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/TransactionAwareHTable.java
+++ /dev/null
@@ -1,646 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tephra.hbase98;
-
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Append;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.OperationWithAttributes;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.RowMutations;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
-import org.apache.hadoop.hbase.filter.CompareFilter;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.apache.tephra.AbstractTransactionAwareTable;
-import org.apache.tephra.Transaction;
-import org.apache.tephra.TransactionAware;
-import org.apache.tephra.TxConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Set;
-
-/**
- * A Transaction Aware HTable implementation for HBase 0.98. Operations are committed as usual,
- * but upon a failed or aborted transaction, they are rolled back to the state before the transaction
- * was started.
- */
-public class TransactionAwareHTable extends AbstractTransactionAwareTable
- implements HTableInterface, TransactionAware {
-
- private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTable.class);
- private final HTableInterface hTable;
-
- /**
- * Create a transactional aware instance of the passed HTable
- *
- * @param hTable underlying HBase table to use
- */
- public TransactionAwareHTable(HTableInterface hTable) {
- this(hTable, false);
- }
-
- /**
- * Create a transactional aware instance of the passed HTable
- *
- * @param hTable underlying HBase table to use
- * @param conflictLevel level of conflict detection to perform (defaults to {@code COLUMN})
- */
- public TransactionAwareHTable(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel) {
- this(hTable, conflictLevel, false);
- }
-
- /**
- * Create a transactional aware instance of the passed HTable, with the option
- * of allowing non-transactional operations.
- * @param hTable underlying HBase table to use
- * @param allowNonTransactional if true, additional operations (checkAndPut, increment, checkAndDelete)
- * will be available, though non-transactional
- */
- public TransactionAwareHTable(HTableInterface hTable, boolean allowNonTransactional) {
- this(hTable, TxConstants.ConflictDetection.COLUMN, allowNonTransactional);
- }
-
- /**
- * Create a transactional aware instance of the passed HTable, with the option
- * of allowing non-transactional operations.
- * @param hTable underlying HBase table to use
- * @param conflictLevel level of conflict detection to perform (defaults to {@code COLUMN})
- * @param allowNonTransactional if true, additional operations (checkAndPut, increment, checkAndDelete)
- * will be available, though non-transactional
- */
- public TransactionAwareHTable(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel,
- boolean allowNonTransactional) {
- super(conflictLevel, allowNonTransactional);
- this.hTable = hTable;
- }
-
- /* AbstractTransactionAwareTable implementation */
-
- @Override
- protected byte[] getTableKey() {
- return getTableName();
- }
-
- @Override
- protected boolean doCommit() throws IOException {
- hTable.flushCommits();
- return true;
- }
-
- @Override
- protected boolean doRollback() throws Exception {
- try {
- // pre-size arraylist of deletes
- int size = 0;
- for (Set<ActionChange> cs : changeSets.values()) {
- size += cs.size();
- }
- List<Delete> rollbackDeletes = new ArrayList<>(size);
- for (Map.Entry<Long, Set<ActionChange>> entry : changeSets.entrySet()) {
- long transactionTimestamp = entry.getKey();
- for (ActionChange change : entry.getValue()) {
- byte[] row = change.getRow();
- byte[] family = change.getFamily();
- byte[] qualifier = change.getQualifier();
- Delete rollbackDelete = new Delete(row);
- makeRollbackOperation(rollbackDelete);
- switch (conflictLevel) {
- case ROW:
- case NONE:
- // issue family delete for the tx write pointer
- rollbackDelete.deleteFamilyVersion(change.getFamily(), transactionTimestamp);
- break;
- case COLUMN:
- if (family != null && qualifier == null) {
- rollbackDelete.deleteFamilyVersion(family, transactionTimestamp);
- } else if (family != null && qualifier != null) {
- rollbackDelete.deleteColumn(family, qualifier, transactionTimestamp);
- }
- break;
- default:
- throw new IllegalStateException("Unknown conflict detection level: " + conflictLevel);
- }
- rollbackDeletes.add(rollbackDelete);
- }
- }
- hTable.delete(rollbackDeletes);
- return true;
- } finally {
- try {
- hTable.flushCommits();
- } catch (Exception e) {
- LOG.error("Could not flush HTable commits", e);
- }
- tx = null;
- changeSets.clear();
- }
- }
-
- /* HTableInterface implementation */
-
- @Override
- public byte[] getTableName() {
- return hTable.getTableName();
- }
-
- @Override
- public TableName getName() {
- return hTable.getName();
- }
-
- @Override
- public Configuration getConfiguration() {
- return hTable.getConfiguration();
- }
-
- @Override
- public HTableDescriptor getTableDescriptor() throws IOException {
- return hTable.getTableDescriptor();
- }
-
- @Override
- public boolean exists(Get get) throws IOException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- return hTable.exists(transactionalizeAction(get));
- }
-
- @Override
- public Boolean[] exists(List<Get> gets) throws IOException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- List<Get> transactionalizedGets = new ArrayList<Get>(gets.size());
- for (Get get : gets) {
- transactionalizedGets.add(transactionalizeAction(get));
- }
- return hTable.exists(transactionalizedGets);
- }
-
- @Override
- public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- hTable.batch(transactionalizeActions(actions), results);
- }
-
- @Override
- public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- return hTable.batch(transactionalizeActions(actions));
- }
-
- @Override
- public <R> void batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) throws
- IOException, InterruptedException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- hTable.batchCallback(transactionalizeActions(actions), results, callback);
- }
-
- @Override
- public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback) throws IOException,
- InterruptedException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- return hTable.batchCallback(transactionalizeActions(actions), callback);
- }
-
- @Override
- public Result get(Get get) throws IOException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- return hTable.get(transactionalizeAction(get));
- }
-
- @Override
- public Result[] get(List<Get> gets) throws IOException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- ArrayList<Get> transactionalizedGets = new ArrayList<Get>();
- for (Get get : gets) {
- transactionalizedGets.add(transactionalizeAction(get));
- }
- return hTable.get(transactionalizedGets);
- }
-
- @Override
- public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
- if (allowNonTransactional) {
- return hTable.getRowOrBefore(row, family);
- } else {
- throw new UnsupportedOperationException("Operation is not supported transactionally");
- }
- }
-
- @Override
- public ResultScanner getScanner(Scan scan) throws IOException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- return hTable.getScanner(transactionalizeAction(scan));
- }
-
- @Override
- public ResultScanner getScanner(byte[] family) throws IOException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- Scan scan = new Scan();
- scan.addFamily(family);
- return hTable.getScanner(transactionalizeAction(scan));
- }
-
- @Override
- public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- Scan scan = new Scan();
- scan.addColumn(family, qualifier);
- return hTable.getScanner(transactionalizeAction(scan));
- }
-
- @Override
- public void put(Put put) throws IOException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- Put txPut = transactionalizeAction(put);
- hTable.put(txPut);
- }
-
- @Override
- public void put(List<Put> puts) throws IOException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- List<Put> transactionalizedPuts = new ArrayList<Put>(puts.size());
- for (Put put : puts) {
- Put txPut = transactionalizeAction(put);
- transactionalizedPuts.add(txPut);
- }
- hTable.put(transactionalizedPuts);
- }
-
- @Override
- public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException {
- if (allowNonTransactional) {
- return hTable.checkAndPut(row, family, qualifier, value, put);
- } else {
- throw new UnsupportedOperationException("Operation is not supported transactionally");
- }
- }
-
- @Override
- public void delete(Delete delete) throws IOException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- hTable.delete(transactionalizeAction(delete));
- }
-
- @Override
- public void delete(List<Delete> deletes) throws IOException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- List<Delete> transactionalizedDeletes = new ArrayList<Delete>(deletes.size());
- for (Delete delete : deletes) {
- Delete txDelete = transactionalizeAction(delete);
- transactionalizedDeletes.add(txDelete);
- }
- hTable.delete(transactionalizedDeletes);
- }
-
- @Override
- public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete)
- throws IOException {
- if (allowNonTransactional) {
- return hTable.checkAndDelete(row, family, qualifier, value, delete);
- } else {
- throw new UnsupportedOperationException("Operation is not supported transactionally");
- }
- }
-
- @Override
- public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
- CompareFilter.CompareOp compareOp, byte[] value, RowMutations rowMutations)
- throws IOException {
- if (allowNonTransactional) {
- return hTable.checkAndMutate(row, family, qualifier, compareOp, value, rowMutations);
- }
-
- throw new UnsupportedOperationException("checkAndMutate operation is not supported transactionally");
- }
-
- @Override
- public void mutateRow(RowMutations rm) throws IOException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- RowMutations transactionalMutations = new RowMutations();
- for (Mutation mutation : rm.getMutations()) {
- if (mutation instanceof Put) {
- transactionalMutations.add(transactionalizeAction((Put) mutation));
- } else if (mutation instanceof Delete) {
- transactionalMutations.add(transactionalizeAction((Delete) mutation));
- }
- }
- hTable.mutateRow(transactionalMutations);
- }
-
- @Override
- public Result append(Append append) throws IOException {
- if (allowNonTransactional) {
- return hTable.append(append);
- } else {
- throw new UnsupportedOperationException("Operation is not supported transactionally");
- }
- }
-
- @Override
- public Result increment(Increment increment) throws IOException {
- if (allowNonTransactional) {
- return hTable.increment(increment);
- } else {
- throw new UnsupportedOperationException("Operation is not supported transactionally");
- }
- }
-
- @Override
- public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException {
- if (allowNonTransactional) {
- return hTable.incrementColumnValue(row, family, qualifier, amount);
- } else {
- throw new UnsupportedOperationException("Operation is not supported transactionally");
- }
- }
-
- @Override
- public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability)
- throws IOException {
- if (allowNonTransactional) {
- return hTable.incrementColumnValue(row, family, qualifier, amount, durability);
- } else {
- throw new UnsupportedOperationException("Operation is not supported transactionally");
- }
- }
-
- @Override
- public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
- throws IOException {
- if (allowNonTransactional) {
- return hTable.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
- } else {
- throw new UnsupportedOperationException("Operation is not supported transactionally");
- }
- }
-
- @Override
- public boolean isAutoFlush() {
- return hTable.isAutoFlush();
- }
-
- @Override
- public void flushCommits() throws IOException {
- hTable.flushCommits();
- }
-
- @Override
- public void close() throws IOException {
- hTable.close();
- }
-
- @Override
- public CoprocessorRpcChannel coprocessorService(byte[] row) {
- return hTable.coprocessorService(row);
- }
-
- @Override
- public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey, byte[] endKey,
- Batch.Call<T, R> callable)
- throws ServiceException, Throwable {
- return hTable.coprocessorService(service, startKey, endKey, callable);
- }
-
- @Override
- public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey, byte[] endKey,
- Batch.Call<T, R> callable, Batch.Callback<R> callback)
- throws ServiceException, Throwable {
- hTable.coprocessorService(service, startKey, endKey, callable, callback);
- }
-
- @Override
- public <R extends Message> Map<byte[], R> batchCoprocessorService(
- MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey,
- R responsePrototype) throws ServiceException, Throwable {
- return hTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype);
- }
-
- @Override
- public <R extends Message> void batchCoprocessorService(MethodDescriptor methodDescriptor,
- Message request, byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
- throws ServiceException, Throwable {
- hTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, callback);
- }
-
- @Override
- public void setAutoFlush(boolean autoFlush) {
- setAutoFlushTo(autoFlush);
- }
-
- @Override
- public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
- hTable.setAutoFlush(autoFlush, clearBufferOnFail);
- }
-
- @Override
- public void setAutoFlushTo(boolean autoFlush) {
- hTable.setAutoFlushTo(autoFlush);
- }
-
- @Override
- public long getWriteBufferSize() {
- return hTable.getWriteBufferSize();
- }
-
- @Override
- public void setWriteBufferSize(long writeBufferSize) throws IOException {
- hTable.setWriteBufferSize(writeBufferSize);
- }
-
- // Helpers to get copies of objects with the timestamp set to the current transaction timestamp.
-
- private Get transactionalizeAction(Get get) throws IOException {
- addToOperation(get, tx);
- return get;
- }
-
- private Scan transactionalizeAction(Scan scan) throws IOException {
- addToOperation(scan, tx);
- return scan;
- }
-
- private Put transactionalizeAction(Put put) throws IOException {
- Put txPut = new Put(put.getRow(), tx.getWritePointer());
- Set<Map.Entry<byte[], List<Cell>>> familyMap = put.getFamilyCellMap().entrySet();
- if (!familyMap.isEmpty()) {
- for (Map.Entry<byte[], List<Cell>> family : familyMap) {
- List<Cell> familyValues = family.getValue();
- if (!familyValues.isEmpty()) {
- for (Cell value : familyValues) {
- txPut.add(value.getFamily(), value.getQualifier(), tx.getWritePointer(), value.getValue());
- addToChangeSet(txPut.getRow(), value.getFamily(), value.getQualifier());
- }
- }
- }
- }
- for (Map.Entry<String, byte[]> entry : put.getAttributesMap().entrySet()) {
- txPut.setAttribute(entry.getKey(), entry.getValue());
- }
- txPut.setDurability(put.getDurability());
- addToOperation(txPut, tx);
- return txPut;
- }
-
- private Delete transactionalizeAction(Delete delete) throws IOException {
- long transactionTimestamp = tx.getWritePointer();
-
- byte[] deleteRow = delete.getRow();
- Delete txDelete = new Delete(deleteRow, transactionTimestamp);
-
- Map<byte[], List<Cell>> familyToDelete = delete.getFamilyCellMap();
- if (familyToDelete.isEmpty()) {
- // perform a row delete if we are using row-level conflict detection
- if (conflictLevel == TxConstants.ConflictDetection.ROW ||
- conflictLevel == TxConstants.ConflictDetection.NONE) {
- // Row delete leaves delete markers in all column families of the table
- // Therefore get all the column families of the hTable from the HTableDescriptor and add them to the changeSet
- for (HColumnDescriptor columnDescriptor : hTable.getTableDescriptor().getColumnFamilies()) {
- // no need to identify individual columns deleted
- addToChangeSet(deleteRow, columnDescriptor.getName(), null);
- }
- } else {
- Result result = get(new Get(delete.getRow()));
- // Delete everything
- NavigableMap<byte[], NavigableMap<byte[], byte[]>> resultMap = result.getNoVersionMap();
- for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> familyEntry : resultMap.entrySet()) {
- NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(familyEntry.getKey());
- for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) {
- txDelete.deleteColumns(familyEntry.getKey(), column.getKey(), transactionTimestamp);
- addToChangeSet(deleteRow, familyEntry.getKey(), column.getKey());
- }
- }
- }
- } else {
- for (Map.Entry<byte [], List<Cell>> familyEntry : familyToDelete.entrySet()) {
- byte[] family = familyEntry.getKey();
- List<Cell> entries = familyEntry.getValue();
- boolean isFamilyDelete = false;
- if (entries.size() == 1) {
- Cell cell = entries.get(0);
- isFamilyDelete = CellUtil.isDeleteFamily(cell);
- }
- if (isFamilyDelete) {
- if (conflictLevel == TxConstants.ConflictDetection.ROW ||
- conflictLevel == TxConstants.ConflictDetection.NONE) {
- // no need to identify individual columns deleted
- txDelete.deleteFamily(family);
- addToChangeSet(deleteRow, family, null);
- } else {
- Result result = get(new Get(delete.getRow()).addFamily(family));
- // Delete entire family
- NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(family);
- for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) {
- txDelete.deleteColumns(family, column.getKey(), transactionTimestamp);
- addToChangeSet(deleteRow, family, column.getKey());
- }
- }
- } else {
- for (Cell value : entries) {
- txDelete.deleteColumns(value.getFamily(), value.getQualifier(), transactionTimestamp);
- addToChangeSet(deleteRow, value.getFamily(), value.getQualifier());
- }
- }
- }
- }
- for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) {
- txDelete.setAttribute(entry.getKey(), entry.getValue());
- }
- txDelete.setDurability(delete.getDurability());
- return txDelete;
- }
-
- private List<? extends Row> transactionalizeActions(List<? extends Row> actions) throws IOException {
- List<Row> transactionalizedActions = new ArrayList<>(actions.size());
- for (Row action : actions) {
- if (action instanceof Get) {
- transactionalizedActions.add(transactionalizeAction((Get) action));
- } else if (action instanceof Put) {
- transactionalizedActions.add(transactionalizeAction((Put) action));
- } else if (action instanceof Delete) {
- transactionalizedActions.add(transactionalizeAction((Delete) action));
- } else {
- transactionalizedActions.add(action);
- }
- }
- return transactionalizedActions;
- }
-
- public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException {
- op.setAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx));
- }
-
- protected void makeRollbackOperation(Delete delete) {
- delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/d00d9f6d/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/CellSkipFilter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/CellSkipFilter.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/CellSkipFilter.java
deleted file mode 100644
index ceeef6f..0000000
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/CellSkipFilter.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tephra.hbase98.coprocessor;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterBase;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * {@link Filter} that encapsulates another {@link Filter}. It remembers the last {@link KeyValue}
- * for which the underlying filter returned the {@link ReturnCode#NEXT_COL} or {@link ReturnCode#INCLUDE_AND_NEXT_COL},
- * so that when {@link #filterKeyValue} is called again for the same {@link KeyValue} with different
- * version, it returns {@link ReturnCode#NEXT_COL} directly without consulting the underlying {@link Filter}.
- * Please see TEPHRA-169 for more details.
- */
-public class CellSkipFilter extends FilterBase {
- private final Filter filter;
- // remember the previous keyvalue processed by filter when the return code was NEXT_COL or INCLUDE_AND_NEXT_COL
- private KeyValue skipColumn = null;
-
- public CellSkipFilter(Filter filter) {
- this.filter = filter;
- }
-
- /**
- * Determines whether the current cell should be skipped. The cell will be skipped
- * if the previous keyvalue had the same key as the current cell. This means filter already responded
- * for the previous keyvalue with ReturnCode.NEXT_COL or ReturnCode.INCLUDE_AND_NEXT_COL.
- * @param cell the {@link Cell} to be tested for skipping
- * @return true is current cell should be skipped, false otherwise
- */
- private boolean skipCellVersion(Cell cell) {
- return skipColumn != null
- && skipColumn.matchingRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
- && skipColumn.matchingColumn(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
- cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
- }
-
- @Override
- public ReturnCode filterKeyValue(Cell cell) throws IOException {
- if (skipCellVersion(cell)) {
- return ReturnCode.NEXT_COL;
- }
-
- ReturnCode code = filter.filterKeyValue(cell);
- if (code == ReturnCode.NEXT_COL || code == ReturnCode.INCLUDE_AND_NEXT_COL) {
- // only store the reference to the keyvalue if we are returning NEXT_COL or INCLUDE_AND_NEXT_COL
- skipColumn = KeyValue.createFirstOnRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
- cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
- cell.getQualifierArray(), cell.getQualifierOffset(),
- cell.getQualifierLength());
- } else {
- skipColumn = null;
- }
- return code;
- }
-
- @Override
- public boolean filterRow() throws IOException {
- return filter.filterRow();
- }
-
- @Override
- public Cell transformCell(Cell cell) throws IOException {
- return filter.transformCell(cell);
- }
-
- @Override
- public void reset() throws IOException {
- filter.reset();
- }
-
- @Override
- public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
- return filter.filterRowKey(buffer, offset, length);
- }
-
- @Override
- public boolean filterAllRemaining() throws IOException {
- return filter.filterAllRemaining();
- }
-
- @Override
- public void filterRowCells(List<Cell> kvs) throws IOException {
- filter.filterRowCells(kvs);
- }
-
- @Override
- public boolean hasFilterRow() {
- return filter.hasFilterRow();
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public KeyValue getNextKeyHint(KeyValue currentKV) throws IOException {
- return filter.getNextKeyHint(currentKV);
- }
-
- @Override
- public Cell getNextCellHint(Cell currentKV) throws IOException {
- return filter.getNextCellHint(currentKV);
- }
-
- @Override
- public boolean isFamilyEssential(byte[] name) throws IOException {
- return filter.isFamilyEssential(name);
- }
-
- @Override
- public byte[] toByteArray() throws IOException {
- return filter.toByteArray();
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/d00d9f6d/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/TransactionFilters.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/TransactionFilters.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/TransactionFilters.java
deleted file mode 100644
index 889c70a..0000000
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/TransactionFilters.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tephra.hbase98.coprocessor;
-
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.regionserver.ScanType;
-import org.apache.tephra.Transaction;
-
-import java.util.Map;
-import javax.annotation.Nullable;
-
-/**
- * Factory class for providing {@link Filter} instances.
- */
-public class TransactionFilters {
- /**
- * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions.
- *
- * @param tx the current transaction to apply. Only data visible to this transaction will be returned.
- * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
- * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
- * these will be interpreted as "delete" markers and the column will be filtered out
- * @param scanType the type of scan operation being performed
- */
- public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
- ScanType scanType) {
- return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null));
- }
-
- /**
- * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions.
- *
- * @param tx the current transaction to apply. Only data visible to this transaction will be returned.
- * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
- * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
- * these will be interpreted as "delete" markers and the column will be filtered out
- * @param scanType the type of scan operation being performed
- * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
- * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
- * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
- */
- public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
- ScanType scanType, @Nullable Filter cellFilter) {
- return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter));
- }
-}