You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/09/08 08:10:05 UTC

[6/7] incubator-tephra git commit: TEPHRA-176, TEPHRA-177: Adding maven modules for CDH-5.7, 5.8 support, HBase-1.1 and HBase-1.2 modules

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2246abff/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/CellSkipFilter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/CellSkipFilter.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/CellSkipFilter.java
new file mode 100644
index 0000000..d8664f4
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/CellSkipFilter.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link Filter} that encapsulates another {@link Filter}. It remembers the last {@link KeyValue}
+ * for which the underlying filter returned the {@link ReturnCode#NEXT_COL} or {@link ReturnCode#INCLUDE_AND_NEXT_COL},
+ * so that when {@link #filterKeyValue} is called again for the same {@link KeyValue} with different
+ * version, it returns {@link ReturnCode#NEXT_COL} directly without consulting the underlying {@link Filter}.
+ * Please see TEPHRA-169 for more details.
+ */
+public class CellSkipFilter extends FilterBase {
+  private final Filter filter;
+  // remember the previous keyvalue processed by filter when the return code was NEXT_COL or INCLUDE_AND_NEXT_COL
+  private KeyValue skipColumn = null;
+
+  public CellSkipFilter(Filter filter) {
+    this.filter = filter;
+  }
+
+  /**
+   * Determines whether the current cell should be skipped. The cell will be skipped
+   * if the previous keyvalue had the same key as the current cell. This means filter already responded
+   * for the previous keyvalue with ReturnCode.NEXT_COL or ReturnCode.INCLUDE_AND_NEXT_COL.
+   * @param cell the {@link Cell} to be tested for skipping
+   * @return true is current cell should be skipped, false otherwise
+   */
+  private boolean skipCellVersion(Cell cell) {
+    return skipColumn != null
+      && CellUtil.matchingRow(cell, skipColumn.getRowArray(), skipColumn.getRowOffset(),
+                              skipColumn.getRowLength())
+      && CellUtil.matchingFamily(cell, skipColumn.getFamilyArray(), skipColumn.getFamilyOffset(),
+                                 skipColumn.getFamilyLength())
+      && CellUtil.matchingQualifier(cell, skipColumn.getQualifierArray(), skipColumn.getQualifierOffset(),
+                                    skipColumn.getQualifierLength());
+  }
+
+  @Override
+  public ReturnCode filterKeyValue(Cell cell) throws IOException {
+    if (skipCellVersion(cell)) {
+      return ReturnCode.NEXT_COL;
+    }
+
+    ReturnCode code = filter.filterKeyValue(cell);
+    if (code == ReturnCode.NEXT_COL || code == ReturnCode.INCLUDE_AND_NEXT_COL) {
+      // only store the reference to the keyvalue if we are returning NEXT_COL or INCLUDE_AND_NEXT_COL
+      skipColumn = KeyValueUtil.createFirstOnRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+                                                   cell.getFamilyArray(), cell.getFamilyOffset(),
+                                                   cell.getFamilyLength(), cell.getQualifierArray(),
+                                                   cell.getQualifierOffset(), cell.getQualifierLength());
+    } else {
+      skipColumn = null;
+    }
+    return code;
+  }
+
+  @Override
+  public boolean filterRow() throws IOException {
+    return filter.filterRow();
+  }
+
+  @Override
+  public Cell transformCell(Cell cell) throws IOException {
+    return filter.transformCell(cell);
+  }
+
+  @Override
+  public void reset() throws IOException {
+    filter.reset();
+  }
+
+  @Override
+  public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
+    return filter.filterRowKey(buffer, offset, length);
+  }
+
+  @Override
+  public boolean filterAllRemaining() throws IOException {
+    return filter.filterAllRemaining();
+  }
+
+  @Override
+  public void filterRowCells(List<Cell> kvs) throws IOException {
+    filter.filterRowCells(kvs);
+  }
+
+  @Override
+  public boolean hasFilterRow() {
+    return filter.hasFilterRow();
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public KeyValue getNextKeyHint(KeyValue currentKV) throws IOException {
+    return filter.getNextKeyHint(currentKV);
+  }
+
+  @Override
+  public Cell getNextCellHint(Cell currentKV) throws IOException {
+    return filter.getNextCellHint(currentKV);
+  }
+
+  @Override
+  public boolean isFamilyEssential(byte[] name) throws IOException {
+    return filter.isFamilyEssential(name);
+  }
+
+  @Override
+  public byte[] toByteArray() throws IOException {
+    return filter.toByteArray();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2246abff/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
new file mode 100644
index 0000000..0ca9f9c
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.tephra.Transaction;
+
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/** Factory class for {@link Filter} instances that apply transactional visibility. */
+public class TransactionFilters {
+  /**
+   * Builds a {@link org.apache.hadoop.hbase.filter.Filter} that only returns data from visible transactions.
+   *
+   * @param tx the current transaction to apply.  Only data visible to this transaction will be returned.
+   * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
+   * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
+   *                         these will be interpreted as "delete" markers and the column will be filtered out
+   * @param scanType the type of scan operation being performed
+   * @return a {@link CellSkipFilter} wrapping a {@link TransactionVisibilityFilter} that has no cell filter
+   */
+  public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
+                                           ScanType scanType) {
+    return getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null);
+  }
+
+  /**
+   * Builds a {@link org.apache.hadoop.hbase.filter.Filter} that only returns data from visible transactions.
+   *
+   * @param tx the current transaction to apply.  Only data visible to this transaction will be returned.
+   * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
+   * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
+   *                         these will be interpreted as "delete" markers and the column will be filtered out
+   * @param scanType the type of scan operation being performed
+   * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
+   *                   calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}.  If null, then
+   *                   {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
+   * @return a {@link CellSkipFilter} wrapping a {@link TransactionVisibilityFilter} built from the arguments
+   */
+  public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
+                                           ScanType scanType, @Nullable Filter cellFilter) {
+    return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2246abff/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
new file mode 100644
index 0000000..14941b3
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.OperationWithAttributes;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionCodec;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.TransactionStateCache;
+import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
+import org.apache.tephra.persist.TransactionVisibilityState;
+import org.apache.tephra.util.TxUtils;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+
+/**
+ * {@code org.apache.hadoop.hbase.coprocessor.RegionObserver} coprocessor that handles server-side processing
+ * for transactions:
+ * <ul>
+ *   <li>applies filtering to exclude data from invalid and in-progress transactions</li>
+ *   <li>overrides the scanner returned for flush and compaction to drop data written by invalidated transactions,
+ *   or expired due to TTL.</li>
+ * </ul>
+ *
+ * <p>In order to use this coprocessor for transactions, configure the class on any table involved in transactions,
+ * or on all user tables by adding the following to hbase-site.xml:
+ * {@code
+ * <property>
+ *   <name>hbase.coprocessor.region.classes</name>
+ *   <value>org.apache.tephra.hbase.coprocessor.TransactionProcessor</value>
+ * </property>
+ * }
+ * </p>
+ *
+ * <p>HBase {@code Get} and {@code Scan} operations should have the current transaction serialized on to the operation
+ * as an attribute:
+ * {@code
+ * Transaction t = ...;
+ * Get get = new Get(...);
+ * TransactionCodec codec = new TransactionCodec();
+ * codec.addToOperation(get, t);
+ * }
+ * </p>
+ */
+public class TransactionProcessor extends BaseRegionObserver {
+  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
+
+  private TransactionStateCache cache;
+  private final TransactionCodec txCodec;
+  protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+  protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
+  protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA;
+
+  public TransactionProcessor() {
+    this.txCodec = new TransactionCodec();
+  }
+
+  /* RegionObserver implementation */
+
+  @Override
+  public void start(CoprocessorEnvironment e) throws IOException {
+    if (e instanceof RegionCoprocessorEnvironment) {
+      RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
+      Supplier<TransactionStateCache> cacheSupplier = getTransactionStateCacheSupplier(env);
+      this.cache = cacheSupplier.get();
+
+      HTableDescriptor tableDesc = env.getRegion().getTableDesc();
+      for (HColumnDescriptor columnDesc : tableDesc.getFamilies()) {
+        String columnTTL = columnDesc.getValue(TxConstants.PROPERTY_TTL);
+        long ttl = 0;
+        if (columnTTL != null) {
+          try {
+            ttl = Long.parseLong(columnTTL);
+            LOG.info("Family " + columnDesc.getNameAsString() + " has TTL of " + columnTTL);
+          } catch (NumberFormatException nfe) {
+            LOG.warn("Invalid TTL value configured for column family " + columnDesc.getNameAsString() +
+                       ", value = " + columnTTL);
+          }
+        }
+        ttlByFamily.put(columnDesc.getName(), ttl);
+      }
+
+      this.allowEmptyValues = env.getConfiguration().getBoolean(TxConstants.ALLOW_EMPTY_VALUES_KEY,
+                                                                TxConstants.ALLOW_EMPTY_VALUES_DEFAULT);
+      this.readNonTxnData = Boolean.parseBoolean(tableDesc.getValue(TxConstants.READ_NON_TX_DATA));
+      if (readNonTxnData) {
+        LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString());
+      }
+    }
+  }
+
+  protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+    return new TransactionStateCacheSupplier(env.getConfiguration());
+  }
+
+  @Override
+  public void stop(CoprocessorEnvironment e) throws IOException {
+    // nothing to do
+  }
+
+  @Override
+  public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results)
+    throws IOException {
+    Transaction tx = getFromOperation(get);
+    if (tx != null) {
+      projectFamilyDeletes(get);
+      get.setMaxVersions();
+      get.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx, readNonTxnData),
+                       TxUtils.getMaxVisibleTimestamp(tx));
+      Filter newFilter = getTransactionFilter(tx, ScanType.USER_SCAN, get.getFilter());
+      get.setFilter(newFilter);
+    }
+  }
+
+  @Override
+  public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit,
+                        Durability durability) throws IOException {
+    // Translate deletes into our own delete tombstones
+    // Since HBase deletes cannot be undone, we need to translate deletes into special puts, which allows
+    // us to rollback the changes (by a real delete) if the transaction fails
+
+    // Deletes that are part of a transaction rollback do not need special handling.
+    // They will never be rolled back, so are performed as normal HBase deletes.
+    if (isRollbackOperation(delete)) {
+      return;
+    }
+
+    // Other deletes are client-initiated and need to be translated into our own tombstones
+    // TODO: this should delegate to the DeleteStrategy implementation.
+    Put deleteMarkers = new Put(delete.getRow(), delete.getTimeStamp());
+    for (Map.Entry<byte[], List<Cell>> familyEntry : delete.getFamilyCellMap().entrySet()) {
+      byte[] family = familyEntry.getKey();
+      List<Cell> familyCells = familyEntry.getValue();
+      if (isFamilyDelete(familyCells)) {
+        deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, familyCells.get(0).getTimestamp(),
+            HConstants.EMPTY_BYTE_ARRAY);
+      } else {
+        for (Cell cell : familyCells) {
+          deleteMarkers.add(family, CellUtil.cloneQualifier(cell), cell.getTimestamp(),
+                            HConstants.EMPTY_BYTE_ARRAY);
+        }
+      }
+    }
+    for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) {
+      deleteMarkers.setAttribute(entry.getKey(), entry.getValue());
+    }
+    e.getEnvironment().getRegion().put(deleteMarkers);
+    // skip normal delete handling
+    e.bypass();
+  }
+
+  /** Returns true if the given cells consist of exactly one cell and that cell is a family delete. */
+  private boolean isFamilyDelete(List<Cell> familyCells) {
+    return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0));
+  }
+
+  @Override
+  public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s)
+    throws IOException {
+    Transaction tx = getFromOperation(scan);
+    if (tx != null) {
+      projectFamilyDeletes(scan);
+      scan.setMaxVersions();
+      scan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx, readNonTxnData),
+                        TxUtils.getMaxVisibleTimestamp(tx));
+      Filter newFilter = getTransactionFilter(tx, ScanType.USER_SCAN, scan.getFilter());
+      scan.setFilter(newFilter);
+    }
+    return s;
+  }
+
+  /**
+   * Ensures that family delete markers are present in the columns requested for any scan operation.
+   * @param scan The original scan request
+   * @return The modified scan request with the family delete qualifiers represented
+   */
+  private Scan projectFamilyDeletes(Scan scan) {
+    for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
+      NavigableSet<byte[]> columns = entry.getValue();
+      // wildcard scans will automatically include the delete marker, so only need to add it when we have
+      // explicit columns listed
+      if (columns != null && !columns.isEmpty()) {
+        scan.addColumn(entry.getKey(), TxConstants.FAMILY_DELETE_QUALIFIER);
+      }
+    }
+    return scan;
+  }
+
+  /**
+   * Ensures that family delete markers are present in the columns requested for any get operation.
+   * @param get The original get request
+   * @return The modified get request with the family delete qualifiers represented
+   */
+  private Get projectFamilyDeletes(Get get) {
+    for (Map.Entry<byte[], NavigableSet<byte[]>> entry : get.getFamilyMap().entrySet()) {
+      NavigableSet<byte[]> columns = entry.getValue();
+      // wildcard scans will automatically include the delete marker, so only need to add it when we have
+      // explicit columns listed
+      if (columns != null && !columns.isEmpty()) {
+        get.addColumn(entry.getKey(), TxConstants.FAMILY_DELETE_QUALIFIER);
+      }
+    }
+    return get;
+  }
+
+  @Override
+  public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+                                             KeyValueScanner memstoreScanner, InternalScanner scanner)
+      throws IOException {
+    return createStoreScanner(c.getEnvironment(), "flush", cache.getLatestState(), store,
+                              Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
+                              HConstants.OLDEST_TIMESTAMP);
+  }
+
+  @Override
+  public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+      List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
+      CompactionRequest request)
+      throws IOException {
+    return createStoreScanner(c.getEnvironment(), "compaction", cache.getLatestState(), store, scanners,
+                              scanType, earliestPutTs);
+  }
+
+  /** Builds the transaction-aware {@link StoreScanner}, or returns null when no transaction snapshot is given. */
+  protected InternalScanner createStoreScanner(RegionCoprocessorEnvironment env, String action,
+                                               TransactionVisibilityState snapshot, Store store,
+                                               List<? extends KeyValueScanner> scanners, ScanType type,
+                                               long earliestPutTs) throws IOException {
+    if (snapshot == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Region " + env.getRegion().getRegionInfo().getRegionNameAsString() +
+                    ", no current transaction state found, defaulting to normal " + action + " scanner");
+      }
+      return null;
+    }
+
+    // construct a dummy transaction from the latest snapshot
+    Transaction dummyTx = TxUtils.createDummyTransaction(snapshot);
+    Scan scan = new Scan();
+    // need to see all versions, since we filter out excludes and applications may rely on multiple versions
+    scan.setMaxVersions();
+    scan.setFilter(
+        new IncludeInProgressFilter(dummyTx.getVisibilityUpperBound(), snapshot.getInvalid(),
+            getTransactionFilter(dummyTx, type, null)));
+
+    return new StoreScanner(store, store.getScanInfo(), scan, scanners,
+                            type, store.getSmallestReadPoint(), earliestPutTs);
+  }
+
+  /** Decodes the transaction stored as an attribute on the operation, or returns null if none is set. */
+  private Transaction getFromOperation(OperationWithAttributes op) throws IOException {
+    byte[] encoded = op.getAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY);
+    if (encoded == null) {
+      // to support old clients
+      encoded = op.getAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY);
+    }
+    return encoded == null ? null : txCodec.decode(encoded);
+  }
+
+  /** Returns true if the operation carries the rollback attribute, under either the current or the old key. */
+  private boolean isRollbackOperation(OperationWithAttributes op) throws IOException {
+    return op.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null ||
+      // to support old clients
+      op.getAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY) != null;
+  }
+
+  /**
+   * Derived classes can override this method to customize the filter used to return data visible for the current
+   * transaction.
+   *
+   * @param tx the current transaction to apply
+   * @param type the type of scan being performed
+   * @param filter an additional filter to apply to the cells visible to the transaction; may be {@code null}
+   *               (the flush and compaction scanners pass {@code null})
+   * @return the filter that applies transactional visibility for the given transaction
+   */
+  protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) {
+    return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
+  }
+
+  /**
+   * Filter used to include cells visible to in-progress transactions on flush and compaction.
+   */
+  static class IncludeInProgressFilter extends FilterBase {
+    private final long visibilityUpperBound;
+    private final Set<Long> invalidIds;
+    private final Filter txFilter;
+
+    public IncludeInProgressFilter(long upperBound, Collection<Long> invalids, Filter transactionFilter) {
+      this.visibilityUpperBound = upperBound;
+      this.invalidIds = Sets.newHashSet(invalids);
+      this.txFilter = transactionFilter;
+    }
+
+    @Override
+    public ReturnCode filterKeyValue(Cell cell) throws IOException {
+      // include all cells visible to in-progress transactions, except for those already marked as invalid
+      long ts = cell.getTimestamp();
+      if (ts > visibilityUpperBound) {
+        // include everything that could still be in-progress except invalids
+        return invalidIds.contains(ts) ? ReturnCode.SKIP : ReturnCode.INCLUDE;
+      }
+      return txFilter.filterKeyValue(cell);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2246abff/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
new file mode 100644
index 0000000..a258972
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.util.TxUtils;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/**
+ * Applies filtering of data based on transactional visibility (HBase 1.1+ specific version).
+ * Cells are dropped when they are older than the TTL cutoff of their column family, when they are not
+ * visible to the supplied transaction, or when they are masked by a family or column delete; the cells that
+ * remain can be narrowed further by an optional sub-filter.
+ * Note: this is intended for server-side use only, as additional properties need to be set on
+ * any {@code Scan} or {@code Get} operation performed.
+ */
+public class TransactionVisibilityFilter extends FilterBase {
+  // transaction that decides which cell timestamps are visible
+  private final Transaction tx;
+  // oldest visible timestamp by column family, used to apply TTL when reading
+  private final Map<ImmutableBytesWritable, Long> oldestTsByFamily;
+  // if false, empty values will be interpreted as deletes
+  private final boolean allowEmptyValues;
+  // whether or not we can remove delete markers
+  // these can only be safely removed when we are traversing all storefiles
+  private final boolean clearDeletes;
+  // optional sub-filter to apply to visible cells (may be null)
+  private final Filter cellFilter;
+  // since we traverse KVs in order, remember the current family so the TTL cutoff is not looked up per KV
+  private final ImmutableBytesWritable currentFamily = new ImmutableBytesWritable(HConstants.EMPTY_BYTE_ARRAY);
+
+  // TTL cutoff cached for currentFamily
+  private long currentOldestTs;
+
+  private DeleteTracker deleteTracker = new DeleteTracker();
+
+  /**
+   * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions,
+   * without any sub-filter.
+   *
+   * @param tx the current transaction to apply.  Only data visible to this transaction will be returned.
+   * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
+   * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
+   *                         these will be interpreted as "delete" markers and the column will be filtered out
+   * @param scanType the type of scan operation being performed
+   */
+  public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
+                                     ScanType scanType) {
+    this(tx, ttlByFamily, allowEmptyValues, scanType, null);
+  }
+
+  /**
+   * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions.
+   *
+   * @param tx the current transaction to apply.  Only data visible to this transaction will be returned.
+   * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name; a value of zero or less
+   *                    yields a cutoff of 0 for that family
+   * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
+   *                         these will be interpreted as "delete" markers and the column will be filtered out
+   * @param scanType the type of scan operation being performed
+   * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
+   *                   calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}.  If null, the return
+   *                   code chosen by this filter is used unchanged.
+   */
+  public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
+                                     ScanType scanType, @Nullable Filter cellFilter) {
+    this.tx = tx;
+    this.allowEmptyValues = allowEmptyValues;
+    this.cellFilter = cellFilter;
+
+    // translate each family's TTL into the oldest timestamp that is still readable
+    this.oldestTsByFamily = Maps.newTreeMap();
+    for (Map.Entry<byte[], Long> entry : ttlByFamily.entrySet()) {
+      long ttlMillis = entry.getValue();
+      long oldestVisible = 0;
+      if (ttlMillis > 0) {
+        oldestVisible = tx.getVisibilityUpperBound() - ttlMillis * TxConstants.MAX_TX_PER_MS;
+      }
+      oldestTsByFamily.put(new ImmutableBytesWritable(entry.getKey()), oldestVisible);
+    }
+
+    // delete markers are dropped for delete-dropping compactions, and for user scans unless SNAPSHOT_ALL is used
+    boolean userScanWithoutDeletes =
+      scanType == ScanType.USER_SCAN && tx.getVisibilityLevel() != Transaction.VisibilityLevel.SNAPSHOT_ALL;
+    this.clearDeletes = scanType == ScanType.COMPACT_DROP_DELETES || userScanWithoutDeletes;
+  }
+
+  @Override
+  public ReturnCode filterKeyValue(Cell cell) throws IOException {
+    boolean sameFamily =
+      CellUtil.matchingFamily(cell, currentFamily.get(), currentFamily.getOffset(), currentFamily.getLength());
+    if (!sameFamily) {
+      // column family changed: refresh the cached TTL cutoff and forget any earlier family delete
+      currentFamily.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
+      Long cutoff = oldestTsByFamily.get(currentFamily);
+      currentOldestTs = cutoff == null ? 0 : cutoff;
+      deleteTracker.reset();
+    }
+
+    long cellTs = cell.getTimestamp();
+    if (TxUtils.getTimestampForTTL(cellTs) < currentOldestTs) {
+      // passed TTL for this column, seek to next
+      return ReturnCode.NEXT_COL;
+    }
+    if (!tx.isVisible(cellTs)) {
+      return ReturnCode.SKIP;
+    }
+
+    // from here on the cell is visible to the transaction
+    if (isSnapshotAll() && tx.isCurrentWrite(cellTs)) {
+      // SNAPSHOT_ALL returns every write done by the current transaction (including deletes)
+      return runSubFilter(ReturnCode.INCLUDE, cell);
+    }
+
+    if (DeleteTracker.isFamilyDelete(cell)) {
+      deleteTracker.addFamilyDelete(cell);
+      // either drop the marker, or keep it and move on to the next column
+      return clearDeletes ? ReturnCode.NEXT_COL : runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell);
+    }
+    if (deleteTracker.isDeleted(cell)) {
+      // masked by a family delete
+      return ReturnCode.NEXT_COL;
+    }
+    if (isColumnDelete(cell) && clearDeletes) {
+      // skip "deleted" cell
+      return ReturnCode.NEXT_COL;
+    }
+    // a regular visible cell, or a column delete marker that must be kept:
+    // as soon as we find a KV to include we can move to the next column
+    return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell);
+  }
+
+  /**
+   * Combines this filter's decision with the sub-filter's decision for the same cell.
+   *
+   * @param txFilterCode return code chosen by this filter
+   * @param cell the cell being filtered
+   * @return {@code txFilterCode} when there is no sub-filter, otherwise the result of
+   *         {@link #determineReturnCode(ReturnCode, ReturnCode)}
+   */
+  private ReturnCode runSubFilter(ReturnCode txFilterCode, Cell cell) throws IOException {
+    if (cellFilter == null) {
+      return txFilterCode;
+    }
+    return determineReturnCode(txFilterCode, cellFilter.filterKeyValue(cell));
+  }
+
+  /**
+   * Determines the return code of TransactionVisibilityFilter based on sub-filter's return code.
+   * Sub-filter can only exclude cells included by TransactionVisibilityFilter, i.e., sub-filter's
+   * INCLUDE will be ignored. This behavior makes sure that sub-filter only sees cell versions valid for the
+   * given transaction. If sub-filter needs to see older versions of cell, then this method can be overridden.
+   *
+   * @param txFilterCode return code from TransactionVisibilityFilter
+   * @param subFilterCode return code from sub-filter
+   * @return final return code
+   */
+  protected ReturnCode determineReturnCode(ReturnCode txFilterCode, ReturnCode subFilterCode) {
+    // Return the more restrictive of the two filter responses
+    ReturnCode combined;
+    switch (subFilterCode) {
+      case INCLUDE:
+        combined = txFilterCode;
+        break;
+      case INCLUDE_AND_NEXT_COL:
+        combined = ReturnCode.INCLUDE_AND_NEXT_COL;
+        break;
+      case SKIP:
+        if (txFilterCode == ReturnCode.INCLUDE) {
+          combined = ReturnCode.SKIP;
+        } else {
+          combined = ReturnCode.NEXT_COL;
+        }
+        break;
+      default:
+        combined = subFilterCode;
+        break;
+    }
+    return combined;
+  }
+
+  @Override
+  public boolean filterRow() throws IOException {
+    return cellFilter == null ? super.filterRow() : cellFilter.filterRow();
+  }
+
+  /**
+   * Converts Tephra deletes back into HBase deletes when the transaction uses visibility level SNAPSHOT_ALL;
+   * any other cell is returned as is. The sub-filter is not consulted here.
+   */
+  @Override
+  public Cell transformCell(Cell cell) throws IOException {
+    if (!isSnapshotAll()) {
+      return cell;
+    }
+    if (DeleteTracker.isFamilyDelete(cell)) {
+      return new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell), null, cell.getTimestamp(),
+                          KeyValue.Type.DeleteFamily);
+    }
+    if (isColumnDelete(cell)) {
+      // Note: in some cases KeyValue.Type.Delete is used in Delete object,
+      // and in some other cases KeyValue.Type.DeleteColumn is used.
+      // Since Tephra cannot distinguish between the two, we return KeyValue.Type.DeleteColumn.
+      // KeyValue.Type.DeleteColumn makes both CellUtil.isDelete and CellUtil.isDeleteColumns return true, and will
+      // work in both cases.
+      return new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),
+                          cell.getTimestamp(), KeyValue.Type.DeleteColumn);
+    }
+    return cell;
+  }
+
+  @Override
+  public void reset() throws IOException {
+    deleteTracker.reset();
+    if (cellFilter == null) {
+      return;
+    }
+    cellFilter.reset();
+  }
+
+  @Override
+  public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
+    return cellFilter == null
+      ? super.filterRowKey(buffer, offset, length)
+      : cellFilter.filterRowKey(buffer, offset, length);
+  }
+
+  @Override
+  public boolean filterAllRemaining() throws IOException {
+    return cellFilter == null ? super.filterAllRemaining() : cellFilter.filterAllRemaining();
+  }
+
+  @Override
+  public void filterRowCells(List<Cell> kvs) throws IOException {
+    if (cellFilter == null) {
+      super.filterRowCells(kvs);
+    } else {
+      cellFilter.filterRowCells(kvs);
+    }
+  }
+
+  @Override
+  public boolean hasFilterRow() {
+    return cellFilter == null ? super.hasFilterRow() : cellFilter.hasFilterRow();
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public KeyValue getNextKeyHint(KeyValue currentKV) throws IOException {
+    return cellFilter == null ? super.getNextKeyHint(currentKV) : cellFilter.getNextKeyHint(currentKV);
+  }
+
+  @Override
+  public Cell getNextCellHint(Cell currentKV) throws IOException {
+    return cellFilter == null ? super.getNextCellHint(currentKV) : cellFilter.getNextCellHint(currentKV);
+  }
+
+  @Override
+  public boolean isFamilyEssential(byte[] name) throws IOException {
+    return cellFilter == null ? super.isFamilyEssential(name) : cellFilter.isFamilyEssential(name);
+  }
+
+  // true when the transaction reads with visibility level SNAPSHOT_ALL
+  private boolean isSnapshotAll() {
+    return tx.getVisibilityLevel() == Transaction.VisibilityLevel.SNAPSHOT_ALL;
+  }
+
+  // a column delete is an empty-valued cell that is not a pre-existing version, unless empty values are allowed
+  private boolean isColumnDelete(Cell cell) {
+    if (TxUtils.isPreExistingVersion(cell.getTimestamp())) {
+      return false;
+    }
+    return cell.getValueLength() == 0 && !allowEmptyValues;
+  }
+
+  /**
+   * Remembers the timestamp of the most recently recorded family delete so that cells at or below that
+   * timestamp can be recognized as deleted.
+   */
+  private static final class DeleteTracker {
+    private long familyDeleteTs;
+
+    /**
+     * Returns whether the cell is a family delete marker: not a pre-existing version, stored under
+     * {@code TxConstants.FAMILY_DELETE_QUALIFIER}, and carrying an empty value.
+     */
+    public static boolean isFamilyDelete(Cell cell) {
+      if (TxUtils.isPreExistingVersion(cell.getTimestamp())) {
+        return false;
+      }
+      if (!CellUtil.matchingQualifier(cell, TxConstants.FAMILY_DELETE_QUALIFIER)) {
+        return false;
+      }
+      return CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY);
+    }
+
+    public void addFamilyDelete(Cell delete) {
+      familyDeleteTs = delete.getTimestamp();
+    }
+
+    public boolean isDeleted(Cell cell) {
+      long cellTs = cell.getTimestamp();
+      return cellTs <= familyDeleteTs;
+    }
+
+    public void reset() {
+      familyDeleteTs = 0;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2246abff/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/HBase11ConfigurationProviderTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/HBase11ConfigurationProviderTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/HBase11ConfigurationProviderTest.java
new file mode 100644
index 0000000..cac80ec
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/HBase11ConfigurationProviderTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.tephra.util.AbstractConfigurationProviderTest;
+import org.apache.tephra.util.HBaseVersion;
+
+import java.util.Collection;
+
+/**
+ * Configuration provider test covering the HBase 1.1 and HBase 1.2 versions.
+ */
+public class HBase11ConfigurationProviderTest extends AbstractConfigurationProviderTest {
+  /**
+   * @return the HBase versions expected by this test: {@code HBASE_11} and {@code HBASE_12}
+   */
+  @Override
+  protected Collection<HBaseVersion.Version> getExpectedVersions() {
+    Collection<HBaseVersion.Version> expectedVersions =
+      ImmutableList.of(HBaseVersion.Version.HBASE_11, HBaseVersion.Version.HBASE_12);
+    return expectedVersions;
+  }
+}