You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/11 19:59:01 UTC
[23/30] incubator-tephra git commit: TEPHRA-168 Remove HBase version
from co-processor package names
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/d00d9f6d/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/TransactionProcessor.java
deleted file mode 100644
index cc1915d..0000000
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/TransactionProcessor.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tephra.hbase98.coprocessor;
-
-import com.google.common.base.Supplier;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.OperationWithAttributes;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterBase;
-import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.regionserver.ScanType;
-import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.StoreScanner;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.tephra.Transaction;
-import org.apache.tephra.TransactionCodec;
-import org.apache.tephra.TxConstants;
-import org.apache.tephra.coprocessor.TransactionStateCache;
-import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
-import org.apache.tephra.persist.TransactionVisibilityState;
-import org.apache.tephra.util.TxUtils;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.Set;
-
-/**
- * {@code org.apache.hadoop.hbase.coprocessor.RegionObserver} coprocessor that handles server-side processing
- * for transactions:
- * <ul>
- * <li>applies filtering to exclude data from invalid and in-progress transactions</li>
- * <li>overrides the scanner returned for flush and compaction to drop data written by invalidated transactions,
- * or expired due to TTL.</li>
- * </ul>
- *
- * <p>In order to use this coprocessor for transactions, configure the class on any table involved in transactions,
- * or on all user tables by adding the following to hbase-site.xml:
- * {@code
- * <property>
- * <name>hbase.coprocessor.region.classes</name>
- * <value>org.apache.tephra.hbase98.coprocessor.TransactionProcessor</value>
- * </property>
- * }
- * </p>
- *
- * <p>HBase {@code Get} and {@code Scan} operations should have the current transaction serialized on to the operation
- * as an attribute:
- * {@code
- * Transaction t = ...;
- * Get get = new Get(...);
- * TransactionCodec codec = new TransactionCodec();
- * codec.addToOperation(get, t);
- * }
- * </p>
- */
-public class TransactionProcessor extends BaseRegionObserver {
- private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
-
- private TransactionStateCache cache;
- private final TransactionCodec txCodec;
- protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
- protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
- protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA;
-
- public TransactionProcessor() {
- this.txCodec = new TransactionCodec();
- }
-
- /* RegionObserver implementation */
-
- @Override
- public void start(CoprocessorEnvironment e) throws IOException {
- if (e instanceof RegionCoprocessorEnvironment) {
- RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
- Supplier<TransactionStateCache> cacheSupplier = getTransactionStateCacheSupplier(env);
- this.cache = cacheSupplier.get();
-
- HTableDescriptor tableDesc = env.getRegion().getTableDesc();
- for (HColumnDescriptor columnDesc : tableDesc.getFamilies()) {
- String columnTTL = columnDesc.getValue(TxConstants.PROPERTY_TTL);
- long ttl = 0;
- if (columnTTL != null) {
- try {
- ttl = Long.parseLong(columnTTL);
- LOG.info("Family " + columnDesc.getNameAsString() + " has TTL of " + columnTTL);
- } catch (NumberFormatException nfe) {
- LOG.warn("Invalid TTL value configured for column family " + columnDesc.getNameAsString() +
- ", value = " + columnTTL);
- }
- }
- ttlByFamily.put(columnDesc.getName(), ttl);
- }
-
- this.allowEmptyValues = env.getConfiguration().getBoolean(TxConstants.ALLOW_EMPTY_VALUES_KEY,
- TxConstants.ALLOW_EMPTY_VALUES_DEFAULT);
- this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA));
- if (readNonTxnData) {
- LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString());
- }
- }
- }
-
- protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
- return new TransactionStateCacheSupplier(env.getConfiguration());
- }
-
- @Override
- public void stop(CoprocessorEnvironment e) throws IOException {
- // nothing to do
- }
-
- @Override
- public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results)
- throws IOException {
- Transaction tx = getFromOperation(get);
- if (tx != null) {
- projectFamilyDeletes(get);
- get.setMaxVersions();
- get.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx, readNonTxnData),
- TxUtils.getMaxVisibleTimestamp(tx));
- Filter newFilter = getTransactionFilter(tx, ScanType.USER_SCAN, get.getFilter());
- get.setFilter(newFilter);
- }
- }
-
- @Override
- public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit,
- Durability durability) throws IOException {
- // Translate deletes into our own delete tombstones
- // Since HBase deletes cannot be undone, we need to translate deletes into special puts, which allows
- // us to rollback the changes (by a real delete) if the transaction fails
-
- // Deletes that are part of a transaction rollback do not need special handling.
- // They will never be rolled back, so are performed as normal HBase deletes.
- if (isRollbackOperation(delete)) {
- return;
- }
-
- // Other deletes are client-initiated and need to be translated into our own tombstones
- // TODO: this should delegate to the DeleteStrategy implementation.
- Put deleteMarkers = new Put(delete.getRow(), delete.getTimeStamp());
- for (byte[] family : delete.getFamilyCellMap().keySet()) {
- List<Cell> familyCells = delete.getFamilyCellMap().get(family);
- if (isFamilyDelete(familyCells)) {
- deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, familyCells.get(0).getTimestamp(),
- HConstants.EMPTY_BYTE_ARRAY);
- } else {
- int cellSize = familyCells.size();
- for (int i = 0; i < cellSize; i++) {
- Cell cell = familyCells.get(i);
- deleteMarkers.add(family, CellUtil.cloneQualifier(cell), cell.getTimestamp(),
- HConstants.EMPTY_BYTE_ARRAY);
- }
- }
- }
- for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) {
- deleteMarkers.setAttribute(entry.getKey(), entry.getValue());
- }
- e.getEnvironment().getRegion().put(deleteMarkers);
- // skip normal delete handling
- e.bypass();
- }
-
- private boolean isFamilyDelete(List<Cell> familyCells) {
- return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0));
- }
-
- @Override
- public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s)
- throws IOException {
- Transaction tx = getFromOperation(scan);
- if (tx != null) {
- projectFamilyDeletes(scan);
- scan.setMaxVersions();
- scan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx, readNonTxnData),
- TxUtils.getMaxVisibleTimestamp(tx));
- Filter newFilter = getTransactionFilter(tx, ScanType.USER_SCAN, scan.getFilter());
- scan.setFilter(newFilter);
- }
- return s;
- }
-
- /**
- * Ensures that family delete markers are present in the columns requested for any scan operation.
- * @param scan The original scan request
- * @return The modified scan request with the family delete qualifiers represented
- */
- private Scan projectFamilyDeletes(Scan scan) {
- for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
- NavigableSet<byte[]> columns = entry.getValue();
- // wildcard scans will automatically include the delete marker, so only need to add it when we have
- // explicit columns listed
- if (columns != null && !columns.isEmpty()) {
- scan.addColumn(entry.getKey(), TxConstants.FAMILY_DELETE_QUALIFIER);
- }
- }
- return scan;
- }
-
- /**
- * Ensures that family delete markers are present in the columns requested for any get operation.
- * @param get The original get request
- * @return The modified get request with the family delete qualifiers represented
- */
- private Get projectFamilyDeletes(Get get) {
- for (Map.Entry<byte[], NavigableSet<byte[]>> entry : get.getFamilyMap().entrySet()) {
- NavigableSet<byte[]> columns = entry.getValue();
- // wildcard gets will automatically include the delete marker, so only need to add it when we have
- // explicit columns listed
- if (columns != null && !columns.isEmpty()) {
- get.addColumn(entry.getKey(), TxConstants.FAMILY_DELETE_QUALIFIER);
- }
- }
- return get;
- }
-
- @Override
- public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
- KeyValueScanner memstoreScanner, InternalScanner scanner)
- throws IOException {
- return createStoreScanner(c.getEnvironment(), "flush", cache.getLatestState(), store,
- Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
- HConstants.OLDEST_TIMESTAMP);
- }
-
- @Override
- public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
- List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
- CompactionRequest request)
- throws IOException {
- return createStoreScanner(c.getEnvironment(), "compaction", cache.getLatestState(), store, scanners,
- scanType, earliestPutTs);
- }
-
- protected InternalScanner createStoreScanner(RegionCoprocessorEnvironment env, String action,
- TransactionVisibilityState snapshot, Store store,
- List<? extends KeyValueScanner> scanners, ScanType type,
- long earliestPutTs) throws IOException {
- if (snapshot == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Region " + env.getRegion().getRegionNameAsString() +
- ", no current transaction state found, defaulting to normal " + action + " scanner");
- }
- return null;
- }
-
- // construct a dummy transaction from the latest snapshot
- Transaction dummyTx = TxUtils.createDummyTransaction(snapshot);
- Scan scan = new Scan();
- // need to see all versions, since we filter out excludes and applications may rely on multiple versions
- scan.setMaxVersions();
- scan.setFilter(
- new IncludeInProgressFilter(dummyTx.getVisibilityUpperBound(),
- snapshot.getInvalid(),
- getTransactionFilter(dummyTx, type, null)));
-
- return new StoreScanner(store, store.getScanInfo(), scan, scanners,
- type, store.getSmallestReadPoint(), earliestPutTs);
- }
-
- private Transaction getFromOperation(OperationWithAttributes op) throws IOException {
- byte[] encoded = op.getAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY);
- if (encoded == null) {
- // to support old clients
- encoded = op.getAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY);
- }
- if (encoded != null) {
- return txCodec.decode(encoded);
- }
- return null;
- }
-
- private boolean isRollbackOperation(OperationWithAttributes op) throws IOException {
- return op.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null ||
- // to support old clients
- op.getAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY) != null;
- }
-
- /**
- * Derived classes can override this method to customize the filter used to return data visible for the current
- * transaction.
- *
- * @param tx the current transaction to apply
- * @param type the type of scan being performed
- */
- protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) {
- return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
- }
-
- /**
- * Filter used to include cells visible to in-progress transactions on flush and compaction.
- */
- static class IncludeInProgressFilter extends FilterBase {
- private final long visibilityUpperBound;
- private final Set<Long> invalidIds;
- private final Filter txFilter;
-
- public IncludeInProgressFilter(long upperBound, Collection<Long> invalids, Filter transactionFilter) {
- this.visibilityUpperBound = upperBound;
- this.invalidIds = Sets.newHashSet(invalids);
- this.txFilter = transactionFilter;
- }
-
- @Override
- public ReturnCode filterKeyValue(Cell cell) throws IOException {
- // include all cells visible to in-progress transactions, except for those already marked as invalid
- long ts = cell.getTimestamp();
- if (ts > visibilityUpperBound) {
- // include everything that could still be in-progress except invalids
- if (invalidIds.contains(ts)) {
- return ReturnCode.SKIP;
- }
- return ReturnCode.INCLUDE;
- }
- return txFilter.filterKeyValue(cell);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/d00d9f6d/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/TransactionVisibilityFilter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/TransactionVisibilityFilter.java
deleted file mode 100644
index d01f83c..0000000
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/TransactionVisibilityFilter.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tephra.hbase98.coprocessor;
-
-import com.google.common.collect.Maps;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterBase;
-import org.apache.hadoop.hbase.regionserver.ScanType;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.tephra.Transaction;
-import org.apache.tephra.TxConstants;
-import org.apache.tephra.util.TxUtils;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
-
-/**
- * Applies filtering of data based on transactional visibility (HBase 0.98+ specific version).
- * Note: this is intended for server-side use only, as additional properties need to be set on
- * any {@code Scan} or {@code Get} operation performed.
- */
-public class TransactionVisibilityFilter extends FilterBase {
- private final Transaction tx;
- // oldest visible timestamp by column family, used to apply TTL when reading
- private final Map<byte[], Long> oldestTsByFamily;
- // if false, empty values will be interpreted as deletes
- private final boolean allowEmptyValues;
- // whether or not we can remove delete markers
- // these can only be safely removed when we are traversing all storefiles
- private final boolean clearDeletes;
- // optional sub-filter to apply to visible cells
- private final Filter cellFilter;
-
- // since we traverse KVs in order, cache the current oldest TS to avoid map lookups per KV
- private byte[] currentFamily = new byte[0];
- private long currentOldestTs;
-
- private DeleteTracker deleteTracker = new DeleteTracker();
-
- /**
- * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions.
- *
- * @param tx the current transaction to apply. Only data visible to this transaction will be returned.
- * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
- * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
- * these will be interpreted as "delete" markers and the column will be filtered out
- * @param scanType the type of scan operation being performed
- */
- public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
- ScanType scanType) {
- this(tx, ttlByFamily, allowEmptyValues, scanType, null);
- }
-
- /**
- * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions.
- *
- * @param tx the current transaction to apply. Only data visible to this transaction will be returned.
- * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
- * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
- * these will be interpreted as "delete" markers and the column will be filtered out
- * @param scanType the type of scan operation being performed
- * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
- * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then the return code chosen
- * by this filter (e.g. {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL}) is returned unchanged.
- */
- public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
- ScanType scanType, @Nullable Filter cellFilter) {
- this.tx = tx;
- this.oldestTsByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
- for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) {
- long familyTTL = ttlEntry.getValue();
- oldestTsByFamily.put(ttlEntry.getKey(),
- familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS);
- }
- this.allowEmptyValues = allowEmptyValues;
- this.clearDeletes =
- scanType == ScanType.COMPACT_DROP_DELETES ||
- (scanType == ScanType.USER_SCAN && tx.getVisibilityLevel() != Transaction.VisibilityLevel.SNAPSHOT_ALL);
- this.cellFilter = cellFilter;
- }
-
- @Override
- public ReturnCode filterKeyValue(Cell cell) throws IOException {
- if (!CellUtil.matchingFamily(cell, currentFamily)) {
- // column family changed
- currentFamily = CellUtil.cloneFamily(cell);
- Long familyOldestTs = oldestTsByFamily.get(currentFamily);
- currentOldestTs = familyOldestTs != null ? familyOldestTs : 0;
- deleteTracker.reset();
- }
- // need to apply TTL for the column family here
- long kvTimestamp = cell.getTimestamp();
- if (TxUtils.getTimestampForTTL(kvTimestamp) < currentOldestTs) {
- // passed TTL for this column, seek to next
- return ReturnCode.NEXT_COL;
- } else if (tx.isVisible(kvTimestamp)) {
- // Return all writes done by current transaction (including deletes) for VisibilityLevel.SNAPSHOT_ALL
- if (tx.getVisibilityLevel() == Transaction.VisibilityLevel.SNAPSHOT_ALL && tx.isCurrentWrite(kvTimestamp)) {
- // cell is visible
- // visibility SNAPSHOT_ALL needs all matches
- return runSubFilter(ReturnCode.INCLUDE, cell);
- }
- if (DeleteTracker.isFamilyDelete(cell)) {
- deleteTracker.addFamilyDelete(cell);
- if (clearDeletes) {
- return ReturnCode.NEXT_COL;
- } else {
- // cell is visible
- // as soon as we find a KV to include we can move to the next column
- return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell);
- }
- }
- // check if masked by family delete
- if (deleteTracker.isDeleted(cell)) {
- return ReturnCode.NEXT_COL;
- }
- // check for column delete
- if (isColumnDelete(cell)) {
- if (clearDeletes) {
- // skip "deleted" cell
- return ReturnCode.NEXT_COL;
- } else {
- // keep the marker but skip any remaining versions
- return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell);
- }
- }
- // cell is visible
- // as soon as we find a KV to include we can move to the next column
- return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell);
- } else {
- return ReturnCode.SKIP;
- }
- }
-
- private ReturnCode runSubFilter(ReturnCode txFilterCode, Cell cell) throws IOException {
- if (cellFilter != null) {
- ReturnCode subFilterCode = cellFilter.filterKeyValue(cell);
- return determineReturnCode(txFilterCode, subFilterCode);
- }
- return txFilterCode;
- }
-
- /**
- * Determines the return code of TransactionVisibilityFilter based on sub-filter's return code.
- * Sub-filter can only exclude cells included by TransactionVisibilityFilter, i.e., sub-filter's
- * INCLUDE will be ignored. This behavior makes sure that sub-filter only sees cell versions valid for the
- * given transaction. If sub-filter needs to see older versions of cell, then this method can be overridden.
- *
- * @param txFilterCode return code from TransactionVisibilityFilter
- * @param subFilterCode return code from sub-filter
- * @return final return code
- */
- protected ReturnCode determineReturnCode(ReturnCode txFilterCode, ReturnCode subFilterCode) {
- // Return the more restrictive of the two filter responses
- switch (subFilterCode) {
- case INCLUDE:
- return txFilterCode;
- case INCLUDE_AND_NEXT_COL:
- return ReturnCode.INCLUDE_AND_NEXT_COL;
- case SKIP:
- return txFilterCode == ReturnCode.INCLUDE ? ReturnCode.SKIP : ReturnCode.NEXT_COL;
- default:
- return subFilterCode;
- }
- }
-
- @Override
- public boolean filterRow() throws IOException {
- if (cellFilter != null) {
- return cellFilter.filterRow();
- }
- return super.filterRow();
- }
-
- @Override
- public Cell transformCell(Cell cell) throws IOException {
- // Convert Tephra deletes back into HBase deletes
- if (tx.getVisibilityLevel() == Transaction.VisibilityLevel.SNAPSHOT_ALL) {
- if (DeleteTracker.isFamilyDelete(cell)) {
- return new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell), null, cell.getTimestamp(),
- KeyValue.Type.DeleteFamily);
- } else if (isColumnDelete(cell)) {
- // Note: in some cases KeyValue.Type.Delete is used in Delete object,
- // and in some other cases KeyValue.Type.DeleteColumn is used.
- // Since Tephra cannot distinguish between the two, we return KeyValue.Type.DeleteColumn.
- // KeyValue.Type.DeleteColumn makes both CellUtil.isDelete and CellUtil.isDeleteColumns return true, and will
- // work in both cases.
- return new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),
- cell.getTimestamp(), KeyValue.Type.DeleteColumn);
- }
- }
- return cell;
- }
-
- @Override
- public void reset() throws IOException {
- deleteTracker.reset();
- if (cellFilter != null) {
- cellFilter.reset();
- }
- }
-
- @Override
- public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
- if (cellFilter != null) {
- return cellFilter.filterRowKey(buffer, offset, length);
- }
- return super.filterRowKey(buffer, offset, length);
- }
-
- @Override
- public boolean filterAllRemaining() throws IOException {
- if (cellFilter != null) {
- return cellFilter.filterAllRemaining();
- }
- return super.filterAllRemaining();
- }
-
- @Override
- public void filterRowCells(List<Cell> kvs) throws IOException {
- if (cellFilter != null) {
- cellFilter.filterRowCells(kvs);
- } else {
- super.filterRowCells(kvs);
- }
- }
-
- @Override
- public boolean hasFilterRow() {
- if (cellFilter != null) {
- return cellFilter.hasFilterRow();
- }
- return super.hasFilterRow();
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public KeyValue getNextKeyHint(KeyValue currentKV) throws IOException {
- if (cellFilter != null) {
- return cellFilter.getNextKeyHint(currentKV);
- }
- return super.getNextKeyHint(currentKV);
- }
-
- @Override
- public Cell getNextCellHint(Cell currentKV) throws IOException {
- if (cellFilter != null) {
- return cellFilter.getNextCellHint(currentKV);
- }
- return super.getNextCellHint(currentKV);
- }
-
- @Override
- public boolean isFamilyEssential(byte[] name) throws IOException {
- if (cellFilter != null) {
- return cellFilter.isFamilyEssential(name);
- }
- return super.isFamilyEssential(name);
- }
-
- @Override
- public byte[] toByteArray() throws IOException {
- return super.toByteArray();
- }
-
- private boolean isColumnDelete(Cell cell) {
- return !TxUtils.isPreExistingVersion(cell.getTimestamp()) && cell.getValueLength() == 0 && !allowEmptyValues;
- }
-
- private static final class DeleteTracker {
- private long familyDeleteTs;
-
- public static boolean isFamilyDelete(Cell cell) {
- return !TxUtils.isPreExistingVersion(cell.getTimestamp()) &&
- CellUtil.matchingQualifier(cell, TxConstants.FAMILY_DELETE_QUALIFIER) &&
- CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY);
- }
-
- public void addFamilyDelete(Cell delete) {
- this.familyDeleteTs = delete.getTimestamp();
- }
-
- public boolean isDeleted(Cell cell) {
- return cell.getTimestamp() <= familyDeleteTs;
- }
-
- public void reset() {
- this.familyDeleteTs = 0;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/d00d9f6d/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/HBase98ConfigurationProviderTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/HBase98ConfigurationProviderTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/HBase98ConfigurationProviderTest.java
new file mode 100644
index 0000000..287fd54
--- /dev/null
+++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/HBase98ConfigurationProviderTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase;
+
+import org.apache.tephra.util.AbstractConfigurationProviderTest;
+import org.apache.tephra.util.HBaseVersion;
+
+/**
+ * Test for HBase 0.98 version specific behavior.
+ */
+public class HBase98ConfigurationProviderTest extends AbstractConfigurationProviderTest {
+ @Override
+ protected HBaseVersion.Version getExpectedVersion() {
+ return HBaseVersion.Version.HBASE_98;
+ }
+}