Posted to commits@tephra.apache.org by po...@apache.org on 2016/09/08 08:10:02 UTC

[3/7] incubator-tephra git commit: TEPHRA-176, TEPHRA-177: Add Maven modules for CDH 5.7 and 5.8 support (HBase 1.1 and HBase 1.2 modules)

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2246abff/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java b/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
deleted file mode 100644
index bb7afff..0000000
--- a/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
+++ /dev/null
@@ -1,678 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tephra.hbase;
-
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Append;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.OperationWithAttributes;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.RowMutations;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
-import org.apache.hadoop.hbase.filter.CompareFilter;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.apache.tephra.AbstractTransactionAwareTable;
-import org.apache.tephra.Transaction;
-import org.apache.tephra.TransactionAware;
-import org.apache.tephra.TxConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Set;
-
-/**
- * A transaction-aware HTable implementation for HBase 1.1. Operations are committed as usual,
- * but on a failed or aborted transaction they are rolled back to the state before the
- * transaction started.
- */
-public class TransactionAwareHTable extends AbstractTransactionAwareTable
-    implements HTableInterface, TransactionAware {
-
-  private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTable.class);
-  private final HTableInterface hTable;
-
-  /**
-   * Create a transaction-aware instance of the passed HTable
-   *
-   * @param hTable underlying HBase table to use
-   */
-  public TransactionAwareHTable(HTableInterface hTable) {
-    this(hTable, false);
-  }
-
-  /**
-   * Create a transaction-aware instance of the passed HTable
-   *
-   * @param hTable underlying HBase table to use
-   * @param conflictLevel level of conflict detection to perform (defaults to {@code COLUMN})
-   */
-  public TransactionAwareHTable(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel) {
-    this(hTable, conflictLevel, false);
-  }
-
-  /**
-   * Create a transaction-aware instance of the passed HTable, with the option
-   * of allowing non-transactional operations.
-   * @param hTable underlying HBase table to use
-   * @param allowNonTransactional if true, additional operations (checkAndPut, increment, checkAndDelete)
-   *                              will be available, though non-transactional
-   */
-  public TransactionAwareHTable(HTableInterface hTable, boolean allowNonTransactional) {
-    this(hTable, TxConstants.ConflictDetection.COLUMN, allowNonTransactional);
-  }
-
-  /**
-   * Create a transaction-aware instance of the passed HTable, with the option
-   * of allowing non-transactional operations.
-   * @param hTable underlying HBase table to use
-   * @param conflictLevel level of conflict detection to perform (defaults to {@code COLUMN})
-   * @param allowNonTransactional if true, additional operations (checkAndPut, increment, checkAndDelete)
-   *                              will be available, though non-transactional
-   */
-  public TransactionAwareHTable(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel,
-                                boolean allowNonTransactional) {
-    super(conflictLevel, allowNonTransactional);
-    this.hTable = hTable;
-  }
-
-  /* AbstractTransactionAwareTable implementation */
-
-  @Override
-  protected byte[] getTableKey() {
-    return getTableName();
-  }
-
-  @Override
-  protected boolean doCommit() throws IOException {
-    hTable.flushCommits();
-    return true;
-  }
-
-  @Override
-  protected boolean doRollback() throws Exception {
-    try {
-      // pre-size the list of rollback deletes
-      int size = 0;
-      for (Set<ActionChange> cs : changeSets.values()) {
-        size += cs.size();
-      }
-      List<Delete> rollbackDeletes = new ArrayList<>(size);
-      for (Map.Entry<Long, Set<ActionChange>> entry : changeSets.entrySet()) {
-        long transactionTimestamp = entry.getKey();
-        for (ActionChange change : entry.getValue()) {
-          byte[] row = change.getRow();
-          byte[] family = change.getFamily();
-          byte[] qualifier = change.getQualifier();
-          Delete rollbackDelete = new Delete(row);
-          makeRollbackOperation(rollbackDelete);
-          switch (conflictLevel) {
-            case ROW:
-            case NONE:
-              // issue family delete for the tx write pointer
-              rollbackDelete.deleteFamilyVersion(change.getFamily(), transactionTimestamp);
-              break;
-            case COLUMN:
-              if (family != null && qualifier == null) {
-                rollbackDelete.deleteFamilyVersion(family, transactionTimestamp);
-              } else if (family != null && qualifier != null) {
-                rollbackDelete.deleteColumn(family, qualifier, transactionTimestamp);
-              }
-              break;
-            default:
-              throw new IllegalStateException("Unknown conflict detection level: " + conflictLevel);
-          }
-          rollbackDeletes.add(rollbackDelete);
-        }
-      }
-      hTable.delete(rollbackDeletes);
-      return true;
-    } finally {
-      try {
-        hTable.flushCommits();
-      } catch (Exception e) {
-        LOG.error("Could not flush HTable commits", e);
-      }
-      tx = null;
-      changeSets.clear();
-    }
-  }
-
-  /* HTableInterface implementation */
-
-  @Override
-  public byte[] getTableName() {
-    return hTable.getTableName();
-  }
-
-  @Override
-  public TableName getName() {
-    return hTable.getName();
-  }
-
-  @Override
-  public Configuration getConfiguration() {
-    return hTable.getConfiguration();
-  }
-
-  @Override
-  public HTableDescriptor getTableDescriptor() throws IOException {
-    return hTable.getTableDescriptor();
-  }
-
-  @Override
-  public boolean exists(Get get) throws IOException {
-    if (tx == null) {
-      throw new IOException("Transaction not started");
-    }
-    return hTable.exists(transactionalizeAction(get));
-  }
-
-  @Override
-  public Boolean[] exists(List<Get> gets) throws IOException {
-    if (tx == null) {
-      throw new IOException("Transaction not started");
-    }
-    List<Get> transactionalizedGets = new ArrayList<>(gets.size());
-    for (Get get : gets) {
-      transactionalizedGets.add(transactionalizeAction(get));
-    }
-    return hTable.exists(transactionalizedGets);
-  }
-
-  @Override
-  public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException {
-    if (tx == null) {
-      throw new IOException("Transaction not started");
-    }
-    hTable.batch(transactionalizeActions(actions), results);
-  }
-
-  @Override
-  public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException {
-    if (tx == null) {
-      throw new IOException("Transaction not started");
-    }
-    return hTable.batch(transactionalizeActions(actions));
-  }
-
-  @Override
-  public <R> void batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) throws
-    IOException, InterruptedException {
-    if (tx == null) {
-      throw new IOException("Transaction not started");
-    }
-    hTable.batchCallback(transactionalizeActions(actions), results, callback);
-  }
-
-  @Override
-  public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback) throws IOException,
-    InterruptedException {
-    if (tx == null) {
-      throw new IOException("Transaction not started");
-    }
-    return hTable.batchCallback(transactionalizeActions(actions), callback);
-  }
-
-  @Override
-  public Result get(Get get) throws IOException {
-    if (tx == null) {
-      throw new IOException("Transaction not started");
-    }
-    return hTable.get(transactionalizeAction(get));
-  }
-
-  @Override
-  public Result[] get(List<Get> gets) throws IOException {
-    if (tx == null) {
-      throw new IOException("Transaction not started");
-    }
-    ArrayList<Get> transactionalizedGets = new ArrayList<>();
-    for (Get get : gets) {
-      transactionalizedGets.add(transactionalizeAction(get));
-    }
-    return hTable.get(transactionalizedGets);
-  }
-
-  @Override
-  public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
-    if (allowNonTransactional) {
-      return hTable.getRowOrBefore(row, family);
-    } else {
-      throw new UnsupportedOperationException("Operation is not supported transactionally");
-    }
-  }
-
-  @Override
-  public ResultScanner getScanner(Scan scan) throws IOException {
-    if (tx == null) {
-      throw new IOException("Transaction not started");
-    }
-    return hTable.getScanner(transactionalizeAction(scan));
-  }
-
-  @Override
-  public ResultScanner getScanner(byte[] family) throws IOException {
-    if (tx == null) {
-      throw new IOException("Transaction not started");
-    }
-    Scan scan = new Scan();
-    scan.addFamily(family);
-    return hTable.getScanner(transactionalizeAction(scan));
-  }
-
-  @Override
-  public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
-    if (tx == null) {
-      throw new IOException("Transaction not started");
-    }
-    Scan scan = new Scan();
-    scan.addColumn(family, qualifier);
-    return hTable.getScanner(transactionalizeAction(scan));
-  }
-
-  @Override
-  public void put(Put put) throws IOException {
-    if (tx == null) {
-      throw new IOException("Transaction not started");
-    }
-    Put txPut = transactionalizeAction(put);
-    hTable.put(txPut);
-  }
-
-  @Override
-  public void put(List<Put> puts) throws IOException {
-    if (tx == null) {
-      throw new IOException("Transaction not started");
-    }
-    List<Put> transactionalizedPuts = new ArrayList<>(puts.size());
-    for (Put put : puts) {
-      Put txPut = transactionalizeAction(put);
-      transactionalizedPuts.add(txPut);
-    }
-    hTable.put(transactionalizedPuts);
-  }
-
-  @Override
-  public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException {
-    if (allowNonTransactional) {
-      return hTable.checkAndPut(row, family, qualifier, value, put);
-    } else {
-      throw new UnsupportedOperationException("Operation is not supported transactionally");
-    }
-  }
-
-  @Override
-  public void delete(Delete delete) throws IOException {
-    if (tx == null) {
-      throw new IOException("Transaction not started");
-    }
-    hTable.delete(transactionalizeAction(delete));
-  }
-
-  @Override
-  public void delete(List<Delete> deletes) throws IOException {
-    if (tx == null) {
-      throw new IOException("Transaction not started");
-    }
-    List<Delete> transactionalizedDeletes = new ArrayList<>(deletes.size());
-    for (Delete delete : deletes) {
-      Delete txDelete = transactionalizeAction(delete);
-      transactionalizedDeletes.add(txDelete);
-    }
-    hTable.delete(transactionalizedDeletes);
-  }
-
-  @Override
-  public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete)
-    throws IOException {
-    if (allowNonTransactional) {
-      return hTable.checkAndDelete(row, family, qualifier, value, delete);
-    } else {
-      throw new UnsupportedOperationException("Operation is not supported transactionally");
-    }
-  }
-
-  @Override
-  public boolean checkAndDelete(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp,
-                                byte[] bytes3, Delete delete) throws IOException {
-    if (allowNonTransactional) {
-      return hTable.checkAndDelete(bytes, bytes1, bytes2, compareOp, bytes3, delete);
-    } else {
-      throw new UnsupportedOperationException("Operation is not supported transactionally");
-    }
-  }
-
-  @Override
-  public boolean checkAndPut(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp,
-                             byte[] bytes3, Put put) throws IOException {
-    if (allowNonTransactional) {
-      return hTable.checkAndPut(bytes, bytes1, bytes2, compareOp, bytes3, put);
-    } else {
-      throw new UnsupportedOperationException("Operation is not supported transactionally");
-    }
-  }
-
-  @Override
-  public boolean[] existsAll(List<Get> gets) throws IOException {
-    if (tx == null) {
-      throw new IOException("Transaction not started");
-    }
-    List<Get> transactionalizedGets = new ArrayList<>(gets.size());
-    for (Get get : gets) {
-      transactionalizedGets.add(transactionalizeAction(get));
-    }
-    return hTable.existsAll(transactionalizedGets);
-  }
-
-  @Override
-  public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
-                                CompareFilter.CompareOp compareOp, byte[] value, RowMutations rowMutations)
-      throws IOException {
-    if (allowNonTransactional) {
-      return hTable.checkAndMutate(row, family, qualifier, compareOp, value, rowMutations);
-    }
-
-    throw new UnsupportedOperationException("checkAndMutate operation is not supported transactionally");
-  }
-
-  @Override
-  public void mutateRow(RowMutations rm) throws IOException {
-    if (tx == null) {
-      throw new IOException("Transaction not started");
-    }
-    RowMutations transactionalMutations = new RowMutations();
-    for (Mutation mutation : rm.getMutations()) {
-      if (mutation instanceof Put) {
-        transactionalMutations.add(transactionalizeAction((Put) mutation));
-      } else if (mutation instanceof Delete) {
-        transactionalMutations.add(transactionalizeAction((Delete) mutation));
-      }
-    }
-    hTable.mutateRow(transactionalMutations);
-  }
-
-  @Override
-  public Result append(Append append) throws IOException {
-    if (allowNonTransactional) {
-      return hTable.append(append);
-    } else {
-      throw new UnsupportedOperationException("Operation is not supported transactionally");
-    }
-  }
-
-  @Override
-  public Result increment(Increment increment) throws IOException {
-    if (allowNonTransactional) {
-      return hTable.increment(increment);
-    } else {
-      throw new UnsupportedOperationException("Operation is not supported transactionally");
-    }
-  }
-
-  @Override
-  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException {
-    if (allowNonTransactional) {
-      return hTable.incrementColumnValue(row, family, qualifier, amount);
-    } else {
-      throw new UnsupportedOperationException("Operation is not supported transactionally");
-    }
-  }
-
-  @Override
-  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability)
-    throws IOException {
-    if (allowNonTransactional) {
-      return hTable.incrementColumnValue(row, family, qualifier, amount, durability);
-    } else {
-      throw new UnsupportedOperationException("Operation is not supported transactionally");
-    }
-  }
-
-  @Override
-  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
-    throws IOException {
-    if (allowNonTransactional) {
-      return hTable.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
-    } else {
-      throw new UnsupportedOperationException("Operation is not supported transactionally");
-    }
-  }
-
-  @Override
-  public boolean isAutoFlush() {
-    return hTable.isAutoFlush();
-  }
-
-  @Override
-  public void flushCommits() throws IOException {
-    hTable.flushCommits();
-  }
-
-  @Override
-  public void close() throws IOException {
-    hTable.close();
-  }
-
-  @Override
-  public CoprocessorRpcChannel coprocessorService(byte[] row) {
-    return hTable.coprocessorService(row);
-  }
-
-  @Override
-  public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey, byte[] endKey,
-                                                                  Batch.Call<T, R> callable)
-    throws ServiceException, Throwable {
-    return hTable.coprocessorService(service, startKey, endKey, callable);
-  }
-
-  @Override
-  public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey, byte[] endKey,
-                                                        Batch.Call<T, R> callable, Batch.Callback<R> callback)
-    throws ServiceException, Throwable {
-    hTable.coprocessorService(service, startKey, endKey, callable, callback);
-  }
-
-  @Override
-  public <R extends Message> Map<byte[], R> batchCoprocessorService(
-      MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey,
-      R responsePrototype) throws ServiceException, Throwable {
-    return hTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype);
-  }
-
-  @Override
-  public <R extends Message> void batchCoprocessorService(MethodDescriptor methodDescriptor,
-      Message request, byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
-      throws ServiceException, Throwable {
-    hTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, callback);
-  }
-
-  @Override
-  public void setAutoFlush(boolean autoFlush) {
-    setAutoFlushTo(autoFlush);
-  }
-
-  @Override
-  public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
-    hTable.setAutoFlush(autoFlush, clearBufferOnFail);
-  }
-
-  @Override
-  public void setAutoFlushTo(boolean autoFlush) {
-    hTable.setAutoFlushTo(autoFlush);
-  }
-
-  @Override
-  public long getWriteBufferSize() {
-    return hTable.getWriteBufferSize();
-  }
-
-  @Override
-  public void setWriteBufferSize(long writeBufferSize) throws IOException {
-    hTable.setWriteBufferSize(writeBufferSize);
-  }
-
-  // Helpers to transactionalize operations: Put and Delete are copied with the timestamp set to the
-  // current transaction's write pointer; Get and Scan are modified in place.
-
-  private Get transactionalizeAction(Get get) throws IOException {
-    addToOperation(get, tx);
-    return get;
-  }
-
-  private Scan transactionalizeAction(Scan scan) throws IOException {
-    addToOperation(scan, tx);
-    return scan;
-  }
-
-  private Put transactionalizeAction(Put put) throws IOException {
-    Put txPut = new Put(put.getRow(), tx.getWritePointer());
-    Set<Map.Entry<byte[], List<Cell>>> familyMap = put.getFamilyCellMap().entrySet();
-    if (!familyMap.isEmpty()) {
-      for (Map.Entry<byte[], List<Cell>> family : familyMap) {
-        List<Cell> familyValues = family.getValue();
-        if (!familyValues.isEmpty()) {
-          for (Cell value : familyValues) {
-            txPut.add(value.getFamily(), value.getQualifier(), tx.getWritePointer(), value.getValue());
-            addToChangeSet(txPut.getRow(), value.getFamily(), value.getQualifier());
-          }
-        }
-      }
-    }
-    for (Map.Entry<String, byte[]> entry : put.getAttributesMap().entrySet()) {
-      txPut.setAttribute(entry.getKey(), entry.getValue());
-    }
-    txPut.setDurability(put.getDurability());
-    addToOperation(txPut, tx);
-    return txPut;
-  }
-
-  private Delete transactionalizeAction(Delete delete) throws IOException {
-    long transactionTimestamp = tx.getWritePointer();
-
-    byte[] deleteRow = delete.getRow();
-    Delete txDelete = new Delete(deleteRow, transactionTimestamp);
-
-    Map<byte[], List<Cell>> familyToDelete = delete.getFamilyCellMap();
-    if (familyToDelete.isEmpty()) {
-      // perform a row delete if we are using row-level conflict detection
-      if (conflictLevel == TxConstants.ConflictDetection.ROW ||
-        conflictLevel == TxConstants.ConflictDetection.NONE) {
-        // Row delete leaves delete markers in all column families of the table
-        // Therefore get all the column families of the hTable from the HTableDescriptor and add them to the changeSet
-        for (HColumnDescriptor columnDescriptor : hTable.getTableDescriptor().getColumnFamilies()) {
-          // no need to identify individual columns deleted
-          addToChangeSet(deleteRow, columnDescriptor.getName(), null);
-        }
-      } else {
-        Result result = get(new Get(delete.getRow()));
-        // Delete everything
-        NavigableMap<byte[], NavigableMap<byte[], byte[]>> resultMap = result.getNoVersionMap();
-        for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> familyEntry : resultMap.entrySet()) {
-          NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(familyEntry.getKey());
-          for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) {
-            txDelete.deleteColumns(familyEntry.getKey(), column.getKey(), transactionTimestamp);
-            addToChangeSet(deleteRow, familyEntry.getKey(), column.getKey());
-          }
-        }
-      }
-    } else {
-      for (Map.Entry<byte [], List<Cell>> familyEntry : familyToDelete.entrySet()) {
-        byte[] family = familyEntry.getKey();
-        List<Cell> entries = familyEntry.getValue();
-        boolean isFamilyDelete = false;
-        if (entries.size() == 1) {
-          Cell cell = entries.get(0);
-          isFamilyDelete = CellUtil.isDeleteFamily(cell);
-        }
-        if (isFamilyDelete) {
-          if (conflictLevel == TxConstants.ConflictDetection.ROW ||
-              conflictLevel == TxConstants.ConflictDetection.NONE) {
-            // no need to identify individual columns deleted
-            txDelete.deleteFamily(family);
-            addToChangeSet(deleteRow, family, null);
-          } else {
-            Result result = get(new Get(delete.getRow()).addFamily(family));
-            // Delete entire family
-            NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(family);
-            for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) {
-              txDelete.deleteColumns(family, column.getKey(), transactionTimestamp);
-              addToChangeSet(deleteRow, family, column.getKey());
-            }
-          }
-        } else {
-          for (Cell value : entries) {
-            txDelete.deleteColumns(value.getFamily(), value.getQualifier(), transactionTimestamp);
-            addToChangeSet(deleteRow, value.getFamily(), value.getQualifier());
-          }
-        }
-      }
-    }
-    for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) {
-        txDelete.setAttribute(entry.getKey(), entry.getValue());
-    }
-    txDelete.setDurability(delete.getDurability());
-    return txDelete;
-  }
-
-  private List<? extends Row> transactionalizeActions(List<? extends Row> actions) throws IOException {
-    List<Row> transactionalizedActions = new ArrayList<>(actions.size());
-    for (Row action : actions) {
-      if (action instanceof Get) {
-        transactionalizedActions.add(transactionalizeAction((Get) action));
-      } else if (action instanceof Put) {
-        transactionalizedActions.add(transactionalizeAction((Put) action));
-      } else if (action instanceof Delete) {
-        transactionalizedActions.add(transactionalizeAction((Delete) action));
-      } else {
-        transactionalizedActions.add(action);
-      }
-    }
-    return transactionalizedActions;
-  }
-
-  public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException {
-    op.setAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx));
-  }
-
-  protected void makeRollbackOperation(Delete delete) {
-    delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
-  }
-}
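
For context, a minimal client-side sketch of how the class above is used (not part of this
commit). It assumes a configured org.apache.tephra.TransactionSystemClient named "txClient"
and a table "t" with column family "f"; the enclosing method declares the thrown exception:

    TransactionAwareHTable txTable = new TransactionAwareHTable(new HTable(conf, "t"));
    TransactionContext txContext = new TransactionContext(txClient, txTable);
    txContext.start();                                 // begin: obtains a write pointer
    try {
      Put put = new Put(Bytes.toBytes("row1"));
      put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("value"));
      txTable.put(put);                                // stamped with the tx write pointer
      txContext.finish();                              // conflict check, then commit
    } catch (TransactionFailureException e) {
      txContext.abort();                               // rollback via doRollback() above
      throw e;
    }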

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2246abff/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase/coprocessor/CellSkipFilter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase/coprocessor/CellSkipFilter.java b/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase/coprocessor/CellSkipFilter.java
deleted file mode 100644
index d8664f4..0000000
--- a/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase/coprocessor/CellSkipFilter.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tephra.hbase.coprocessor;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterBase;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * {@link Filter} that encapsulates another {@link Filter}. It remembers the last {@link KeyValue}
- * for which the underlying filter returned {@link ReturnCode#NEXT_COL} or {@link ReturnCode#INCLUDE_AND_NEXT_COL},
- * so that when {@link #filterKeyValue} is called again for the same {@link KeyValue} with a different
- * version, it returns {@link ReturnCode#NEXT_COL} directly without consulting the underlying {@link Filter}.
- * Please see TEPHRA-169 for more details.
- */
-public class CellSkipFilter extends FilterBase {
-  private final Filter filter;
-  // remembers the previous keyvalue processed by the filter when the return code was NEXT_COL or INCLUDE_AND_NEXT_COL
-  private KeyValue skipColumn = null;
-
-  public CellSkipFilter(Filter filter) {
-    this.filter = filter;
-  }
-
-  /**
-   * Determines whether the current cell should be skipped. The cell is skipped if the
-   * previous keyvalue had the same key as the current cell, meaning the filter has already
-   * responded for this column with ReturnCode.NEXT_COL or ReturnCode.INCLUDE_AND_NEXT_COL.
-   * @param cell the {@link Cell} to be tested for skipping
-   * @return true if the current cell should be skipped, false otherwise
-   */
-  private boolean skipCellVersion(Cell cell) {
-    return skipColumn != null
-      && CellUtil.matchingRow(cell, skipColumn.getRowArray(), skipColumn.getRowOffset(),
-                              skipColumn.getRowLength())
-      && CellUtil.matchingFamily(cell, skipColumn.getFamilyArray(), skipColumn.getFamilyOffset(),
-                                 skipColumn.getFamilyLength())
-      && CellUtil.matchingQualifier(cell, skipColumn.getQualifierArray(), skipColumn.getQualifierOffset(),
-                                    skipColumn.getQualifierLength());
-  }
-
-  @Override
-  public ReturnCode filterKeyValue(Cell cell) throws IOException {
-    if (skipCellVersion(cell)) {
-      return ReturnCode.NEXT_COL;
-    }
-
-    ReturnCode code = filter.filterKeyValue(cell);
-    if (code == ReturnCode.NEXT_COL || code == ReturnCode.INCLUDE_AND_NEXT_COL) {
-      // only store the reference to the keyvalue if we are returning NEXT_COL or INCLUDE_AND_NEXT_COL
-      skipColumn = KeyValueUtil.createFirstOnRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
-                                                   cell.getFamilyArray(), cell.getFamilyOffset(),
-                                                   cell.getFamilyLength(), cell.getQualifierArray(),
-                                                   cell.getQualifierOffset(), cell.getQualifierLength());
-    } else {
-      skipColumn = null;
-    }
-    return code;
-  }
-
-  @Override
-  public boolean filterRow() throws IOException {
-    return filter.filterRow();
-  }
-
-  @Override
-  public Cell transformCell(Cell cell) throws IOException {
-    return filter.transformCell(cell);
-  }
-
-  @Override
-  public void reset() throws IOException {
-    filter.reset();
-  }
-
-  @Override
-  public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
-    return filter.filterRowKey(buffer, offset, length);
-  }
-
-  @Override
-  public boolean filterAllRemaining() throws IOException {
-    return filter.filterAllRemaining();
-  }
-
-  @Override
-  public void filterRowCells(List<Cell> kvs) throws IOException {
-    filter.filterRowCells(kvs);
-  }
-
-  @Override
-  public boolean hasFilterRow() {
-    return filter.hasFilterRow();
-  }
-
-  @SuppressWarnings("deprecation")
-  @Override
-  public KeyValue getNextKeyHint(KeyValue currentKV) throws IOException {
-    return filter.getNextKeyHint(currentKV);
-  }
-
-  @Override
-  public Cell getNextCellHint(Cell currentKV) throws IOException {
-    return filter.getNextCellHint(currentKV);
-  }
-
-  @Override
-  public boolean isFamilyEssential(byte[] name) throws IOException {
-    return filter.isFamilyEssential(name);
-  }
-
-  @Override
-  public byte[] toByteArray() throws IOException {
-    return filter.toByteArray();
-  }
-}
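
The intended composition of the filter above (the TransactionFilters factory in the next file
does exactly this): wrap the per-cell visibility filter so that once it answers NEXT_COL or
INCLUDE_AND_NEXT_COL for a column, remaining versions of that column are skipped without
re-evaluating the delegate. A server-side sketch, assuming "tx", "ttlByFamily" and "scan" are
in scope:

    Filter visibility = new TransactionVisibilityFilter(tx, ttlByFamily, false, ScanType.USER_SCAN);
    Filter composed = new CellSkipFilter(visibility);  // delegate consulted once per column
    scan.setFilter(composed);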

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2246abff/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java b/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
deleted file mode 100644
index 0ca9f9c..0000000
--- a/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tephra.hbase.coprocessor;
-
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.regionserver.ScanType;
-import org.apache.tephra.Transaction;
-
-import java.util.Map;
-import javax.annotation.Nullable;
-
-/**
- * Factory class for providing {@link Filter} instances.
- */
-public class TransactionFilters {
-  /**
-   * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions.
-   *
-   * @param tx the current transaction to apply.  Only data visible to this transaction will be returned.
-   * @param ttlByFamily map of time-to-live (TTL) values, in milliseconds, by column family name
-   * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
-   *                         these will be interpreted as "delete" markers and the column will be filtered out
-   * @param scanType the type of scan operation being performed
-   */
-  public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
-                                           ScanType scanType) {
-    return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null));
-  }
-
-  /**
-   * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions.
-   *
-   * @param tx the current transaction to apply.  Only data visible to this transaction will be returned.
-   * @param ttlByFamily map of time-to-live (TTL) values, in milliseconds, by column family name
-   * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
-   *                         these will be interpreted as "delete" markers and the column will be filtered out
-   * @param scanType the type of scan operation being performed
-   * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
-   *                   calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}.  If null, then
-   *                   {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
-   */
-  public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
-                                           ScanType scanType, @Nullable Filter cellFilter) {
-    return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter));
-  }
-}
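
TransactionProcessor (next file) calls this factory from its getTransactionFilter() hook, and a
subclass can chain its own per-cell filter through the cellFilter argument. A hypothetical
example, with MyCellFilter standing in for any custom Filter implementation:

    public class CustomTransactionProcessor extends TransactionProcessor {
      @Override
      protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) {
        // ttlByFamily and allowEmptyValues are protected fields of TransactionProcessor
        return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type,
                                                      new MyCellFilter(filter));
      }
    }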

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2246abff/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
deleted file mode 100644
index 14941b3..0000000
--- a/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tephra.hbase.coprocessor;
-
-import com.google.common.base.Supplier;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.OperationWithAttributes;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterBase;
-import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.regionserver.ScanType;
-import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.StoreScanner;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.tephra.Transaction;
-import org.apache.tephra.TransactionCodec;
-import org.apache.tephra.TxConstants;
-import org.apache.tephra.coprocessor.TransactionStateCache;
-import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
-import org.apache.tephra.persist.TransactionVisibilityState;
-import org.apache.tephra.util.TxUtils;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.Set;
-
-/**
- * {@code org.apache.hadoop.hbase.coprocessor.RegionObserver} coprocessor that handles server-side processing
- * for transactions:
- * <ul>
- *   <li>applies filtering to exclude data from invalid and in-progress transactions</li>
- *   <li>overrides the scanner returned for flush and compaction to drop data written by invalidated transactions,
- *   or expired due to TTL.</li>
- * </ul>
- *
- * <p>In order to use this coprocessor for transactions, configure the class on any table involved in transactions,
- * or on all user tables by adding the following to hbase-site.xml:
- * {@code
- * <property>
- *   <name>hbase.coprocessor.region.classes</name>
- *   <value>org.apache.tephra.hbase.coprocessor.TransactionProcessor</value>
- * </property>
- * }
- * </p>
- *
- * <p>HBase {@code Get} and {@code Scan} operations should have the current transaction serialized on to the operation
- * as an attribute:
- * {@code
- * Transaction t = ...;
- * Get get = new Get(...);
- * TransactionCodec codec = new TransactionCodec();
- * codec.addToOperation(get, t);
- * }
- * </p>
- */
-public class TransactionProcessor extends BaseRegionObserver {
-  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
-
-  private TransactionStateCache cache;
-  private final TransactionCodec txCodec;
-  protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
-  protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
-  protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA;
-
-  public TransactionProcessor() {
-    this.txCodec = new TransactionCodec();
-  }
-
-  /* RegionObserver implementation */
-
-  @Override
-  public void start(CoprocessorEnvironment e) throws IOException {
-    if (e instanceof RegionCoprocessorEnvironment) {
-      RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
-      Supplier<TransactionStateCache> cacheSupplier = getTransactionStateCacheSupplier(env);
-      this.cache = cacheSupplier.get();
-
-      HTableDescriptor tableDesc = env.getRegion().getTableDesc();
-      for (HColumnDescriptor columnDesc : tableDesc.getFamilies()) {
-        String columnTTL = columnDesc.getValue(TxConstants.PROPERTY_TTL);
-        long ttl = 0;
-        if (columnTTL != null) {
-          try {
-            ttl = Long.parseLong(columnTTL);
-            LOG.info("Family " + columnDesc.getNameAsString() + " has TTL of " + columnTTL);
-          } catch (NumberFormatException nfe) {
-            LOG.warn("Invalid TTL value configured for column family " + columnDesc.getNameAsString() +
-                       ", value = " + columnTTL);
-          }
-        }
-        ttlByFamily.put(columnDesc.getName(), ttl);
-      }
-
-      this.allowEmptyValues = env.getConfiguration().getBoolean(TxConstants.ALLOW_EMPTY_VALUES_KEY,
-                                                                TxConstants.ALLOW_EMPTY_VALUES_DEFAULT);
-      this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA));
-      if (readNonTxnData) {
-        LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString());
-      }
-    }
-  }
-
-  protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
-    return new TransactionStateCacheSupplier(env.getConfiguration());
-  }
-
-  @Override
-  public void stop(CoprocessorEnvironment e) throws IOException {
-    // nothing to do
-  }
-
-  @Override
-  public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results)
-    throws IOException {
-    Transaction tx = getFromOperation(get);
-    if (tx != null) {
-      projectFamilyDeletes(get);
-      get.setMaxVersions();
-      get.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx, readNonTxnData),
-                       TxUtils.getMaxVisibleTimestamp(tx));
-      Filter newFilter = getTransactionFilter(tx, ScanType.USER_SCAN, get.getFilter());
-      get.setFilter(newFilter);
-    }
-  }
-
-  @Override
-  public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit,
-                        Durability durability) throws IOException {
-    // Translate deletes into our own delete tombstones.
-    // Since HBase deletes cannot be undone, we need to translate deletes into special puts, which allows
-    // us to roll back the changes (with a real delete) if the transaction fails
-
-    // Deletes that are part of a transaction rollback do not need special handling.
-    // They will never be rolled back, so are performed as normal HBase deletes.
-    if (isRollbackOperation(delete)) {
-      return;
-    }
-
-    // Other deletes are client-initiated and need to be translated into our own tombstones
-    // TODO: this should delegate to the DeleteStrategy implementation.
-    Put deleteMarkers = new Put(delete.getRow(), delete.getTimeStamp());
-    for (byte[] family : delete.getFamilyCellMap().keySet()) {
-      List<Cell> familyCells = delete.getFamilyCellMap().get(family);
-      if (isFamilyDelete(familyCells)) {
-        deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, familyCells.get(0).getTimestamp(),
-            HConstants.EMPTY_BYTE_ARRAY);
-      } else {
-        for (Cell cell : familyCells) {
-          deleteMarkers.add(family, CellUtil.cloneQualifier(cell), cell.getTimestamp(),
-                            HConstants.EMPTY_BYTE_ARRAY);
-        }
-      }
-    }
-    for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) {
-        deleteMarkers.setAttribute(entry.getKey(), entry.getValue());
-    }
-    e.getEnvironment().getRegion().put(deleteMarkers);
-    // skip normal delete handling
-    e.bypass();
-  }
-
-  private boolean isFamilyDelete(List<Cell> familyCells) {
-    return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0));
-  }
-
-  @Override
-  public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s)
-    throws IOException {
-    Transaction tx = getFromOperation(scan);
-    if (tx != null) {
-      projectFamilyDeletes(scan);
-      scan.setMaxVersions();
-      scan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx, readNonTxnData),
-                        TxUtils.getMaxVisibleTimestamp(tx));
-      Filter newFilter = getTransactionFilter(tx, ScanType.USER_SCAN, scan.getFilter());
-      scan.setFilter(newFilter);
-    }
-    return s;
-  }
-
-  /**
-   * Ensures that family delete markers are present in the columns requested for any scan operation.
-   * @param scan The original scan request
-   * @return The modified scan request with the family delete qualifiers represented
-   */
-  private Scan projectFamilyDeletes(Scan scan) {
-    for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
-      NavigableSet<byte[]> columns = entry.getValue();
-      // wildcard scans automatically include the delete marker, so we only need to add it when
-      // explicit columns are listed
-      if (columns != null && !columns.isEmpty()) {
-        scan.addColumn(entry.getKey(), TxConstants.FAMILY_DELETE_QUALIFIER);
-      }
-    }
-    return scan;
-  }
-
-  /**
-   * Ensures that family delete markers are present in the columns requested for any get operation.
-   * @param get The original get request
-   * @return The modified get request with the family delete qualifiers represented
-   */
-  private Get projectFamilyDeletes(Get get) {
-    for (Map.Entry<byte[], NavigableSet<byte[]>> entry : get.getFamilyMap().entrySet()) {
-      NavigableSet<byte[]> columns = entry.getValue();
-      // wildcard gets automatically include the delete marker, so we only need to add it when
-      // explicit columns are listed
-      if (columns != null && !columns.isEmpty()) {
-        get.addColumn(entry.getKey(), TxConstants.FAMILY_DELETE_QUALIFIER);
-      }
-    }
-    return get;
-  }
-
-  @Override
-  public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-                                             KeyValueScanner memstoreScanner, InternalScanner scanner)
-      throws IOException {
-    return createStoreScanner(c.getEnvironment(), "flush", cache.getLatestState(), store,
-                              Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
-                              HConstants.OLDEST_TIMESTAMP);
-  }
-
-  @Override
-  public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-      List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
-      CompactionRequest request)
-      throws IOException {
-    return createStoreScanner(c.getEnvironment(), "compaction", cache.getLatestState(), store, scanners,
-                              scanType, earliestPutTs);
-  }
-
-  protected InternalScanner createStoreScanner(RegionCoprocessorEnvironment env, String action,
-                                               TransactionVisibilityState snapshot, Store store,
-                                               List<? extends KeyValueScanner> scanners, ScanType type,
-                                               long earliestPutTs) throws IOException {
-    if (snapshot == null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Region " + env.getRegion().getRegionInfo().getRegionNameAsString() +
-                    ", no current transaction state found, defaulting to normal " + action + " scanner");
-      }
-      return null;
-    }
-
-    // construct a dummy transaction from the latest snapshot
-    Transaction dummyTx = TxUtils.createDummyTransaction(snapshot);
-    Scan scan = new Scan();
-    // need to see all versions, since we filter out excludes and applications may rely on multiple versions
-    scan.setMaxVersions();
-    scan.setFilter(
-        new IncludeInProgressFilter(dummyTx.getVisibilityUpperBound(),
-            snapshot.getInvalid(),
-            getTransactionFilter(dummyTx, type, null)));
-
-    return new StoreScanner(store, store.getScanInfo(), scan, scanners,
-                            type, store.getSmallestReadPoint(), earliestPutTs);
-  }
-
-  private Transaction getFromOperation(OperationWithAttributes op) throws IOException {
-    byte[] encoded = op.getAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY);
-    if (encoded == null) {
-      // to support old clients
-      encoded = op.getAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY);
-    }
-    if (encoded != null) {
-      return txCodec.decode(encoded);
-    }
-    return null;
-  }
-
-  private boolean isRollbackOperation(OperationWithAttributes op) throws IOException {
-    return op.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null ||
-      // to support old clients
-      op.getAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY) != null;
-  }
-
-  /**
-   * Derived classes can override this method to customize the filter used to return data visible for the current
-   * transaction.
-   *
-   * @param tx the current transaction to apply
-   * @param type the type of scan being performed
-   */
-  protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) {
-    return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
-  }
-
-  /**
-   * Filter used to include cells visible to in-progress transactions during flush and compaction.
-   */
-  static class IncludeInProgressFilter extends FilterBase {
-    private final long visibilityUpperBound;
-    private final Set<Long> invalidIds;
-    private final Filter txFilter;
-
-    public IncludeInProgressFilter(long upperBound, Collection<Long> invalids, Filter transactionFilter) {
-      this.visibilityUpperBound = upperBound;
-      this.invalidIds = Sets.newHashSet(invalids);
-      this.txFilter = transactionFilter;
-    }
-
-    @Override
-    public ReturnCode filterKeyValue(Cell cell) throws IOException {
-      // include all cells visible to in-progress transactions, except for those already marked as invalid
-      long ts = cell.getTimestamp();
-      if (ts > visibilityUpperBound) {
-        // include everything that could still be in-progress except invalids
-        if (invalidIds.contains(ts)) {
-          return ReturnCode.SKIP;
-        }
-        return ReturnCode.INCLUDE;
-      }
-      return txFilter.filterKeyValue(cell);
-    }
-  }
-}
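
Besides the hbase-site.xml route shown in the class javadoc, the coprocessor can be attached
per table, and start() above reads an optional per-family TTL (in milliseconds) from
TxConstants.PROPERTY_TTL. A setup sketch against the HBase 1.1 admin API, assuming an
org.apache.hadoop.hbase.client.Admin instance named "admin":

    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("t"));
    HColumnDescriptor family = new HColumnDescriptor("f");
    family.setValue(TxConstants.PROPERTY_TTL, String.valueOf(86400000L)); // 1 day, parsed in start()
    desc.addFamily(family);
    desc.addCoprocessor(TransactionProcessor.class.getName());
    admin.createTable(desc);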

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2246abff/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
deleted file mode 100644
index a258972..0000000
--- a/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tephra.hbase.coprocessor;
-
-import com.google.common.collect.Maps;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterBase;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.regionserver.ScanType;
-import org.apache.tephra.Transaction;
-import org.apache.tephra.TxConstants;
-import org.apache.tephra.util.TxUtils;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
-
-/**
- * Applies filtering of data based on transactional visibility (HBase 1.1+ specific version).
- * Note: this is intended for server-side use only, as additional properties need to be set on
- * any {@code Scan} or {@code Get} operation performed.
- */
-public class TransactionVisibilityFilter extends FilterBase {
-  private final Transaction tx;
-  // oldest visible timestamp by column family, used to apply TTL when reading
-  private final Map<ImmutableBytesWritable, Long> oldestTsByFamily;
-  // if false, empty values will be interpreted as deletes
-  private final boolean allowEmptyValues;
-  // whether or not we can remove delete markers
-  // these can only be safely removed when we are traversing all storefiles
-  private final boolean clearDeletes;
-  // optional sub-filter to apply to visible cells
-  private final Filter cellFilter;
-  // since we traverse KVs in order, cache the current oldest TS to avoid map lookups per KV
-  private final ImmutableBytesWritable currentFamily = new ImmutableBytesWritable(HConstants.EMPTY_BYTE_ARRAY);
-  
-  private long currentOldestTs;
-
-  private DeleteTracker deleteTracker = new DeleteTracker();
-
-  /**
-   * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions.
-   *
-   * @param tx the current transaction to apply.  Only data visible to this transaction will be returned.
-   * @param ttlByFamily map of time-to-live (TTL) values, in milliseconds, by column family name
-   * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
-   *                         these will be interpreted as "delete" markers and the column will be filtered out
-   * @param scanType the type of scan operation being performed
-   */
-  public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
-                                     ScanType scanType) {
-    this(tx, ttlByFamily, allowEmptyValues, scanType, null);
-  }
-
-  /**
-   * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions.
-   *
-   * @param tx the current transaction to apply.  Only data visible to this transaction will be returned.
-   * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
-   * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
-   *                         these will be interpreted as "delete" markers and the column will be filtered out
-   * @param scanType the type of scan operation being performed
-   * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
-   *                   calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}.  If null, then
-   *                   {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned for visible cells instead.
-   */
-  public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
-                                     ScanType scanType, @Nullable Filter cellFilter) {
-    this.tx = tx;
-    this.oldestTsByFamily = Maps.newTreeMap();
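-    // TTLs are given in milliseconds, but cell timestamps written by Tephra are transaction ids,
-    // i.e. wall-clock milliseconds scaled by TxConstants.MAX_TX_PER_MS, so each TTL is scaled to
-    // the same units before being subtracted from the transaction's visibility upper bound.
-    // A TTL <= 0 disables expiration for that family.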
-    for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) {
-      long familyTTL = ttlEntry.getValue();
-      oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()),
-                           familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS);
-    }
-    this.allowEmptyValues = allowEmptyValues;
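-    // Delete markers may be dropped either during a major compaction, where all store files are
-    // visited (COMPACT_DROP_DELETES), or on a user scan, unless the transaction needs to see its
-    // own delete markers (visibility level SNAPSHOT_ALL).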
-    this.clearDeletes =
-      scanType == ScanType.COMPACT_DROP_DELETES ||
-        (scanType == ScanType.USER_SCAN && tx.getVisibilityLevel() != Transaction.VisibilityLevel.SNAPSHOT_ALL);
-    this.cellFilter = cellFilter;
-  }
-
-  @Override
-  public ReturnCode filterKeyValue(Cell cell) throws IOException {
-    if (!CellUtil.matchingFamily(cell, currentFamily.get(), currentFamily.getOffset(), currentFamily.getLength())) {
-      // column family changed
-      currentFamily.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
-      Long familyOldestTs = oldestTsByFamily.get(currentFamily);
-      currentOldestTs = familyOldestTs != null ? familyOldestTs : 0;
-      deleteTracker.reset();
-    }
-    // need to apply TTL for the column family here
-    long kvTimestamp = cell.getTimestamp();
-    if (TxUtils.getTimestampForTTL(kvTimestamp) < currentOldestTs) {
-      // passed TTL for this column, seek to next
-      return ReturnCode.NEXT_COL;
-    } else if (tx.isVisible(kvTimestamp)) {
-      // Return all writes done by the current transaction (including deletes) for VisibilityLevel.SNAPSHOT_ALL
-      if (tx.getVisibilityLevel() == Transaction.VisibilityLevel.SNAPSHOT_ALL && tx.isCurrentWrite(kvTimestamp)) {
-        // cell is visible
-        // visibility SNAPSHOT_ALL needs all matches
-        return runSubFilter(ReturnCode.INCLUDE, cell);
-      }
-      if (DeleteTracker.isFamilyDelete(cell)) {
-        deleteTracker.addFamilyDelete(cell);
-        if (clearDeletes) {
-          return ReturnCode.NEXT_COL;
-        } else {
-          // cell is visible
-          // as soon as we find a KV to include we can move to the next column
-          return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell);
-        }
-      }
-      // check if masked by family delete
-      if (deleteTracker.isDeleted(cell)) {
-        return ReturnCode.NEXT_COL;
-      }
-      // check for column delete
-      if (isColumnDelete(cell)) {
-        if (clearDeletes) {
-          // skip "deleted" cell
-          return ReturnCode.NEXT_COL;
-        } else {
-          // keep the marker but skip any remaining versions
-          return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell);
-        }
-      }
-      // cell is visible
-      // as soon as we find a KV to include we can move to the next column
-      return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell);
-    } else {
-      return ReturnCode.SKIP;
-    }
-  }
-
-  private ReturnCode runSubFilter(ReturnCode txFilterCode, Cell cell) throws IOException {
-    if (cellFilter != null) {
-      ReturnCode subFilterCode = cellFilter.filterKeyValue(cell);
-      return determineReturnCode(txFilterCode, subFilterCode);
-    }
-    return txFilterCode;
-  }
-
-  /**
-   * Determines the return code of TransactionVisibilityFilter based on the sub-filter's return code.
-   * The sub-filter can only exclude cells included by TransactionVisibilityFilter; its INCLUDE
-   * responses are ignored. This ensures that the sub-filter only sees cell versions that are valid
-   * for the given transaction. If a sub-filter needs to see older versions of a cell, this method
-   * can be overridden.
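-   * <p>
-   * For example, if this filter returns INCLUDE_AND_NEXT_COL for a visible cell and the sub-filter
-   * returns SKIP, the combined result is NEXT_COL: the cell is excluded and any remaining versions
-   * of the column are skipped as well.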
-   *
-   * @param txFilterCode return code from TransactionVisibilityFilter
-   * @param subFilterCode return code from sub-filter
-   * @return final return code
-   */
-  protected ReturnCode determineReturnCode(ReturnCode txFilterCode, ReturnCode subFilterCode) {
-    // Return the more restrictive of the two filter responses
-    switch (subFilterCode) {
-      case INCLUDE:
-        return txFilterCode;
-      case INCLUDE_AND_NEXT_COL:
-        return ReturnCode.INCLUDE_AND_NEXT_COL;
-      case SKIP:
-        return txFilterCode == ReturnCode.INCLUDE ? ReturnCode.SKIP : ReturnCode.NEXT_COL;
-      default:
-        return subFilterCode;
-    }
-  }
-
-  @Override
-  public boolean filterRow() throws IOException {
-    if (cellFilter != null) {
-      return cellFilter.filterRow();
-    }
-    return super.filterRow();
-  }
-
-  @Override
-  public Cell transformCell(Cell cell) throws IOException {
-    // Convert Tephra deletes back into HBase deletes
-    if (tx.getVisibilityLevel() == Transaction.VisibilityLevel.SNAPSHOT_ALL) {
-      if (DeleteTracker.isFamilyDelete(cell)) {
-        return new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell), null, cell.getTimestamp(),
-                            KeyValue.Type.DeleteFamily);
-      } else if (isColumnDelete(cell)) {
-        // Note: in some cases KeyValue.Type.Delete is used in Delete object,
-        // and in some other cases KeyValue.Type.DeleteColumn is used.
-        // Since Tephra cannot distinguish between the two, we return KeyValue.Type.DeleteColumn.
-        // KeyValue.Type.DeleteColumn makes both CellUtil.isDelete and CellUtil.isDeleteColumns return true, and will
-        // work in both cases.
-        return new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),
-                            cell.getTimestamp(), KeyValue.Type.DeleteColumn);
-      }
-    }
-    return cell;
-  }
-
-  @Override
-  public void reset() throws IOException {
-    deleteTracker.reset();
-    if (cellFilter != null) {
-      cellFilter.reset();
-    }
-  }
-
-  @Override
-  public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
-    if (cellFilter != null) {
-      return cellFilter.filterRowKey(buffer, offset, length);
-    }
-    return super.filterRowKey(buffer, offset, length);
-  }
-
-  @Override
-  public boolean filterAllRemaining() throws IOException {
-    if (cellFilter != null) {
-      return cellFilter.filterAllRemaining();
-    }
-    return super.filterAllRemaining();
-  }
-
-  @Override
-  public void filterRowCells(List<Cell> kvs) throws IOException {
-    if (cellFilter != null) {
-      cellFilter.filterRowCells(kvs);
-    } else {
-      super.filterRowCells(kvs);
-    }
-  }
-
-  @Override
-  public boolean hasFilterRow() {
-    if (cellFilter != null) {
-      return cellFilter.hasFilterRow();
-    }
-    return super.hasFilterRow();
-  }
-
-  @SuppressWarnings("deprecation")
-  @Override
-  public KeyValue getNextKeyHint(KeyValue currentKV) throws IOException {
-    if (cellFilter != null) {
-      return cellFilter.getNextKeyHint(currentKV);
-    }
-    return super.getNextKeyHint(currentKV);
-  }
-
-  @Override
-  public Cell getNextCellHint(Cell currentKV) throws IOException {
-    if (cellFilter != null) {
-      return cellFilter.getNextCellHint(currentKV);
-    }
-    return super.getNextCellHint(currentKV);
-  }
-
-  @Override
-  public boolean isFamilyEssential(byte[] name) throws IOException {
-    if (cellFilter != null) {
-      return cellFilter.isFamilyEssential(name);
-    }
-    return super.isFamilyEssential(name);
-  }
-
-  private boolean isColumnDelete(Cell cell) {
-    return !TxUtils.isPreExistingVersion(cell.getTimestamp()) && cell.getValueLength() == 0 && !allowEmptyValues;
-  }
-
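-  /**
-   * Tracks the most recently seen family delete marker within the current column family, so that
-   * any cell it masks (i.e. with an equal or older timestamp) can be filtered out.
-   */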
-  private static final class DeleteTracker {
-    private long familyDeleteTs;
-
-    public static boolean isFamilyDelete(Cell cell) {
-      return !TxUtils.isPreExistingVersion(cell.getTimestamp()) &&
-        CellUtil.matchingQualifier(cell, TxConstants.FAMILY_DELETE_QUALIFIER) &&
-        CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY);
-    }
-
-    public void addFamilyDelete(Cell delete) {
-      this.familyDeleteTs = delete.getTimestamp();
-    }
-
-    public boolean isDeleted(Cell cell) {
-      return cell.getTimestamp() <= familyDeleteTs;
-    }
-
-    public void reset() {
-      this.familyDeleteTs = 0;
-    }
-  }
-}
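
As a rough usage sketch (not part of this diff; the class below is hypothetical, and
TxUtils.getMaxVisibleTimestamp is assumed from Tephra's util package), a server-side caller could
wire the filter into a Scan along these lines, setting the extra properties the class javadoc
mentions:

    import java.io.IOException;
    import java.util.Map;

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.regionserver.ScanType;
    import org.apache.tephra.Transaction;
    import org.apache.tephra.hbase.coprocessor.TransactionVisibilityFilter;
    import org.apache.tephra.util.TxUtils;

    public final class TransactionalScanSketch {
      /** Builds a Scan that returns only data visible to the given transaction. */
      public static Scan newTransactionalScan(Transaction tx, Map<byte[], Long> ttlByFamily) throws IOException {
        Scan scan = new Scan();
        // Let the visibility filter, not HBase version pruning, decide which cell versions survive.
        scan.setMaxVersions();
        // Cap the time range at the highest timestamp the transaction could possibly see.
        scan.setTimeRange(0, TxUtils.getMaxVisibleTimestamp(tx));
        scan.setFilter(new TransactionVisibilityFilter(tx, ttlByFamily, false, ScanType.USER_SCAN));
        return scan;
      }
    }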

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2246abff/tephra-hbase-compat-1.1/src/test/java/org/apache/tephra/hbase/HBase11ConfigurationProviderTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1/src/test/java/org/apache/tephra/hbase/HBase11ConfigurationProviderTest.java b/tephra-hbase-compat-1.1/src/test/java/org/apache/tephra/hbase/HBase11ConfigurationProviderTest.java
deleted file mode 100644
index cac80ec..0000000
--- a/tephra-hbase-compat-1.1/src/test/java/org/apache/tephra/hbase/HBase11ConfigurationProviderTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tephra.hbase;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.tephra.util.AbstractConfigurationProviderTest;
-import org.apache.tephra.util.HBaseVersion;
-
-import java.util.Collection;
-
-/**
- * Test for HBase 1.1 and HBase 1.2 version-specific behavior.
- */
-public class HBase11ConfigurationProviderTest extends AbstractConfigurationProviderTest {
-  @Override
-  protected Collection<HBaseVersion.Version> getExpectedVersions() {
-    return ImmutableList.of(HBaseVersion.Version.HBASE_11, HBaseVersion.Version.HBASE_12);
-  }
-}
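
For comparison, a hedged sketch (a hypothetical class, not part of this change, reusing only the
identifiers from the test above) of the same pattern for a module that supports only HBase 1.2;
the expected-version set is all that changes:

    import com.google.common.collect.ImmutableList;
    import org.apache.tephra.util.AbstractConfigurationProviderTest;
    import org.apache.tephra.util.HBaseVersion;

    import java.util.Collection;

    /**
     * Hypothetical: a compat module supporting only HBase 1.2 would narrow the expected versions.
     */
    public class HBase12ConfigurationProviderTest extends AbstractConfigurationProviderTest {
      @Override
      protected Collection<HBaseVersion.Version> getExpectedVersions() {
        return ImmutableList.of(HBaseVersion.Version.HBASE_12);
      }
    }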