You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by an...@apache.org on 2017/09/06 08:01:15 UTC
[4/5] incubator-tephra git commit: Support for HBase 1.3.x
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
new file mode 100644
index 0000000..40e2c37
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.util.TxUtils;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/**
+ * Applies filtering of data based on transactional visibility (HBase 1.3 specific version).
+ * Note: this is intended for server-side use only, as additional properties need to be set on
+ * any {@code Scan} or {@code Get} operation performed.
+ */
+public class TransactionVisibilityFilter extends FilterBase {
+ private final Transaction tx;
+ // oldest visible timestamp by column family, used to apply TTL when reading
+ private final Map<ImmutableBytesWritable, Long> oldestTsByFamily;
+ // if false, empty values will be interpreted as deletes
+ private final boolean allowEmptyValues;
+ // whether or not we can remove delete markers
+ // these can only be safely removed when we are traversing all storefiles
+ private final boolean clearDeletes;
+ // optional sub-filter to apply to visible cells
+ private final Filter cellFilter;
+ // since we traverse KVs in order, cache the current oldest TS to avoid map lookups per KV
+ private final ImmutableBytesWritable currentFamily = new ImmutableBytesWritable(HConstants.EMPTY_BYTE_ARRAY);
+
+ // oldest visible timestamp (TTL bound) for the family cached in currentFamily; 0 means "no TTL"
+ private long currentOldestTs;
+
+ // tracks a family-delete marker for the current row/family so that older cells in the same family are masked
+ private DeleteTracker deleteTracker = new DeleteTracker();
+
+ /**
+ * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions.
+ *
+ * @param tx the current transaction to apply. Only data visible to this transaction will be returned.
+ * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
+ * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
+ * these will be interpreted as "delete" markers and the column will be filtered out
+ * @param scanType the type of scan operation being performed
+ */
+ public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
+ ScanType scanType) {
+ this(tx, ttlByFamily, allowEmptyValues, scanType, null);
+ }
+
+ /**
+ * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions.
+ *
+ * @param tx the current transaction to apply. Only data visible to this transaction will be returned.
+ * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
+ * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
+ * these will be interpreted as "delete" markers and the column will be filtered out
+ * @param scanType the type of scan operation being performed
+ * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
+ * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
+ * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
+ */
+ public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
+ ScanType scanType, @Nullable Filter cellFilter) {
+ this.tx = tx;
+ this.oldestTsByFamily = Maps.newTreeMap();
+ for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) {
+ long familyTTL = ttlEntry.getValue();
+ // TTL <= 0 disables expiry for the family; otherwise convert the millisecond TTL into the
+ // transaction timestamp domain (ms * MAX_TX_PER_MS) and anchor it at the visibility upper bound
+ oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()),
+ familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS);
+ }
+ this.allowEmptyValues = allowEmptyValues;
+ // delete markers may only be dropped when all store files participate (major compaction), or on a
+ // user scan that is not reading SNAPSHOT_ALL (which must surface current-transaction deletes)
+ this.clearDeletes =
+ scanType == ScanType.COMPACT_DROP_DELETES ||
+ (scanType == ScanType.USER_SCAN && tx.getVisibilityLevel() != Transaction.VisibilityLevel.SNAPSHOT_ALL);
+ this.cellFilter = cellFilter;
+ }
+
+ /**
+ * Decides the fate of each cell: first applies the column family TTL, then transactional
+ * visibility, then Tephra delete-marker handling, and finally the optional sub-filter.
+ */
+ @Override
+ public ReturnCode filterKeyValue(Cell cell) throws IOException {
+ if (!CellUtil.matchingFamily(cell, currentFamily.get(), currentFamily.getOffset(), currentFamily.getLength())) {
+ // column family changed
+ currentFamily.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
+ Long familyOldestTs = oldestTsByFamily.get(currentFamily);
+ currentOldestTs = familyOldestTs != null ? familyOldestTs : 0;
+ deleteTracker.reset();
+ }
+ // need to apply TTL for the column family here
+ long kvTimestamp = cell.getTimestamp();
+ if (TxUtils.getTimestampForTTL(kvTimestamp) < currentOldestTs) {
+ // passed TTL for this column, seek to next
+ return ReturnCode.NEXT_COL;
+ } else if (tx.isVisible(kvTimestamp)) {
+ // Return all writes done by current transaction (including deletes) for VisibilityLevel.SNAPSHOT_ALL
+ if (tx.getVisibilityLevel() == Transaction.VisibilityLevel.SNAPSHOT_ALL && tx.isCurrentWrite(kvTimestamp)) {
+ // cell is visible
+ // visibility SNAPSHOT_ALL needs all matches
+ return runSubFilter(ReturnCode.INCLUDE, cell);
+ }
+ if (DeleteTracker.isFamilyDelete(cell)) {
+ deleteTracker.addFamilyDelete(cell);
+ if (clearDeletes) {
+ return ReturnCode.NEXT_COL;
+ } else {
+ // cell is visible
+ // as soon as we find a KV to include we can move to the next column
+ return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell);
+ }
+ }
+ // check if masked by family delete
+ if (deleteTracker.isDeleted(cell)) {
+ return ReturnCode.NEXT_COL;
+ }
+ // check for column delete
+ if (isColumnDelete(cell)) {
+ if (clearDeletes) {
+ // skip "deleted" cell
+ return ReturnCode.NEXT_COL;
+ } else {
+ // keep the marker but skip any remaining versions
+ return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell);
+ }
+ }
+ // cell is visible
+ // as soon as we find a KV to include we can move to the next column
+ return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell);
+ } else {
+ return ReturnCode.SKIP;
+ }
+ }
+
+ // Applies the optional sub-filter to a cell already accepted by the transaction checks;
+ // without a sub-filter the transaction verdict stands as-is.
+ private ReturnCode runSubFilter(ReturnCode txFilterCode, Cell cell) throws IOException {
+ if (cellFilter != null) {
+ ReturnCode subFilterCode = cellFilter.filterKeyValue(cell);
+ return determineReturnCode(txFilterCode, subFilterCode);
+ }
+ return txFilterCode;
+ }
+
+ /**
+ * Determines the return code of TransactionVisibilityFilter based on sub-filter's return code.
+ * Sub-filter can only exclude cells included by TransactionVisibilityFilter, i.e., sub-filter's
+ * INCLUDE will be ignored. This behavior makes sure that sub-filter only sees cell versions valid for the
+ * given transaction. If sub-filter needs to see older versions of cell, then this method can be overridden.
+ *
+ * @param txFilterCode return code from TransactionVisibilityFilter
+ * @param subFilterCode return code from sub-filter
+ * @return final return code
+ */
+ protected ReturnCode determineReturnCode(ReturnCode txFilterCode, ReturnCode subFilterCode) {
+ // Return the more restrictive of the two filter responses
+ switch (subFilterCode) {
+ case INCLUDE:
+ return txFilterCode;
+ case INCLUDE_AND_NEXT_COL:
+ return ReturnCode.INCLUDE_AND_NEXT_COL;
+ case SKIP:
+ // sub-filter SKIP on a tx INCLUDE skips just this version; on INCLUDE_AND_NEXT_COL the
+ // remaining versions were already excluded, so jump to the next column
+ return txFilterCode == ReturnCode.INCLUDE ? ReturnCode.SKIP : ReturnCode.NEXT_COL;
+ default:
+ return subFilterCode;
+ }
+ }
+
+ @Override
+ public boolean filterRow() throws IOException {
+ if (cellFilter != null) {
+ return cellFilter.filterRow();
+ }
+ return super.filterRow();
+ }
+
+ @Override
+ public Cell transformCell(Cell cell) throws IOException {
+ // Convert Tephra deletes back into HBase deletes
+ if (tx.getVisibilityLevel() == Transaction.VisibilityLevel.SNAPSHOT_ALL) {
+ if (DeleteTracker.isFamilyDelete(cell)) {
+ return new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell), null, cell.getTimestamp(),
+ KeyValue.Type.DeleteFamily);
+ } else if (isColumnDelete(cell)) {
+ // Note: in some cases KeyValue.Type.Delete is used in Delete object,
+ // and in some other cases KeyValue.Type.DeleteColumn is used.
+ // Since Tephra cannot distinguish between the two, we return KeyValue.Type.DeleteColumn.
+ // KeyValue.Type.DeleteColumn makes both CellUtil.isDelete and CellUtil.isDeleteColumns return true, and will
+ // work in both cases.
+ return new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),
+ cell.getTimestamp(), KeyValue.Type.DeleteColumn);
+ }
+ }
+ return cell;
+ }
+
+ @Override
+ public void reset() throws IOException {
+ deleteTracker.reset();
+ if (cellFilter != null) {
+ cellFilter.reset();
+ }
+ }
+
+ // The remaining overrides delegate to the sub-filter when one is present, falling back to
+ // FilterBase defaults otherwise, so the sub-filter participates fully in the scan lifecycle.
+
+ @Override
+ public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
+ if (cellFilter != null) {
+ return cellFilter.filterRowKey(buffer, offset, length);
+ }
+ return super.filterRowKey(buffer, offset, length);
+ }
+
+ @Override
+ public boolean filterAllRemaining() throws IOException {
+ if (cellFilter != null) {
+ return cellFilter.filterAllRemaining();
+ }
+ return super.filterAllRemaining();
+ }
+
+ @Override
+ public void filterRowCells(List<Cell> kvs) throws IOException {
+ if (cellFilter != null) {
+ cellFilter.filterRowCells(kvs);
+ } else {
+ super.filterRowCells(kvs);
+ }
+ }
+
+ @Override
+ public boolean hasFilterRow() {
+ if (cellFilter != null) {
+ return cellFilter.hasFilterRow();
+ }
+ return super.hasFilterRow();
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public KeyValue getNextKeyHint(KeyValue currentKV) throws IOException {
+ if (cellFilter != null) {
+ return cellFilter.getNextKeyHint(currentKV);
+ }
+ return super.getNextKeyHint(currentKV);
+ }
+
+ @Override
+ public Cell getNextCellHint(Cell currentKV) throws IOException {
+ if (cellFilter != null) {
+ return cellFilter.getNextCellHint(currentKV);
+ }
+ return super.getNextCellHint(currentKV);
+ }
+
+ @Override
+ public boolean isFamilyEssential(byte[] name) throws IOException {
+ if (cellFilter != null) {
+ return cellFilter.isFamilyEssential(name);
+ }
+ return super.isFamilyEssential(name);
+ }
+
+ // A Tephra "column delete" is an empty value written by a transaction (not a pre-existing,
+ // non-transactional version), and only when empty values are not allowed as real data.
+ private boolean isColumnDelete(Cell cell) {
+ return !TxUtils.isPreExistingVersion(cell.getTimestamp()) && cell.getValueLength() == 0 && !allowEmptyValues;
+ }
+
+ // Remembers the most recent family-delete marker seen for a row/family, so that any older
+ // cell in the same row with timestamp <= the marker's can be masked out.
+ private static final class DeleteTracker {
+ private long familyDeleteTs;
+ private byte[] rowKey;
+
+ // A family delete is a transactional write to the reserved family-delete qualifier with an empty value.
+ public static boolean isFamilyDelete(Cell cell) {
+ return !TxUtils.isPreExistingVersion(cell.getTimestamp()) &&
+ CellUtil.matchingQualifier(cell, TxConstants.FAMILY_DELETE_QUALIFIER) &&
+ CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY);
+ }
+
+ public void addFamilyDelete(Cell delete) {
+ this.familyDeleteTs = delete.getTimestamp();
+ // copy the row key since the cell's backing array may be reused by the scanner
+ this.rowKey = Bytes.copy(delete.getRowArray(), delete.getRowOffset(), delete.getRowLength());
+ }
+
+ // True when the cell belongs to the tracked row and is at or before the family-delete timestamp.
+ public boolean isDeleted(Cell cell) {
+ return rowKey != null && Bytes.compareTo(cell.getRowArray(), cell.getRowOffset(),
+ cell.getRowLength(), rowKey, 0, rowKey.length) == 0 && cell.getTimestamp() <= familyDeleteTs;
+ }
+
+ public void reset() {
+ this.familyDeleteTs = 0;
+ this.rowKey = null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
new file mode 100644
index 0000000..9b856d9
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.persist.TransactionVisibilityState;
+import org.apache.tephra.util.TxUtils;
+
+import java.io.IOException;
+import javax.annotation.Nullable;
+
+/**
+ * Record compaction state for invalid list pruning
+ */
+public class CompactionState {
+ private static final Log LOG = LogFactory.getLog(CompactionState.class);
+
+ // region identity captured once at construction; regionName is the key used in the state table
+ private final byte[] regionName;
+ private final String regionNameAsString;
+ // supplier is retained so stop() can release the shared writer's reference count
+ private final PruneUpperBoundWriterSupplier pruneUpperBoundWriterSupplier;
+ private final PruneUpperBoundWriter pruneUpperBoundWriter;
+
+ // prune upper bound computed by the last record() call; -1 means nothing to persist
+ private volatile long pruneUpperBound = -1;
+
+ public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long pruneFlushInterval) {
+ this.regionName = env.getRegionInfo().getRegionName();
+ this.regionNameAsString = env.getRegionInfo().getRegionNameAsString();
+ DataJanitorState dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
+ @Override
+ public Table get() throws IOException {
+ return env.getTable(stateTable);
+ }
+ });
+ this.pruneUpperBoundWriterSupplier = new PruneUpperBoundWriterSupplier(stateTable, dataJanitorState,
+ pruneFlushInterval);
+ this.pruneUpperBoundWriter = pruneUpperBoundWriterSupplier.get();
+ }
+
+ /**
+ * Records the transaction state used for a compaction. This method is called when the compaction starts.
+ *
+ * @param request {@link CompactionRequest} for the compaction
+ * @param snapshot transaction state that will be used for the compaction
+ */
+ public void record(CompactionRequest request, @Nullable TransactionVisibilityState snapshot) {
+ // only a major compaction guarantees all store files were rewritten, which is what makes
+ // the computed prune upper bound safe to persist
+ if (request.isMajor() && snapshot != null) {
+ Transaction tx = TxUtils.createDummyTransaction(snapshot);
+ pruneUpperBound = TxUtils.getPruneUpperBound(tx);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ String.format("Computed prune upper bound %s for compaction request %s using transaction state from time %s",
+ pruneUpperBound, request, snapshot.getTimestamp()));
+ }
+ } else {
+ pruneUpperBound = -1;
+ }
+ }
+
+ /**
+ * Persists the transaction state recorded by {@link #record(CompactionRequest, TransactionVisibilityState)}.
+ * This method is called after the compaction has successfully completed.
+ */
+ public void persist() {
+ if (pruneUpperBound != -1) {
+ // enqueues asynchronously; the writer flushes entries on its own schedule
+ pruneUpperBoundWriter.persistPruneEntry(regionName, pruneUpperBound);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Enqueued prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
+ }
+ }
+ }
+
+ /**
+ * Persist that the given region is empty at the given time
+ * @param time time in milliseconds
+ */
+ public void persistRegionEmpty(long time) {
+ pruneUpperBoundWriter.persistRegionEmpty(regionName, time);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Enqueued empty region %s at time %s", regionNameAsString, time));
+ }
+ }
+
+ /**
+ * Releases the usage {@link PruneUpperBoundWriter}.
+ */
+ public void stop() {
+ pruneUpperBoundWriterSupplier.release();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
new file mode 100644
index 0000000..db59d7d
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.txprune.RegionPruneInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import javax.annotation.Nullable;
+
+/**
+ * Persist data janitor state into an HBase table.
+ * This is used by both {@link TransactionProcessor} and by the {@link HBaseTransactionPruningPlugin}
+ * to persist and read the compaction state.
+ */
+@SuppressWarnings("WeakerAccess")
+public class DataJanitorState {
+ private static final Log LOG = LogFactory.getLog(DataJanitorState.class);
+
+ public static final byte[] FAMILY = {'f'};
+ public static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
+
+ private static final byte[] REGION_TIME_COL = {'r'};
+ private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'};
+ private static final byte[] EMPTY_REGION_TIME_COL = {'e'};
+
+ private static final byte[] REGION_KEY_PREFIX = {0x1};
+ private static final byte[] REGION_KEY_PREFIX_STOP = {0x2};
+
+ private static final byte[] REGION_TIME_KEY_PREFIX = {0x2};
+ private static final byte[] REGION_TIME_KEY_PREFIX_STOP = {0x3};
+
+ private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX = {0x3};
+ private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP = {0x4};
+
+ private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX = {0x4};
+ private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX_STOP = {0x5};
+
+ private static final byte[] REGION_TIME_COUNT_KEY_PREFIX = {0x5};
+ private static final byte[] REGION_TIME_COUNT_KEY_PREFIX_STOP = {0x6};
+
+ private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+ // This value can be used when we don't care about the value we write in a column
+ private static final byte[] COL_VAL = Bytes.toBytes('1');
+
+ private final TableSupplier stateTableSupplier;
+
+
+ // Creates an instance backed by the given table supplier; a fresh Table is obtained
+ // (and closed) for every read/write operation.
+ public DataJanitorState(TableSupplier stateTableSupplier) {
+ this.stateTableSupplier = stateTableSupplier;
+ }
+
+ // ----------------------------------------------------------------
+ // ------- Methods for prune upper bound for a given region -------
+ // ----------------------------------------------------------------
+ // The data is stored in the following format -
+ // Key: 0x1<region-id>
+ // Col 'p': <prune upper bound>
+ // ----------------------------------------------------------------
+
+ /**
+ * Persist the latest prune upper bound for a given region. This is called by {@link TransactionProcessor}
+ * after major compaction.
+ *
+ * @param regionId region id
+ * @param pruneUpperBound the latest prune upper bound for the region
+ * @throws IOException when not able to persist the data to HBase
+ */
+ public void savePruneUpperBoundForRegion(byte[] regionId, long pruneUpperBound) throws IOException {
+ try (Table stateTable = stateTableSupplier.get()) {
+ // row key 0x1<region-id>; the bound is stored as an 8-byte long
+ Put put = new Put(makeRegionKey(regionId));
+ put.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL, Bytes.toBytes(pruneUpperBound));
+ stateTable.put(put);
+ }
+ }
+
+ /**
+ * Get latest prune upper bound for a given region. This indicates the largest invalid transaction that no
+ * longer has writes in this region.
+ *
+ * @param regionId region id
+ * @return latest prune upper bound for the region
+ * @throws IOException when not able to read the data from HBase
+ */
+ public long getPruneUpperBoundForRegion(byte[] regionId) throws IOException {
+ // delegate to the full prune-info lookup; -1 signals that no bound was recorded for this region
+ RegionPruneInfo regionPruneInfo = getPruneInfoForRegion(regionId);
+ return (regionPruneInfo == null) ? -1 : regionPruneInfo.getPruneUpperBound();
+ }
+
+ /**
+ * Get the latest {@link RegionPruneInfo} for a given region.
+ *
+ * @param regionId region id
+ * @return {@link RegionPruneInfo} for the region
+ * @throws IOException when not able to read the data from HBase
+ */
+ @Nullable
+ public RegionPruneInfo getPruneInfoForRegion(byte[] regionId) throws IOException {
+ try (Table stateTable = stateTableSupplier.get()) {
+ Get get = new Get(makeRegionKey(regionId));
+ get.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+ Cell cell = stateTable.get(get).getColumnLatestCell(FAMILY, PRUNE_UPPER_BOUND_COL);
+ if (cell == null) {
+ return null;
+ }
+ byte[] pruneUpperBoundBytes = CellUtil.cloneValue(cell);
+ // the HBase cell timestamp records when the prune upper bound was written
+ long timestamp = cell.getTimestamp();
+ return new RegionPruneInfo(regionId, Bytes.toStringBinary(regionId),
+ Bytes.toLong(pruneUpperBoundBytes), timestamp);
+ }
+ }
+
+ /**
+ * Get latest prune upper bounds for given regions. This is a batch operation of method
+ * {@link #getPruneUpperBoundForRegion(byte[])}
+ *
+ * @param regions a set of regions
+ * @return a map containing region id and its latest prune upper bound value
+ * @throws IOException when not able to read the data from HBase
+ */
+ public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException {
+ // byte[] has no value-based equals/hashCode, so the map must use BYTES_COMPARATOR for its keys
+ Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ List<RegionPruneInfo> regionPruneInfos = getPruneInfoForRegions(regions);
+ for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+ resultMap.put(regionPruneInfo.getRegionName(), regionPruneInfo.getPruneUpperBound());
+ }
+ return Collections.unmodifiableMap(resultMap);
+ }
+
+ /**
+ * Gets a list of {@link RegionPruneInfo} for given regions. Returns all regions if the given regions set is null.
+ *
+ * @param regions a set of regions
+ * @return list of {@link RegionPruneInfo}s.
+ * @throws IOException when not able to read the data from HBase
+ */
+ public List<RegionPruneInfo> getPruneInfoForRegions(@Nullable SortedSet<byte[]> regions) throws IOException {
+ List<RegionPruneInfo> regionPruneInfos = new ArrayList<>();
+ try (Table stateTable = stateTableSupplier.get()) {
+ // scan the entire 0x1-prefixed key range, then filter client-side against the given region set
+ byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+ Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+ scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+ try (ResultScanner scanner = stateTable.getScanner(scan)) {
+ Result next;
+ while ((next = scanner.next()) != null) {
+ byte[] region = getRegionFromKey(next.getRow());
+ // a null regions set means "return all regions"; the set must use a byte[] comparator for contains()
+ if (regions == null || regions.contains(region)) {
+ Cell cell = next.getColumnLatestCell(FAMILY, PRUNE_UPPER_BOUND_COL);
+ if (cell != null) {
+ byte[] pruneUpperBoundBytes = CellUtil.cloneValue(cell);
+ long timestamp = cell.getTimestamp();
+ regionPruneInfos.add(new RegionPruneInfo(region, Bytes.toStringBinary(region),
+ Bytes.toLong(pruneUpperBoundBytes), timestamp));
+ }
+ }
+ }
+ }
+ }
+ return Collections.unmodifiableList(regionPruneInfos);
+ }
+
+ /**
+ * Delete prune upper bounds for the regions that are not in the given exclude set, and the
+ * prune upper bound is less than the given value.
+ * After the invalid list is pruned up to deletionPruneUpperBound, we do not need entries for regions that have
+ * prune upper bound less than deletionPruneUpperBound. We however limit the deletion to only regions that are
+ * no longer in existence (due to deletion, etc.), to avoid update/delete race conditions.
+ *
+ * @param deletionPruneUpperBound prune upper bound below which regions will be deleted
+ * @param excludeRegions set of regions that should not be deleted
+ * @throws IOException when not able to delete data in HBase
+ */
+ public void deletePruneUpperBounds(long deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
+ throws IOException {
+ try (Table stateTable = stateTableSupplier.get()) {
+ byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+ Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+ scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+ try (ResultScanner scanner = stateTable.getScanner(scan)) {
+ Result next;
+ while ((next = scanner.next()) != null) {
+ byte[] region = getRegionFromKey(next.getRow());
+ // only rows for regions outside the exclude set, with a bound strictly below the
+ // threshold, are deleted — both conditions guard against update/delete races
+ if (!excludeRegions.contains(region)) {
+ byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
+ if (timeBytes != null) {
+ long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+ if (pruneUpperBoundRegion < deletionPruneUpperBound) {
+ stateTable.delete(new Delete(next.getRow()));
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ // ---------------------------------------------------
+ // ------- Methods for regions at a given time -------
+ // ---------------------------------------------------
+ // Key: 0x2<inverted time><region-id>
+ // Col 'r': <COL_VAL marker byte '1'>
+ // ---------------------------------------------------
+
+ /**
+ * Persist the regions for the given time. {@link HBaseTransactionPruningPlugin} saves the set of
+ * transactional regions existing in the HBase instance periodically.
+ *
+ * @param time timestamp in milliseconds
+ * @param regions set of regions at the time
+ * @throws IOException when not able to persist the data to HBase
+ */
+ public void saveRegionsForTime(long time, Set<byte[]> regions) throws IOException {
+ // time is stored inverted so that a forward scan visits newer times first
+ byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+ try (Table stateTable = stateTableSupplier.get()) {
+ for (byte[] region : regions) {
+ Put put = new Put(makeTimeRegionKey(timeBytes, region));
+ put.addColumn(FAMILY, REGION_TIME_COL, COL_VAL);
+ stateTable.put(put);
+ }
+
+ // Save the count of regions as a checksum
+ saveRegionCountForTime(stateTable, timeBytes, regions.size());
+ }
+ }
+
+ @VisibleForTesting
+ // Stores the number of regions recorded at the given (inverted) time; read back later as a
+ // checksum to detect partially-written region snapshots.
+ void saveRegionCountForTime(Table stateTable, byte[] timeBytes, int count) throws IOException {
+ Put put = new Put(makeTimeRegionCountKey(timeBytes));
+ put.addColumn(FAMILY, REGION_TIME_COL, Bytes.toBytes(count));
+ stateTable.put(put);
+ }
+
+ /**
+ * Return the set of regions saved for the time at or before the given time. This method finds the greatest time
+ * that is less than or equal to the given time, and then returns all regions with that exact time, but none that are
+ * older than that.
+ *
+ * @param time timestamp in milliseconds
+ * @return set of regions and time at which they were recorded, or null if no regions found
+ * @throws IOException when not able to read the data from HBase
+ */
+ @Nullable
+ public TimeRegions getRegionsOnOrBeforeTime(long time) throws IOException {
+ try (Table stateTable = stateTableSupplier.get()) {
+ TimeRegions timeRegions;
+ while ((timeRegions = getNextSetOfTimeRegions(stateTable, time)) != null) {
+ // the saved count acts as a checksum: a mismatch means an incomplete snapshot,
+ // so fall back to the previous recorded time and try again
+ int count = getRegionCountForTime(stateTable, timeRegions.getTime());
+ if (count != -1 && count == timeRegions.getRegions().size()) {
+ return timeRegions;
+ } else {
+ LOG.warn(String.format("Got incorrect count for regions saved at time %s, expected = %s but actual = %s",
+ timeRegions.getTime(), count, timeRegions.getRegions().size()));
+ time = timeRegions.getTime() - 1;
+ }
+ }
+ return null;
+ }
+ }
+
+ @Nullable
+ // Returns the regions recorded at the greatest time <= the given time, or null if none exist.
+ // Because times are stored inverted, a forward scan from inverted(time) yields rows in
+ // descending actual-time order; we collect rows only for the first (largest) time encountered.
+ private TimeRegions getNextSetOfTimeRegions(Table stateTable, long time) throws IOException {
+ byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+ Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
+ scan.addColumn(FAMILY, REGION_TIME_COL);
+
+
+ long currentRegionTime = -1;
+ SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ Result next;
+ try (ResultScanner scanner = stateTable.getScanner(scan)) {
+ while ((next = scanner.next()) != null) {
+ Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
+ // Stop if reached next time value
+ if (currentRegionTime == -1) {
+ currentRegionTime = timeRegion.getKey();
+ } else if (timeRegion.getKey() < currentRegionTime) {
+ break;
+ } else if (timeRegion.getKey() > currentRegionTime) {
+ // a larger time after a smaller one contradicts the inverted-key ordering
+ throw new IllegalStateException(
+ String.format("Got out of order time %d when expecting time less than or equal to %d",
+ timeRegion.getKey(), currentRegionTime));
+ }
+ regions.add(timeRegion.getValue());
+ }
+ }
+ return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, Collections.unmodifiableSortedSet(regions));
+ }
+
+ @VisibleForTesting
+ // Reads the region-count checksum stored for the given time; returns -1 when none was recorded.
+ int getRegionCountForTime(Table stateTable, long time) throws IOException {
+ Get get = new Get(makeTimeRegionCountKey(Bytes.toBytes(getInvertedTime(time))));
+ get.addColumn(FAMILY, REGION_TIME_COL);
+ Result result = stateTable.get(get);
+ byte[] value = result.getValue(FAMILY, REGION_TIME_COL);
+ return value == null ? -1 : Bytes.toInt(value);
+ }
+
+ /**
+ * Delete all the regions that were recorded for all times equal or less than the given time.
+ *
+ * @param time timestamp in milliseconds
+ * @throws IOException when not able to delete data in HBase
+ */
+ public void deleteAllRegionsOnOrBeforeTime(long time) throws IOException {
+ byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+ try (Table stateTable = stateTableSupplier.get()) {
+ // Delete the regions
+ Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
+ scan.addColumn(FAMILY, REGION_TIME_COL);
+ deleteFromScan(stateTable, scan);
+
+ // Delete the count
+ scan = new Scan(makeTimeRegionCountKey(timeBytes), REGION_TIME_COUNT_KEY_PREFIX_STOP);
+ scan.addColumn(FAMILY, REGION_TIME_COL);
+ deleteFromScan(stateTable, scan);
+ }
+ }
+
+ // ---------------------------------------------------------------------
+ // ------- Methods for inactive transaction bound for given time -------
+ // ---------------------------------------------------------------------
+ // Key: 0x3<inverted time>
+ // Col 'p': <inactive transaction bound>
+ // ---------------------------------------------------------------------
+
+ /**
+ * Persist inactive transaction bound for a given time. This is the smallest not in-progress transaction that
+ * will not have writes in any HBase regions that are created after the given time.
+ *
+ * @param time time in milliseconds
+ * @param inactiveTransactionBound inactive transaction bound for the given time
+ * @throws IOException when not able to persist the data to HBase
+ */
+ public void saveInactiveTransactionBoundForTime(long time, long inactiveTransactionBound) throws IOException {
+ try (Table stateTable = stateTableSupplier.get()) {
+ Put put = new Put(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))));
+ put.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL, Bytes.toBytes(inactiveTransactionBound));
+ stateTable.put(put);
+ }
+ }
+
+ /**
+ * Return inactive transaction bound for the given time.
+ *
+ * @param time time in milliseconds
+ * @return inactive transaction bound for the given time
+ * @throws IOException when not able to read the data from HBase
+ */
+ public long getInactiveTransactionBoundForTime(long time) throws IOException {
+ try (Table stateTable = stateTableSupplier.get()) {
+ Get get = new Get(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))));
+ get.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL);
+ byte[] result = stateTable.get(get).getValue(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL);
+ return result == null ? -1 : Bytes.toLong(result);
+ }
+ }
+
  /**
   * Delete all inactive transaction bounds recorded for a time equal to or less than the given time.
   *
   * @param time time in milliseconds
   * @throws IOException when not able to delete data in HBase
   */
  public void deleteInactiveTransactionBoundsOnOrBeforeTime(long time) throws IOException {
    try (Table stateTable = stateTableSupplier.get()) {
      // Keys use inverted time, so scanning forward from this key covers the given time and older
      Scan scan = new Scan(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))),
                           INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP);
      scan.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL);
      deleteFromScan(stateTable, scan);
    }
  }
+
+ // --------------------------------------------------------
+ // ------- Methods for empty regions at a given time -------
+ // --------------------------------------------------------
+ // Key: 0x4<time><region-id>
+ // Col 'e': <empty byte array>
+ // --------------------------------------------------------
+
+ /**
+ * Save the given region as empty as of the given time.
+ *
+ * @param time time in milliseconds
+ * @param regionId region id
+ */
+ public void saveEmptyRegionForTime(long time, byte[] regionId) throws IOException {
+ byte[] timeBytes = Bytes.toBytes(time);
+ try (Table stateTable = stateTableSupplier.get()) {
+ Put put = new Put(makeEmptyRegionTimeKey(timeBytes, regionId));
+ put.addColumn(FAMILY, EMPTY_REGION_TIME_COL, COL_VAL);
+ stateTable.put(put);
+ }
+ }
+
+ /**
+ * Return regions that were recorded as empty after the given time.
+ *
+ * @param time time in milliseconds
+ * @param includeRegions If not null, the returned set will be an intersection of the includeRegions set
+ * and the empty regions after the given time
+ */
+ public SortedSet<byte[]> getEmptyRegionsAfterTime(long time, @Nullable SortedSet<byte[]> includeRegions)
+ throws IOException {
+ SortedSet<byte[]> emptyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ try (Table stateTable = stateTableSupplier.get()) {
+ Scan scan = new Scan(makeEmptyRegionTimeKey(Bytes.toBytes(time + 1), EMPTY_BYTE_ARRAY),
+ EMPTY_REGION_TIME_KEY_PREFIX_STOP);
+ scan.addColumn(FAMILY, EMPTY_REGION_TIME_COL);
+
+ try (ResultScanner scanner = stateTable.getScanner(scan)) {
+ Result next;
+ while ((next = scanner.next()) != null) {
+ byte[] emptyRegion = getEmptyRegionFromKey(next.getRow());
+ if (includeRegions == null || includeRegions.contains(emptyRegion)) {
+ emptyRegions.add(emptyRegion);
+ }
+ }
+ }
+ }
+ return Collections.unmodifiableSortedSet(emptyRegions);
+ }
+
+ /**
+ * Delete empty region records saved on or before the given time.
+ *
+ * @param time time in milliseconds
+ */
+ public void deleteEmptyRegionsOnOrBeforeTime(long time) throws IOException {
+ try (Table stateTable = stateTableSupplier.get()) {
+ Scan scan = new Scan();
+ scan.setStopRow(makeEmptyRegionTimeKey(Bytes.toBytes(time + 1), EMPTY_BYTE_ARRAY));
+ scan.addColumn(FAMILY, EMPTY_REGION_TIME_COL);
+ deleteFromScan(stateTable, scan);
+ }
+ }
+
+ @VisibleForTesting
+ void deleteFromScan(Table stateTable, Scan scan) throws IOException {
+ try (ResultScanner scanner = stateTable.getScanner(scan)) {
+ Result next;
+ while ((next = scanner.next()) != null) {
+ stateTable.delete(new Delete(next.getRow()));
+ }
+ }
+ }
+
  // Builds the row key for a region entry: <region-key-prefix><region-id>
  private byte[] makeRegionKey(byte[] regionId) {
    return Bytes.add(REGION_KEY_PREFIX, regionId);
  }
+
  // Strips the region-key prefix from a row key, returning the raw region id
  private byte[] getRegionFromKey(byte[] regionKey) {
    int prefixLen = REGION_KEY_PREFIX.length;
    return Bytes.copy(regionKey, prefixLen, regionKey.length - prefixLen);
  }
+
  // Builds the row key <region-time-prefix><time><region-id>; callers pass the inverted time bytes
  private byte[] makeTimeRegionKey(byte[] time, byte[] regionId) {
    return Bytes.add(REGION_TIME_KEY_PREFIX, time, regionId);
  }
+
  // Builds the row key <region-time-count-prefix><time>; callers pass the inverted time bytes
  private byte[] makeTimeRegionCountKey(byte[] time) {
    return Bytes.add(REGION_TIME_COUNT_KEY_PREFIX, time);
  }
+
  // Builds the row key <inactive-transaction-bound-prefix><time>; callers pass the inverted time bytes
  private byte[] makeInactiveTransactionBoundTimeKey(byte[] time) {
    return Bytes.add(INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX, time);
  }
+
  // Parses a region-time row key back into (time, region name); un-inverts the stored time,
  // since getInvertedTime is its own inverse
  private Map.Entry<Long, byte[]> getTimeRegion(byte[] key) {
    int offset = REGION_TIME_KEY_PREFIX.length;
    long time = getInvertedTime(Bytes.toLong(key, offset));
    offset += Bytes.SIZEOF_LONG;
    byte[] regionName = Bytes.copy(key, offset, key.length - offset);
    return Maps.immutableEntry(time, regionName);
  }
+
  // Builds the row key <empty-region-prefix><time><region-id>; these keys use plain (non-inverted) time
  private byte[] makeEmptyRegionTimeKey(byte[] time, byte[] regionId) {
    return Bytes.add(EMPTY_REGION_TIME_KEY_PREFIX, time, regionId);
  }
+
  // Strips the prefix and the 8-byte time from an empty-region row key, returning the region id
  private byte[] getEmptyRegionFromKey(byte[] key) {
    int prefixLen = EMPTY_REGION_TIME_KEY_PREFIX.length + Bytes.SIZEOF_LONG;
    return Bytes.copy(key, prefixLen, key.length - prefixLen);
  }
+
  // Inverts a timestamp so newer times sort first in HBase's ascending key order.
  // The operation is its own inverse, so it also decodes a previously inverted time.
  private long getInvertedTime(long time) {
    return Long.MAX_VALUE - time;
  }
+
  /**
   * Supplies the HBase table used for persisting prune state. Each call returns a usable
   * {@link Table} instance; callers are responsible for closing it (all call sites in this
   * class use try-with-resources).
   */
  public interface TableSupplier {
    Table get() throws IOException;
  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
new file mode 100644
index 0000000..84c480a
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.txprune.TransactionPruningPlugin;
+import org.apache.tephra.util.TxUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Default implementation of the {@link TransactionPruningPlugin} for HBase.
+ *
+ * This plugin determines the prune upper bound for transactional HBase tables that use
+ * coprocessor {@link TransactionProcessor}.
+ *
+ * <h3>State storage:</h3>
+ *
+ * This plugin expects the TransactionProcessor to save the prune upper bound for invalid transactions
+ * after every major compaction of a region. Let's call this <i>(region, prune upper bound)</i>.
+ * In addition, the plugin also persists the following information on a run at time <i>t</i>
+ * <ul>
+ * <li>
+ * <i>(t, set of regions)</i>: Set of transactional regions at time <i>t</i>.
+ * Transactional regions are regions of the tables that have the coprocessor TransactionProcessor
+ * attached to them.
+ * </li>
+ * <li>
+ * <i>(t, inactive transaction bound)</i>: This is the smallest not in-progress transaction that
+ * will not have writes in any HBase regions that are created after time <i>t</i>.
+ * This value is determined by the Transaction Service based on the transaction state at time <i>t</i>
+ * and passed on to the plugin.
+ * </li>
+ * </ul>
+ *
+ * <h3>Computing prune upper bound:</h3>
+ *
+ * In a typical HBase instance, there can be a constant change in the number of regions due to region creations,
+ * splits and merges. At any given time there can always be a region on which a major compaction has not been run.
+ * Since the prune upper bound will get recorded for a region only after a major compaction,
+ * using only the latest set of regions we may not be able to find the
+ * prune upper bounds for all the current regions. Hence we persist the set of regions that exist at that time
+ * of each run of the plugin, and use historical region set for time <i>t</i>, <i>t - 1</i>, etc.
+ * to determine the prune upper bound.
+ *
+ * From the regions saved at time <i>t</i>, <i>t - 1</i>, etc.,
+ * the plugin tries to find the latest <i>(t, set of regions)</i> where all regions have been major compacted,
+ * i.e, all regions have prune upper bound recorded in <i>(region, prune upper bound)</i>.
+ * <br/>
+ * If such a set is found for time <i>t1</i>, the prune upper bound returned by the plugin is the minimum of
+ * <ul>
+ * <li>Prune upper bounds of regions in set <i>(t1, set of regions)</i></li>
+ * <li>Inactive transaction bound from <i>(t1, inactive transaction bound)</i></li>
+ * </ul>
+ *
+ * <p/>
+ * Above, when we find <i>(t1, set of regions)</i>, there may be a region that was created after time <i>t1</i>,
+ * but has a data write from an invalid transaction that is smaller than the prune upper bounds of all
+ * regions in <i>(t1, set of regions)</i>. This is possible because <i>(region, prune upper bound)</i> persisted by
+ * TransactionProcessor is always the latest prune upper bound for a region.
+ * <br/>
+ * However a region created after time <i>t1</i> cannot have writes from an invalid transaction that is smaller than
+ * inactive transaction bound at the time the region was created.
+ * Since we limit the plugin prune upper bound using <i>(t1, inactive transaction bound)</i>,
+ * there should be no invalid transactions smaller than the plugin prune upper bound with writes in any
+ * transactional region of this HBase instance.
+ *
+ * <p/>
+ * Note: If your tables use a transactional coprocessor other than TransactionProcessor,
+ * then you may need to write a new plugin to compute prune upper bound for those tables.
+ */
+@SuppressWarnings("WeakerAccess")
+public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
+ public static final Logger LOG = LoggerFactory.getLogger(HBaseTransactionPruningPlugin.class);
+
+ protected Configuration conf;
+ protected Connection connection;
+ protected DataJanitorState dataJanitorState;
+
+ @Override
+ public void initialize(Configuration conf) throws IOException {
+ this.conf = conf;
+ this.connection = ConnectionFactory.createConnection(conf);
+
+ final TableName stateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+ LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString());
+ createPruneTable(stateTable);
+ this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
+ @Override
+ public Table get() throws IOException {
+ return connection.getTable(stateTable);
+ }
+ });
+ }
+
+ /**
+ * Determines prune upper bound for the data store as mentioned above.
+ */
+ @Override
+ public long fetchPruneUpperBound(long time, long inactiveTransactionBound) throws IOException {
+ LOG.debug("Fetching prune upper bound for time {} and inactive transaction bound {}",
+ time, inactiveTransactionBound);
+ if (time < 0 || inactiveTransactionBound < 0) {
+ return -1;
+ }
+
+ // Get all the current transactional regions
+ SortedSet<byte[]> transactionalRegions = getTransactionalRegions();
+ if (!transactionalRegions.isEmpty()) {
+ LOG.debug("Saving {} transactional regions for time {}", transactionalRegions.size(), time);
+ dataJanitorState.saveRegionsForTime(time, transactionalRegions);
+ // Save inactive transaction bound for time as the final step.
+ // We can then use its existence to make sure that the data for a given time is complete or not
+ LOG.debug("Saving inactive transaction bound {} for time {}", inactiveTransactionBound, time);
+ dataJanitorState.saveInactiveTransactionBoundForTime(time, inactiveTransactionBound);
+ }
+
+ return computePruneUpperBound(new TimeRegions(time, transactionalRegions));
+ }
+
+ /**
+ * After invalid list has been pruned, this cleans up state information that is no longer required.
+ * This includes -
+ * <ul>
+ * <li>
+ * <i>(region, prune upper bound)</i> - prune upper bound for regions that are older
+ * than maxPrunedInvalid
+ * </li>
+ * <li>
+ * <i>(t, set of regions) - Regions set that were recorded on or before the start time
+ * of maxPrunedInvalid
+ * </li>
+ * <li>
+ * (t, inactive transaction bound) - Smallest not in-progress transaction without any writes in new regions
+ * information recorded on or before the start time of maxPrunedInvalid
+ * </li>
+ * </ul>
+ */
+ @Override
+ public void pruneComplete(long time, long maxPrunedInvalid) throws IOException {
+ LOG.debug("Prune complete for time {} and prune upper bound {}", time, maxPrunedInvalid);
+ if (time < 0 || maxPrunedInvalid < 0) {
+ return;
+ }
+
+ // Get regions for the current time, so as to not delete the prune upper bounds for them.
+ // The prune upper bounds for regions are recorded by TransactionProcessor and the deletion
+ // is done by this class. To avoid update/delete race condition, we only delete prune upper
+ // bounds for the stale regions.
+ TimeRegions regionsToExclude = dataJanitorState.getRegionsOnOrBeforeTime(time);
+ if (regionsToExclude != null) {
+ LOG.debug("Deleting prune upper bounds smaller than {} for stale regions", maxPrunedInvalid);
+ dataJanitorState.deletePruneUpperBounds(maxPrunedInvalid, regionsToExclude.getRegions());
+ } else {
+ LOG.warn("Cannot find saved regions on or before time {}", time);
+ }
+ long pruneTime = TxUtils.getTimestamp(maxPrunedInvalid);
+ LOG.debug("Deleting regions recorded before time {}", pruneTime);
+ dataJanitorState.deleteAllRegionsOnOrBeforeTime(pruneTime);
+ LOG.debug("Deleting inactive transaction bounds recorded on or before time {}", pruneTime);
+ dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(pruneTime);
+ LOG.debug("Deleting empty regions recorded on or before time {}", pruneTime);
+ dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(pruneTime);
+ }
+
+ @Override
+ public void destroy() {
+ LOG.info("Stopping plugin...");
+ try {
+ connection.close();
+ } catch (IOException e) {
+ LOG.error("Got exception while closing HBase connection", e);
+ }
+ }
+
+ /**
+ * Create the prune state table given the {@link TableName} if the table doesn't exist already.
+ *
+ * @param stateTable prune state table name
+ */
+ protected void createPruneTable(TableName stateTable) throws IOException {
+ try (Admin admin = this.connection.getAdmin()) {
+ if (admin.tableExists(stateTable)) {
+ LOG.debug("Not creating pruneStateTable {} since it already exists.",
+ stateTable.getNameWithNamespaceInclAsString());
+ return;
+ }
+
+ HTableDescriptor htd = new HTableDescriptor(stateTable);
+ htd.addFamily(new HColumnDescriptor(DataJanitorState.FAMILY).setMaxVersions(1));
+ admin.createTable(htd);
+ LOG.info("Created pruneTable {}", stateTable.getNameWithNamespaceInclAsString());
+ } catch (TableExistsException ex) {
+ // Expected if the prune state table is being created at the same time by another client
+ LOG.debug("Not creating pruneStateTable {} since it already exists.",
+ stateTable.getNameWithNamespaceInclAsString(), ex);
+ }
+ }
+
+ /**
+ * Returns whether the table is a transactional table. By default, it is a table is identified as a transactional
+ * table if it has a the coprocessor {@link TransactionProcessor} attached to it. Should be overriden if the users
+ * attach a different coprocessor.
+ *
+ * @param tableDescriptor {@link HTableDescriptor} of the table
+ * @return true if the table is transactional
+ */
+ protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
+ return tableDescriptor.hasCoprocessor(TransactionProcessor.class.getName());
+ }
+
+ protected SortedSet<byte[]> getTransactionalRegions() throws IOException {
+ SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ try (Admin admin = connection.getAdmin()) {
+ HTableDescriptor[] tableDescriptors = admin.listTables();
+ LOG.debug("Got {} tables to process", tableDescriptors == null ? 0 : tableDescriptors.length);
+ if (tableDescriptors != null) {
+ for (HTableDescriptor tableDescriptor : tableDescriptors) {
+ if (isTransactionalTable(tableDescriptor)) {
+ List<HRegionInfo> tableRegions = admin.getTableRegions(tableDescriptor.getTableName());
+ LOG.debug("Regions for table {}: {}", tableDescriptor.getTableName(), tableRegions);
+ if (tableRegions != null) {
+ for (HRegionInfo region : tableRegions) {
+ regions.add(region.getRegionName());
+ }
+ }
+ } else {
+ LOG.debug("{} is not a transactional table", tableDescriptor.getTableName());
+ }
+ }
+ }
+ }
+ return regions;
+ }
+
+ /**
+ * Try to find the latest set of regions in which all regions have been major compacted, and
+ * compute prune upper bound from them. Starting from newest to oldest, this looks into the
+ * region set that has been saved periodically, and joins it with the prune upper bound data
+ * for a region recorded after a major compaction.
+ *
+ * @param timeRegions the latest set of regions
+ * @return prune upper bound
+ * @throws IOException when not able to talk to HBase
+ */
+ private long computePruneUpperBound(TimeRegions timeRegions) throws IOException {
+ do {
+ LOG.debug("Computing prune upper bound for {}", timeRegions);
+ SortedSet<byte[]> transactionalRegions = timeRegions.getRegions();
+ long time = timeRegions.getTime();
+
+ long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
+ LOG.debug("Got inactive transaction bound {}", inactiveTransactionBound);
+ // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
+ if (inactiveTransactionBound == -1) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
+ "and hence the data must be incomplete", time);
+ }
+ continue;
+ }
+
+ // Get the prune upper bounds for all the transactional regions
+ Map<byte[], Long> pruneUpperBoundRegions =
+ dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
+ logPruneUpperBoundRegions(pruneUpperBoundRegions);
+
+ // Use inactiveTransactionBound as the prune upper bound for the empty regions since the regions that are
+ // recorded as empty after inactiveTransactionBoundTime will not have invalid data
+ // for transactions started on or before inactiveTransactionBoundTime
+ pruneUpperBoundRegions = handleEmptyRegions(inactiveTransactionBound, transactionalRegions,
+ pruneUpperBoundRegions);
+
+ // If prune upper bounds are found for all the transactional regions, then compute the prune upper bound
+ // across all regions
+ if (!transactionalRegions.isEmpty() &&
+ pruneUpperBoundRegions.size() == transactionalRegions.size()) {
+ Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
+ long pruneUpperBound = Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
+ LOG.debug("Found prune upper bound {} for time {}", pruneUpperBound, time);
+ return pruneUpperBound;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ Sets.SetView<byte[]> difference =
+ Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
+ LOG.debug("Ignoring regions for time {} because the following regions did not record a pruneUpperBound: {}",
+ time, Iterables.transform(difference, TimeRegions.BYTE_ARR_TO_STRING_FN));
+ }
+ }
+
+ timeRegions = dataJanitorState.getRegionsOnOrBeforeTime(time - 1);
+ } while (timeRegions != null);
+ return -1;
+ }
+
+ private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound,
+ SortedSet<byte[]> transactionalRegions,
+ Map<byte[], Long> pruneUpperBoundRegions) throws IOException {
+ long inactiveTransactionBoundTime = TxUtils.getTimestamp(inactiveTransactionBound);
+ SortedSet<byte[]> emptyRegions =
+ dataJanitorState.getEmptyRegionsAfterTime(inactiveTransactionBoundTime, transactionalRegions);
+ LOG.debug("Got empty transactional regions for inactive transaction bound time {}: {}",
+ inactiveTransactionBoundTime, Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
+
+ // The regions that are recorded as empty after inactiveTransactionBoundTime will not have invalid data
+ // for transactions started before or on inactiveTransactionBoundTime. Hence we can consider the prune upper bound
+ // for these empty regions as inactiveTransactionBound
+ Map<byte[], Long> pubWithEmptyRegions = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ pubWithEmptyRegions.putAll(pruneUpperBoundRegions);
+ for (byte[] emptyRegion : emptyRegions) {
+ if (!pruneUpperBoundRegions.containsKey(emptyRegion)) {
+ pubWithEmptyRegions.put(emptyRegion, inactiveTransactionBound);
+ }
+ }
+ return Collections.unmodifiableMap(pubWithEmptyRegions);
+ }
+
+ private void logPruneUpperBoundRegions(Map<byte[], Long> pruneUpperBoundRegions) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got region - prune upper bound map: {}",
+ Iterables.transform(pruneUpperBoundRegions.entrySet(),
+ new Function<Map.Entry<byte[], Long>, Map.Entry<String, Long>>() {
+ @Override
+ public Map.Entry<String, Long> apply(Map.Entry<byte[], Long> input) {
+ String regionName = TimeRegions.BYTE_ARR_TO_STRING_FN.apply(input.getKey());
+ return Maps.immutableEntry(regionName, input.getValue());
+ }
+ }));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
new file mode 100644
index 0000000..443c998
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.MinMaxPriorityQueue;
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.txprune.RegionPruneInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import javax.annotation.Nullable;
+
+/**
+ * Invalid List Pruning Debug Tool.
+ */
+public class InvalidListPruningDebug {
+ private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebug.class);
+ private static final Gson GSON = new Gson();
+ private DataJanitorState dataJanitorState;
+ private Connection connection;
+ private TableName tableName;
+
  /**
   * Initialize the Invalid List Debug Tool.
   *
   * <p>Creates an HBase connection and points {@link DataJanitorState} at the configured
   * prune state table. Must be called before any of the query methods.
   *
   * @param conf {@link Configuration}
   * @throws IOException when not able to create an HBase connection
   */
  public void initialize(final Configuration conf) throws IOException {
    LOG.debug("InvalidListPruningDebugMain : initialize method called");
    connection = ConnectionFactory.createConnection(conf);
    tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
                                           TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
    dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
      @Override
      public Table get() throws IOException {
        return connection.getTable(tableName);
      }
    });
  }
+
  /**
   * Release the HBase connection held by this tool. Safe to call even if
   * {@link #initialize} was never invoked.
   *
   * @throws IOException when not able to close the connection
   */
  public void destroy() throws IOException {
    if (connection != null) {
      connection.close();
    }
  }
+
+ /**
+ * Returns a set of regions that are live but are not empty nor have a prune upper bound recorded. These regions
+ * will stop the progress of pruning.
+ *
+ * @param numRegions number of regions
+ * @return {@link Set} of regions that needs to be compacted and flushed
+ */
+ public Set<String> getRegionsToBeCompacted(Integer numRegions) throws IOException {
+ // Fetch the live regions
+ Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
+ if (latestTimeRegion.isEmpty()) {
+ return new HashSet<>();
+ }
+
+ Long timestamp = latestTimeRegion.keySet().iterator().next();
+ SortedSet<String> liveRegions = latestTimeRegion.get(timestamp);
+
+ SortedSet<byte[]> emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(timestamp, null);
+ SortedSet<String> emptyRegionNames = new TreeSet<>();
+ Iterable<String> regionStrings = Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN);
+ for (String regionString : regionStrings) {
+ emptyRegionNames.add(regionString);
+ }
+
+ Set<String> nonEmptyRegions = Sets.newHashSet(Sets.difference(liveRegions, emptyRegionNames));
+
+ // Get all pruned regions and remove them from the nonEmptyRegions, resulting in a set of regions that are
+ // not empty and have not been registered prune upper bound
+ Queue<RegionPruneInfo> prunedRegions = getIdleRegions(-1);
+ for (RegionPruneInfo prunedRegion : prunedRegions) {
+ if (nonEmptyRegions.contains(prunedRegion.getRegionNameAsString())) {
+ nonEmptyRegions.remove(prunedRegion.getRegionNameAsString());
+ }
+ }
+
+ if ((numRegions < 0) || (numRegions >= nonEmptyRegions.size())) {
+ return nonEmptyRegions;
+ }
+
+ Set<String> subsetRegions = new HashSet<>(numRegions);
+ for (String regionName : nonEmptyRegions) {
+ if (subsetRegions.size() == numRegions) {
+ break;
+ }
+ subsetRegions.add(regionName);
+ }
+ return subsetRegions;
+ }
+
+ /**
+ * Return a list of RegionPruneInfo. These regions are the ones that have the lowest prune upper bounds.
+ * If -1 is passed in, all the regions and their prune upper bound will be returned. Note that only the regions
+ * that are known to be live will be returned.
+ *
+ * @param numRegions number of regions
+ * @return Map of region name and its prune upper bound
+ */
+ public Queue<RegionPruneInfo> getIdleRegions(Integer numRegions) throws IOException {
+ List<RegionPruneInfo> regionPruneInfos = dataJanitorState.getPruneInfoForRegions(null);
+ if (regionPruneInfos.isEmpty()) {
+ return new LinkedList<>();
+ }
+
+ // Create a set with region names
+ Set<String> pruneRegionNameSet = new HashSet<>();
+ for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+ pruneRegionNameSet.add(regionPruneInfo.getRegionNameAsString());
+ }
+
+ // Fetch the live regions
+ Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
+ if (!latestTimeRegion.isEmpty()) {
+ SortedSet<String> liveRegions = latestTimeRegion.values().iterator().next();
+ Set<String> liveRegionsWithPruneInfo = Sets.intersection(liveRegions, pruneRegionNameSet);
+ List<RegionPruneInfo> liveRegionWithPruneInfoList = new ArrayList<>();
+ for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+ if (liveRegionsWithPruneInfo.contains(regionPruneInfo.getRegionNameAsString())) {
+ liveRegionWithPruneInfoList.add(regionPruneInfo);
+ }
+ }
+
+ // Use the subset of live regions and prune regions
+ regionPruneInfos = liveRegionWithPruneInfoList;
+ }
+
+ if (numRegions < 0) {
+ numRegions = regionPruneInfos.size();
+ }
+
+ Queue<RegionPruneInfo> lowestPrunes = MinMaxPriorityQueue.orderedBy(new Comparator<RegionPruneInfo>() {
+ @Override
+ public int compare(RegionPruneInfo o1, RegionPruneInfo o2) {
+ return (int) (o1.getPruneUpperBound() - o2.getPruneUpperBound());
+ }
+ }).maximumSize(numRegions).create();
+
+ for (RegionPruneInfo pruneInfo : regionPruneInfos) {
+ lowestPrunes.add(pruneInfo);
+ }
+ return lowestPrunes;
+ }
+
+ /**
+ * Return the prune upper bound value of a given region. If no prune upper bound has been written for this region yet,
+ * it will return a null.
+ *
+ * @param regionId region id
+ * @return {@link RegionPruneInfo} of the region
+ * @throws IOException if there are any errors while trying to fetch the {@link RegionPruneInfo}
+ */
+ @Nullable
+ public RegionPruneInfo getRegionPruneInfo(String regionId) throws IOException {
+ return dataJanitorState.getPruneInfoForRegion(Bytes.toBytesBinary(regionId));
+ }
+
+ /**
+ *
+ * @param time Given a time, provide the {@link TimeRegions} at or before that time
+ * @return transactional regions that are present at or before the given time
+ * @throws IOException if there are any errors while trying to fetch the {@link TimeRegions}
+ */
+ public Map<Long, SortedSet<String>> getRegionsOnOrBeforeTime(Long time) throws IOException {
+ Map<Long, SortedSet<String>> regionMap = new HashMap<>();
+ TimeRegions timeRegions = dataJanitorState.getRegionsOnOrBeforeTime(time);
+ if (timeRegions == null) {
+ return regionMap;
+ }
+ SortedSet<String> regionNames = new TreeSet<>();
+ Iterable<String> regionStrings = Iterables.transform(timeRegions.getRegions(), TimeRegions.BYTE_ARR_TO_STRING_FN);
+ for (String regionString : regionStrings) {
+ regionNames.add(regionString);
+ }
+ regionMap.put(timeRegions.getTime(), regionNames);
+ return regionMap;
+ }
+
+ private void printUsage(PrintWriter pw) {
+ pw.println("Usage : org.apache.tephra.hbase.txprune.InvalidListPruning <command> <parameter>");
+ pw.println("Available commands, corresponding parameters are:");
+ pw.println("****************************************************");
+ pw.println("time-region ts");
+ pw.println("Desc: Prints out the transactional regions present in HBase at time 'ts' (in milliseconds) " +
+ "or the latest time before time 'ts'.");
+ pw.println("idle-regions limit");
+ pw.println("Desc: Prints out 'limit' number of regions which has the lowest prune upper bounds. If '-1' is " +
+ "provided as the limit, prune upper bounds of all regions are returned.");
+ pw.println("prune-info region-name-as-string");
+ pw.println("Desc: Prints out the Pruning information for the region 'region-name-as-string'");
+ pw.println("to-compact-regions limit");
+ pw.println("Desc: Prints out 'limit' number of regions that are active, but are not empty, " +
+ "and have not registered a prune upper bound.");
+ }
+
+ private boolean execute(String[] args) throws IOException {
+ try (PrintWriter pw = new PrintWriter(System.out)) {
+ if (args.length != 2) {
+ printUsage(pw);
+ return false;
+ }
+
+ String command = args[0];
+ String parameter = args[1];
+ if ("time-region".equals(command)) {
+ Long time = Long.parseLong(parameter);
+ Map<Long, SortedSet<String>> timeRegion = getRegionsOnOrBeforeTime(time);
+ pw.println(GSON.toJson(timeRegion));
+ return true;
+ } else if ("idle-regions".equals(command)) {
+ Integer numRegions = Integer.parseInt(parameter);
+ Queue<RegionPruneInfo> regionPruneInfos = getIdleRegions(numRegions);
+ pw.println(GSON.toJson(regionPruneInfos));
+ return true;
+ } else if ("prune-info".equals(command)) {
+ RegionPruneInfo regionPruneInfo = getRegionPruneInfo(parameter);
+ if (regionPruneInfo != null) {
+ pw.println(GSON.toJson(regionPruneInfo));
+ } else {
+ pw.println(String.format("No prune info found for the region %s.", parameter));
+ }
+ return true;
+ } else if ("to-compact-regions".equals(command)) {
+ Integer numRegions = Integer.parseInt(parameter);
+ Set<String> toBeCompactedRegions = getRegionsToBeCompacted(numRegions);
+ pw.println(GSON.toJson(toBeCompactedRegions));
+ return true;
+ } else {
+ pw.println(String.format("%s is not a valid command.", command));
+ printUsage(pw);
+ return false;
+ }
+ }
+ }
+
+ public static void main(String[] args) {
+ Configuration hConf = HBaseConfiguration.create();
+ InvalidListPruningDebug pruningDebug = new InvalidListPruningDebug();
+ try {
+ pruningDebug.initialize(hConf);
+ boolean success = pruningDebug.execute(args);
+ pruningDebug.destroy();
+ if (!success) {
+ System.exit(1);
+ }
+ } catch (IOException ex) {
+ LOG.error("Received an exception while trying to execute the debug tool. ", ex);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
new file mode 100644
index 0000000..677710b
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+
/**
 * Thread that will write the prune upper bound. An instance of this class should be obtained only
 * through {@link PruneUpperBoundWriterSupplier} which will also handle the lifecycle of this instance.
 *
 * <p>Callers queue updates via {@link #persistPruneEntry(byte[], long)} and
 * {@link #persistRegionEmpty(byte[], long)}; a single daemon flush thread periodically persists the
 * queued entries through {@link DataJanitorState}.
 */
public class PruneUpperBoundWriter extends AbstractIdleService {
  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);

  // Name of the state table; used only in the log message when a flush fails
  private final TableName tableName;
  // Persistence layer used to record prune upper bounds and empty-region markers
  private final DataJanitorState dataJanitorState;
  // Minimum time in milliseconds between two flushes of the pending entries
  private final long pruneFlushInterval;
  // Map of region name -> prune upper bound
  private final ConcurrentSkipListMap<byte[], Long> pruneEntries;
  // Map of region name -> time the region was found to be empty
  private final ConcurrentSkipListMap<byte[], Long> emptyRegions;

  // volatile: written during start/stop, read by isAlive() and the shutdown path
  private volatile Thread flushThread;
  // volatile: set by shutDown(), polled by the flush thread's loop
  private volatile boolean stopped;

  // Timestamp of the last flush attempt; accessed only by the flush thread
  private long lastChecked;

  @SuppressWarnings("WeakerAccess")
  public PruneUpperBoundWriter(TableName tableName, DataJanitorState dataJanitorState, long pruneFlushInterval) {
    this.tableName = tableName;
    this.dataJanitorState = dataJanitorState;
    this.pruneFlushInterval = pruneFlushInterval;
    this.pruneEntries = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
    this.emptyRegions = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
  }

  /**
   * Queues the prune upper bound of a region to be persisted by the flush thread.
   *
   * @param regionName encoded region name
   * @param pruneUpperBound prune upper bound to record for the region
   */
  @SuppressWarnings("WeakerAccess")
  public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
    warnIfNotRunning(regionName);
    // The number of entries in this map is bound by the number of regions in this region server and thus it will not
    // grow indefinitely
    pruneEntries.put(regionName, pruneUpperBound);
  }

  /**
   * Queues an empty-region marker to be persisted by the flush thread.
   *
   * @param regionName encoded region name
   * @param time time at which the region was found to be empty
   */
  @SuppressWarnings("WeakerAccess")
  public void persistRegionEmpty(byte[] regionName, long time) {
    warnIfNotRunning(regionName);
    // The number of entries in this map is bound by the number of regions in this region server and thus it will not
    // grow indefinitely
    emptyRegions.put(regionName, time);
  }

  /** Returns true when the flush thread has been started and has not yet terminated. */
  @SuppressWarnings("WeakerAccess")
  public boolean isAlive() {
    return flushThread != null && flushThread.isAlive();
  }

  @Override
  protected void startUp() throws Exception {
    LOG.info("Starting PruneUpperBoundWriter Thread.");
    startFlushThread();
  }

  @Override
  protected void shutDown() throws Exception {
    LOG.info("Stopping PruneUpperBoundWriter Thread.");
    stopped = true;
    if (flushThread != null) {
      flushThread.interrupt();
      flushThread.join(TimeUnit.SECONDS.toMillis(1));
      // Interrupt once more in case the thread was not in an interruptible state the first time
      if (flushThread.isAlive()) {
        flushThread.interrupt();
        flushThread.join(TimeUnit.SECONDS.toMillis(1));
      }
    }
  }

  // Starts the daemon thread that drains pruneEntries and emptyRegions to the state table
  // at most once per pruneFlushInterval, polling roughly once a second.
  private void startFlushThread() {
    flushThread = new Thread("tephra-prune-upper-bound-writer") {
      @Override
      public void run() {
        while ((!isInterrupted()) && (!stopped)) {
          long now = System.currentTimeMillis();
          if (now > (lastChecked + pruneFlushInterval)) {
            // should flush data
            try {
              // Run the writes as the login user rather than any request-scoped user
              User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
                @Override
                public Void run() throws Exception {
                  // Record prune upper bound
                  while (!pruneEntries.isEmpty()) {
                    Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
                    dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
                    // We can now remove the entry only if the key and value match with what we wrote since it is
                    // possible that a new pruneUpperBound for the same key has been added
                    pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
                  }
                  // Record empty regions
                  while (!emptyRegions.isEmpty()) {
                    Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
                    dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
                    // We can now remove the entry only if the key and value match with what we wrote since it is
                    // possible that a new value for the same key has been added
                    emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
                  }
                  return null;
                }
              });
            } catch (IOException ex) {
              LOG.warn("Cannot record prune upper bound for a region to table " +
                         tableName.getNameWithNamespaceInclAsString(), ex);
            }
            lastChecked = now;
          }

          try {
            TimeUnit.SECONDS.sleep(1);
          } catch (InterruptedException ex) {
            interrupt();
            break;
          }
        }

        LOG.info("PruneUpperBound Writer thread terminated.");
      }
    };

    flushThread.setDaemon(true);
    flushThread.start();
  }

  // Logs a warning when an update is queued while the service or its flush thread is not active,
  // since such an update may never be flushed.
  private void warnIfNotRunning(byte[] regionName) {
    if (!isRunning() || !isAlive()) {
      LOG.warn(String.format("Trying to persist prune upper bound for region %s when writer is not %s!",
                             Bytes.toStringBinary(regionName), isRunning() ? "alive" : "running"));
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
new file mode 100644
index 0000000..cb93fab
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+
+import com.google.common.base.Supplier;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.tephra.coprocessor.CacheSupplier;
+import org.apache.tephra.coprocessor.ReferenceCountedSupplier;
+
+/**
+ * Supplies instances of {@link PruneUpperBoundWriter} implementations.
+ */
+public class PruneUpperBoundWriterSupplier implements CacheSupplier<PruneUpperBoundWriter> {
+
+ private static final ReferenceCountedSupplier<PruneUpperBoundWriter> referenceCountedSupplier =
+ new ReferenceCountedSupplier<>(PruneUpperBoundWriter.class.getSimpleName());
+
+ private final Supplier<PruneUpperBoundWriter> supplier;
+
+ public PruneUpperBoundWriterSupplier(final TableName tableName, final DataJanitorState dataJanitorState,
+ final long pruneFlushInterval) {
+ this.supplier = new Supplier<PruneUpperBoundWriter>() {
+ @Override
+ public PruneUpperBoundWriter get() {
+ return new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
+ }
+ };
+ }
+
+ @Override
+ public PruneUpperBoundWriter get() {
+ return referenceCountedSupplier.getOrCreate(supplier);
+ }
+
+ public void release() {
+ referenceCountedSupplier.release();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/TimeRegions.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/TimeRegions.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/TimeRegions.java
new file mode 100644
index 0000000..4ac8887
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/TimeRegions.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.Arrays;
import java.util.Objects;
import java.util.SortedSet;
+
+/**
+ * Contains information on the set of transactional regions recorded at a given time
+ */
+@SuppressWarnings("WeakerAccess")
+public class TimeRegions {
+ static final Function<byte[], String> BYTE_ARR_TO_STRING_FN =
+ new Function<byte[], String>() {
+ @Override
+ public String apply(byte[] input) {
+ return Bytes.toStringBinary(input);
+ }
+ };
+
+ private final long time;
+ private final SortedSet<byte[]> regions;
+
+ public TimeRegions(long time, SortedSet<byte[]> regions) {
+ this.time = time;
+ this.regions = regions;
+ }
+
+ public long getTime() {
+ return time;
+ }
+
+ public SortedSet<byte[]> getRegions() {
+ return regions;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TimeRegions that = (TimeRegions) o;
+ return time == that.time &&
+ Objects.equals(regions, that.regions);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(time, regions);
+ }
+
+ @Override
+ public String toString() {
+ Iterable<String> regionStrings = Iterables.transform(regions, BYTE_ARR_TO_STRING_FN);
+ return "TimeRegions{" +
+ "time=" + time +
+ ", regions=[" + Joiner.on(" ").join(regionStrings) + "]" +
+ '}';
+ }
+}