You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tephra.apache.org by poornachandra <gi...@git.apache.org> on 2016/11/22 01:16:16 UTC

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

GitHub user poornachandra opened a pull request:

    https://github.com/apache/incubator-tephra/pull/20

    Compute global prune upper bound using compaction state of every region

    JIRA - https://issues.apache.org/jira/browse/TEPHRA-198
    
    This PR uses the compaction state recorded by co-processors in https://github.com/apache/incubator-tephra/pull/19 to compute the global prune upper bound.
    
    TODO: add some more test cases to test DefaultDataJanitorPlugin.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/poornachandra/incubator-tephra feature/transaction-pruning

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-tephra/pull/20.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20
    
----
commit 0c4001d088b432bbe68a5251e89665930d710f04
Author: poorna <po...@cask.co>
Date:   2016-11-10T23:09:02Z

    Compute global prune upper bound using compaction state of every region

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r89938407
  
    --- Diff: tephra-core/src/main/java/org/apache/tephra/janitor/DataJanitorPlugin.java ---
    @@ -0,0 +1,59 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.janitor;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import java.io.IOException;
    +
    +/**
    + * Data janitor interface to manage the invalid transaction list.
    + * There will be one such plugin per data store that will be invoked periodically
    + * to fetch the prune upper bound for each data store.
    + * Invalid transaction list will pruned up to the minimum of prune upper bounds returned by all the plugins.
    + */
    +public interface DataJanitorPlugin {
    +  /**
    +   * Called once at the beginning to initialize the plugin
    +   */
    +  void initialize(Configuration conf) throws IOException;
    +
    +  /**
    +   * Called periodically to fetch prune upper bound for a data store
    +   *
    +   * @param time start time of this prune iteration
    +   * @param pruneUpperBoundForTime upper bound for prune tx id for the given start time
    --- End diff --
    
    this is confusing. I thought this _returns_ the prune upper bound? Why does it need the upper bound passed in as a parameter. I think you need to explain better what this parameter means. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-tephra/pull/20


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r90759372
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java ---
    @@ -58,10 +110,248 @@ public long getPruneUpperBound(byte[] regionId) throws IOException {
         }
       }
     
    +  /**
    +   * Get latest prune upper bounds for given regions. This is a batch operation of method
    +   * {@link #getPruneUpperBoundForRegion(byte[])}
    +   *
    +   * @param regions a set of regions
    +   * @return a map containing region id and its latest prune upper bound value
    +   * @throws IOException when not able to read the data from HBase
    +   */
    +  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException {
    +    Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
    +      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          byte[] region = getRegionFromKey(next.getRow());
    +          if (regions.contains(region)) {
    +            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
    +            if (timeBytes != null) {
    +              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
    +              resultMap.put(region, pruneUpperBoundRegion);
    +            }
    +          }
    +        }
    +      }
    +      return resultMap;
    +    }
    +  }
    +
    +  /**
    +   * Delete all regions that are not in the given exclude set and whose prune upper bound is less than a given value.
    +   * After the invalid list is pruned up to deletionPruneUpperBound, we do not need entries for regions that have
    +   * prune upper bound less than deletionPruneUpperBound. We however limit the deletion to only regions that are
    +   * no longer in existence (due to deletion, etc.), to avoid update/delete race conditions.
    +   *
    +   * @param deletionPruneUpperBound prune upper bound below which regions will be deleted
    +   * @param excludeRegions set of regions that should not be deleted
    +   * @throws IOException when not able to delete data in HBase
    +   */
    +  public void deleteRegionsWithPruneUpperBoundBefore(long deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
    +    throws IOException {
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
    +      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          byte[] region = getRegionFromKey(next.getRow());
    +          if (!excludeRegions.contains(region)) {
    +            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
    +            if (timeBytes != null) {
    +              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
    +              if (pruneUpperBoundRegion < deletionPruneUpperBound) {
    +                stateTable.delete(new Delete(next.getRow()));
    +              }
    +            }
    +          }
    +        }
    +      }
    +    }
    +  }
    +
    +  // ---------------------------------------------------
    +  // ------- Methods for regions at a given time -------
    +  // ---------------------------------------------------
    +  // Key: 0x2<time><region-id>
    +  // Col 't': <empty byte array>
    +  // ---------------------------------------------------
    +
    +  /**
    +   * Persist the regions for the given time. {@link HBaseTransactionPruningPlugin} saves the set of
    +   * transactional regions existing in the HBase instance periodically.
    +   *
    +   * @param time timestamp in milliseconds
    +   * @param regions set of regions at the time
    +   * @throws IOException when not able to persist the data to HBase
    +   */
    +  public void saveRegionsForTime(long time, Set<byte[]> regions) throws IOException {
    +    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      for (byte[] region : regions) {
    +        Put put = new Put(makeTimeRegionKey(timeBytes, region));
    +        put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
    +        stateTable.put(put);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Return the set of regions saved for the time just before the given time. This method finds the greatest time
    +   * that is less than the given time, and then returns all regions with that exact time, but none that are
    +   * older than that.
    +   *
    +   * @param time timestamp in milliseconds
    +   * @return set of regions and time at which they were recorded
    --- End diff --
    
    or null if no regions were found


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r89206461
  
    --- Diff: tephra-core/src/main/java/org/apache/tephra/janitor/DataJanitorPlugin.java ---
    @@ -0,0 +1,59 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.janitor;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import java.io.IOException;
    +
    +/**
    + * Data janitor interface to manage the invalid transaction list.
    + * There will be one such plugin per data store that will be invoked periodically
    + * to fetch the prune upper bound for each data store.
    + * Invalid transaction list will pruned up to the minimum of prune upper bounds returned by all the plugins.
    + */
    +public interface DataJanitorPlugin {
    --- End diff --
    
    The naming in not clear. Can we call it a TransactionPruningPlugin? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r90759321
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java ---
    @@ -58,10 +110,248 @@ public long getPruneUpperBound(byte[] regionId) throws IOException {
         }
       }
     
    +  /**
    +   * Get latest prune upper bounds for given regions. This is a batch operation of method
    +   * {@link #getPruneUpperBoundForRegion(byte[])}
    +   *
    +   * @param regions a set of regions
    +   * @return a map containing region id and its latest prune upper bound value
    +   * @throws IOException when not able to read the data from HBase
    +   */
    +  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException {
    +    Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
    +      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          byte[] region = getRegionFromKey(next.getRow());
    +          if (regions.contains(region)) {
    +            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
    +            if (timeBytes != null) {
    +              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
    +              resultMap.put(region, pruneUpperBoundRegion);
    +            }
    +          }
    +        }
    +      }
    +      return resultMap;
    +    }
    +  }
    +
    +  /**
    +   * Delete all regions that are not in the given exclude set and whose prune upper bound is less than a given value.
    +   * After the invalid list is pruned up to deletionPruneUpperBound, we do not need entries for regions that have
    +   * prune upper bound less than deletionPruneUpperBound. We however limit the deletion to only regions that are
    +   * no longer in existence (due to deletion, etc.), to avoid update/delete race conditions.
    +   *
    +   * @param deletionPruneUpperBound prune upper bound below which regions will be deleted
    +   * @param excludeRegions set of regions that should not be deleted
    +   * @throws IOException when not able to delete data in HBase
    +   */
    +  public void deleteRegionsWithPruneUpperBoundBefore(long deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
    +    throws IOException {
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
    +      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          byte[] region = getRegionFromKey(next.getRow());
    +          if (!excludeRegions.contains(region)) {
    +            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
    +            if (timeBytes != null) {
    +              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
    +              if (pruneUpperBoundRegion < deletionPruneUpperBound) {
    +                stateTable.delete(new Delete(next.getRow()));
    +              }
    +            }
    +          }
    +        }
    +      }
    +    }
    +  }
    +
    +  // ---------------------------------------------------
    +  // ------- Methods for regions at a given time -------
    +  // ---------------------------------------------------
    +  // Key: 0x2<time><region-id>
    +  // Col 't': <empty byte array>
    +  // ---------------------------------------------------
    +
    +  /**
    +   * Persist the regions for the given time. {@link HBaseTransactionPruningPlugin} saves the set of
    +   * transactional regions existing in the HBase instance periodically.
    +   *
    +   * @param time timestamp in milliseconds
    +   * @param regions set of regions at the time
    +   * @throws IOException when not able to persist the data to HBase
    +   */
    +  public void saveRegionsForTime(long time, Set<byte[]> regions) throws IOException {
    +    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      for (byte[] region : regions) {
    +        Put put = new Put(makeTimeRegionKey(timeBytes, region));
    +        put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
    +        stateTable.put(put);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Return the set of regions saved for the time just before the given time. This method finds the greatest time
    --- End diff --
    
    Seems that this is the set of regions at or before the given time. (time itself is included in the scan). 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r90580295
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java ---
    @@ -19,20 +19,46 @@
     
     package org.apache.tephra.hbase.coprocessor.janitor;
     
    +import com.google.common.collect.Maps;
    +import com.google.common.primitives.Longs;
    +import org.apache.hadoop.hbase.client.Delete;
     import org.apache.hadoop.hbase.client.Get;
     import org.apache.hadoop.hbase.client.Put;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.ResultScanner;
    +import org.apache.hadoop.hbase.client.Scan;
     import org.apache.hadoop.hbase.client.Table;
     import org.apache.hadoop.hbase.util.Bytes;
     
     import java.io.IOException;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.SortedSet;
    +import java.util.TreeMap;
    +import java.util.TreeSet;
    +import javax.annotation.Nullable;
     
     /**
      * Persist data janitor state into an HBase table.
    --- End diff --
    
    Added javadoc


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r90765092
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java ---
    @@ -0,0 +1,289 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.hbase.coprocessor.janitor;
    +
    +import com.google.common.base.Function;
    +import com.google.common.collect.Iterables;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hbase.HRegionInfo;
    +import org.apache.hadoop.hbase.HTableDescriptor;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.Admin;
    +import org.apache.hadoop.hbase.client.Connection;
    +import org.apache.hadoop.hbase.client.ConnectionFactory;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.tephra.TxConstants;
    +import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
    +import org.apache.tephra.janitor.TransactionPruningPlugin;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.SortedSet;
    +import java.util.TreeSet;
    +
    +/**
    + * Default implementation of the {@link TransactionPruningPlugin} for HBase.
    + *
    + * This plugin determines the prune upper bound for transactional HBase tables that use
    + * coprocessor {@link TransactionProcessor}.
    + *
    + * <h3>State storage:</h3>
    + *
    + * This plugin expects the TransactionProcessor to save the prune upper bound for invalid transactions
    + * after every major compaction of a region. Let's call this <i>(region, prune upper bound)</i>.
    + * In addition, the plugin also persists the following information on a run at time <i>t</i>
    + * <ul>
    + *   <li>
    + *     <i>(t, set of regions)</i>: Set of transactional regions at time <i>t</i>.
    + *     Transactional regions are regions of the tables that have the coprocessor TransactionProcessor
    + *     attached to them.
    + *   </li>
    + *   <li>
    + *     <i>(t, prune upper bound)</i>: This is the smallest not in-progress transaction that
    + *     will not have writes in any HBase regions that are created after time <i>t</i>.
    + *     This value is determined by the Transaction Service based on the transaction state at time <i>t</i>
    + *     and passed on to the plugin.
    + *   </li>
    + * </ul>
    + *
    + * <h3>Computing prune upper bound:</h3>
    + *
    + * In a typical HBase instance, there can be a constant change in the number of regions due to region creations,
    + * splits and merges. At any given time there can always be a region on which a major compaction has not been run.
    + * Since the prune upper bound will get recorded for a region only after a major compaction,
    + * using only the latest set of regions we may not be able to find the
    + * prune upper bounds for all the current regions. Hence we persist the set of regions that exist at that time
    + * of each run of the plugin, and use historical region set for time <i>t</i>, <i>t - 1</i>, etc.
    + * to determine the prune upper bound.
    + *
    + * From the regions saved at time <i>t</i>, <i>t - 1</i>, etc.,
    + * the plugin tries to find the latest <i>(t, set of regions)</i> where all regions have been major compacted,
    + * i.e, all regions have prune upper bound recorded in <i>(region, prune upper bound)</i>.
    + * <br/>
    + * If such a set is found for time <i>t1</i>, the prune upper bound returned by the plugin is the minimum of
    + * <ul>
    + *   <li>Prune upper bounds of regions in set <i>(t1, set of regions)</i></li>
    + *   <li>Prune upper bound from <i>(t1, prune upper bound)</i></li>
    + * </ul>
    + *
    + * <p/>
    + * Above, when we find <i>(t1, set of regions)</i>, there may a region that was created after time <i>t1</i>,
    + * but has a data write from an invalid transaction that is smaller than the prune upper bounds of all
    + * regions in <i>(t1, set of regions)</i>. This is possible because <i>(region, prune upper bound)</i> persisted by
    + * TransactionProcessor is always the latest prune upper bound for a region.
    + * <br/>
    + * However a region created after time <i>t1</i> cannot have writes from an invalid transaction that is smaller than
    + * <i>min(max(invalid list), min(in-progress list) - 1)</i> at the time the region was created.
    + * Since we limit the plugin prune upper bound using <i>(t1, prune upper bound)</i>, there should be no invalid
    --- End diff --
    
    We need to define and enforce the maximum duration a transaction can be used for writes. I have created TEPHRA-199 for this. I'll add it in the next PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra issue #20: Compute global prune upper bound using compactio...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on the issue:

    https://github.com/apache/incubator-tephra/pull/20
  
    Thanks for the rename, I think this makes the code much clearer. LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r89938297
  
    --- Diff: tephra-core/src/main/java/org/apache/tephra/janitor/DataJanitorPlugin.java ---
    @@ -0,0 +1,59 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.janitor;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import java.io.IOException;
    +
    +/**
    + * Data janitor interface to manage the invalid transaction list.
    + * There will be one such plugin per data store that will be invoked periodically
    --- End diff --
    
    It would make sense to mention here that this a plugin for the tx manager. Its name is a little misleading because the DataJanitor is a coprocessor that runs in each region server. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r90759375
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java ---
    @@ -58,10 +110,248 @@ public long getPruneUpperBound(byte[] regionId) throws IOException {
         }
       }
     
    +  /**
    +   * Get latest prune upper bounds for given regions. This is a batch operation of method
    +   * {@link #getPruneUpperBoundForRegion(byte[])}
    +   *
    +   * @param regions a set of regions
    +   * @return a map containing region id and its latest prune upper bound value
    +   * @throws IOException when not able to read the data from HBase
    +   */
    +  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException {
    +    Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
    +      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          byte[] region = getRegionFromKey(next.getRow());
    +          if (regions.contains(region)) {
    +            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
    +            if (timeBytes != null) {
    +              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
    +              resultMap.put(region, pruneUpperBoundRegion);
    +            }
    +          }
    +        }
    +      }
    +      return resultMap;
    +    }
    +  }
    +
    +  /**
    +   * Delete all regions that are not in the given exclude set and whose prune upper bound is less than a given value.
    +   * After the invalid list is pruned up to deletionPruneUpperBound, we do not need entries for regions that have
    +   * prune upper bound less than deletionPruneUpperBound. We however limit the deletion to only regions that are
    +   * no longer in existence (due to deletion, etc.), to avoid update/delete race conditions.
    +   *
    +   * @param deletionPruneUpperBound prune upper bound below which regions will be deleted
    +   * @param excludeRegions set of regions that should not be deleted
    +   * @throws IOException when not able to delete data in HBase
    +   */
    +  public void deleteRegionsWithPruneUpperBoundBefore(long deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
    +    throws IOException {
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
    +      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          byte[] region = getRegionFromKey(next.getRow());
    +          if (!excludeRegions.contains(region)) {
    +            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
    +            if (timeBytes != null) {
    +              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
    +              if (pruneUpperBoundRegion < deletionPruneUpperBound) {
    +                stateTable.delete(new Delete(next.getRow()));
    +              }
    +            }
    +          }
    +        }
    +      }
    +    }
    +  }
    +
    +  // ---------------------------------------------------
    +  // ------- Methods for regions at a given time -------
    +  // ---------------------------------------------------
    +  // Key: 0x2<time><region-id>
    +  // Col 't': <empty byte array>
    +  // ---------------------------------------------------
    +
    +  /**
    +   * Persist the regions for the given time. {@link HBaseTransactionPruningPlugin} saves the set of
    +   * transactional regions existing in the HBase instance periodically.
    +   *
    +   * @param time timestamp in milliseconds
    +   * @param regions set of regions at the time
    +   * @throws IOException when not able to persist the data to HBase
    +   */
    +  public void saveRegionsForTime(long time, Set<byte[]> regions) throws IOException {
    +    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      for (byte[] region : regions) {
    +        Put put = new Put(makeTimeRegionKey(timeBytes, region));
    +        put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
    +        stateTable.put(put);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Return the set of regions saved for the time just before the given time. This method finds the greatest time
    +   * that is less than the given time, and then returns all regions with that exact time, but none that are
    +   * older than that.
    +   *
    +   * @param time timestamp in milliseconds
    +   * @return set of regions and time at which they were recorded
    +   * @throws IOException when not able to read the data from HBase
    +   */
    +  @Nullable
    +  public TimeRegions getRegionsOnOrBeforeTime(long time) throws IOException {
    +    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, REGION_TIME_COL);
    +
    +      SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    +      long currentRegionTime = -1;
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
    +          // Stop if reached next time value
    +          if (currentRegionTime == -1) {
    +            currentRegionTime = timeRegion.getKey();
    +          } else if (timeRegion.getKey() < currentRegionTime) {
    +            break;
    +          } else if (timeRegion.getKey() > currentRegionTime) {
    +            throw new IllegalStateException(
    +              String.format("Got out of order time %d when expecting time lesser than %d",
    --- End diff --
    
    less than or equal to


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r90759422
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java ---
    @@ -58,10 +110,248 @@ public long getPruneUpperBound(byte[] regionId) throws IOException {
         }
       }
     
    +  /**
    +   * Get latest prune upper bounds for given regions. This is a batch operation of method
    +   * {@link #getPruneUpperBoundForRegion(byte[])}
    +   *
    +   * @param regions a set of regions
    +   * @return a map containing region id and its latest prune upper bound value
    +   * @throws IOException when not able to read the data from HBase
    +   */
    +  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException {
    +    Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
    +      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          byte[] region = getRegionFromKey(next.getRow());
    +          if (regions.contains(region)) {
    +            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
    +            if (timeBytes != null) {
    +              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
    +              resultMap.put(region, pruneUpperBoundRegion);
    +            }
    +          }
    +        }
    +      }
    +      return resultMap;
    +    }
    +  }
    +
    +  /**
    +   * Delete all regions that are not in the given exclude set and whose prune upper bound is less than a given value.
    +   * After the invalid list is pruned up to deletionPruneUpperBound, we do not need entries for regions that have
    +   * prune upper bound less than deletionPruneUpperBound. We however limit the deletion to only regions that are
    +   * no longer in existence (due to deletion, etc.), to avoid update/delete race conditions.
    +   *
    +   * @param deletionPruneUpperBound prune upper bound below which regions will be deleted
    +   * @param excludeRegions set of regions that should not be deleted
    +   * @throws IOException when not able to delete data in HBase
    +   */
    +  public void deleteRegionsWithPruneUpperBoundBefore(long deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
    +    throws IOException {
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
    +      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          byte[] region = getRegionFromKey(next.getRow());
    +          if (!excludeRegions.contains(region)) {
    +            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
    +            if (timeBytes != null) {
    +              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
    +              if (pruneUpperBoundRegion < deletionPruneUpperBound) {
    +                stateTable.delete(new Delete(next.getRow()));
    +              }
    +            }
    +          }
    +        }
    +      }
    +    }
    +  }
    +
    +  // ---------------------------------------------------
    +  // ------- Methods for regions at a given time -------
    +  // ---------------------------------------------------
    +  // Key: 0x2<time><region-id>
    +  // Col 't': <empty byte array>
    +  // ---------------------------------------------------
    +
    +  /**
    +   * Persist the regions for the given time. {@link HBaseTransactionPruningPlugin} saves the set of
    +   * transactional regions existing in the HBase instance periodically.
    +   *
    +   * @param time timestamp in milliseconds
    +   * @param regions set of regions at the time
    +   * @throws IOException when not able to persist the data to HBase
    +   */
    +  public void saveRegionsForTime(long time, Set<byte[]> regions) throws IOException {
    +    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      for (byte[] region : regions) {
    +        Put put = new Put(makeTimeRegionKey(timeBytes, region));
    +        put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
    +        stateTable.put(put);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Return the set of regions saved for the time just before the given time. This method finds the greatest time
    +   * that is less than the given time, and then returns all regions with that exact time, but none that are
    +   * older than that.
    +   *
    +   * @param time timestamp in milliseconds
    +   * @return set of regions and time at which they were recorded
    +   * @throws IOException when not able to read the data from HBase
    +   */
    +  @Nullable
    +  public TimeRegions getRegionsOnOrBeforeTime(long time) throws IOException {
    +    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, REGION_TIME_COL);
    +
    +      SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    +      long currentRegionTime = -1;
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
    +          // Stop if reached next time value
    +          if (currentRegionTime == -1) {
    +            currentRegionTime = timeRegion.getKey();
    +          } else if (timeRegion.getKey() < currentRegionTime) {
    +            break;
    +          } else if (timeRegion.getKey() > currentRegionTime) {
    +            throw new IllegalStateException(
    +              String.format("Got out of order time %d when expecting time lesser than %d",
    +                            timeRegion.getKey(), currentRegionTime));
    +          }
    +          regions.add(timeRegion.getValue());
    +        }
    +      }
    +      return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, regions);
    +    }
    +  }
    +
    +  /**
    +   * Delete all the regions that were recorded for all times equal or less than the given time.
    +   *
    +   * @param time timestamp in milliseconds
    +   * @throws IOException when not able to delete data in HBase
    +   */
    +  public void deleteAllRegionsOnOrBeforeTime(long time) throws IOException {
    +    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, REGION_TIME_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          stateTable.delete(new Delete(next.getRow()));
    +        }
    +      }
    +    }
    +  }
    +
    +  // ----------------------------------------------------------------
    +  // ------- Methods for max prune upper bound for given time -------
    +  // ----------------------------------------------------------------
    +  // Key: 0x3<inverted time>
    +  // Col 'p': <prune upper bound>
    +  // ----------------------------------------------------------------
    +
    +  /**
    +   * Persist prune upper bound for a given time. This is the smallest not in-progress transaction that
    +   * will not have writes in any HBase regions that are created after the given time.
    +   *
    +   * @param time time in milliseconds
    +   * @param pruneUpperBoundTime prune upper bound for the given time
    +   * @throws IOException when not able to persist the data to HBase
    +   */
    +  public void savePruneUpperBoundForTime(long time, long pruneUpperBoundTime) throws IOException {
    --- End diff --
    
    what is persisted with this method? The pruneUpperBound passed to fetchPruneUpperBound() or the one that is returned by that method? I think it would be helpful to use different terms for the two bounds. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r90759621
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java ---
    @@ -0,0 +1,289 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.hbase.coprocessor.janitor;
    +
    +import com.google.common.base.Function;
    +import com.google.common.collect.Iterables;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hbase.HRegionInfo;
    +import org.apache.hadoop.hbase.HTableDescriptor;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.Admin;
    +import org.apache.hadoop.hbase.client.Connection;
    +import org.apache.hadoop.hbase.client.ConnectionFactory;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.tephra.TxConstants;
    +import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
    +import org.apache.tephra.janitor.TransactionPruningPlugin;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.SortedSet;
    +import java.util.TreeSet;
    +
    +/**
    + * Default implementation of the {@link TransactionPruningPlugin} for HBase.
    + *
    + * This plugin determines the prune upper bound for transactional HBase tables that use
    + * coprocessor {@link TransactionProcessor}.
    + *
    + * <h3>State storage:</h3>
    + *
    + * This plugin expects the TransactionProcessor to save the prune upper bound for invalid transactions
    + * after every major compaction of a region. Let's call this <i>(region, prune upper bound)</i>.
    + * In addition, the plugin also persists the following information on a run at time <i>t</i>
    + * <ul>
    + *   <li>
    + *     <i>(t, set of regions)</i>: Set of transactional regions at time <i>t</i>.
    + *     Transactional regions are regions of the tables that have the coprocessor TransactionProcessor
    + *     attached to them.
    + *   </li>
    + *   <li>
    + *     <i>(t, prune upper bound)</i>: This is the smallest not in-progress transaction that
    + *     will not have writes in any HBase regions that are created after time <i>t</i>.
    + *     This value is determined by the Transaction Service based on the transaction state at time <i>t</i>
    + *     and passed on to the plugin.
    + *   </li>
    + * </ul>
    + *
    + * <h3>Computing prune upper bound:</h3>
    + *
    + * In a typical HBase instance, there can be a constant change in the number of regions due to region creations,
    + * splits and merges. At any given time there can always be a region on which a major compaction has not been run.
    + * Since the prune upper bound will get recorded for a region only after a major compaction,
    + * using only the latest set of regions we may not be able to find the
    + * prune upper bounds for all the current regions. Hence we persist the set of regions that exist at that time
    + * of each run of the plugin, and use historical region set for time <i>t</i>, <i>t - 1</i>, etc.
    + * to determine the prune upper bound.
    + *
    + * From the regions saved at time <i>t</i>, <i>t - 1</i>, etc.,
    + * the plugin tries to find the latest <i>(t, set of regions)</i> where all regions have been major compacted,
    + * i.e, all regions have prune upper bound recorded in <i>(region, prune upper bound)</i>.
    + * <br/>
    + * If such a set is found for time <i>t1</i>, the prune upper bound returned by the plugin is the minimum of
    + * <ul>
    + *   <li>Prune upper bounds of regions in set <i>(t1, set of regions)</i></li>
    + *   <li>Prune upper bound from <i>(t1, prune upper bound)</i></li>
    + * </ul>
    + *
    + * <p/>
    + * Above, when we find <i>(t1, set of regions)</i>, there may a region that was created after time <i>t1</i>,
    + * but has a data write from an invalid transaction that is smaller than the prune upper bounds of all
    + * regions in <i>(t1, set of regions)</i>. This is possible because <i>(region, prune upper bound)</i> persisted by
    + * TransactionProcessor is always the latest prune upper bound for a region.
    + * <br/>
    + * However a region created after time <i>t1</i> cannot have writes from an invalid transaction that is smaller than
    + * <i>min(max(invalid list), min(in-progress list) - 1)</i> at the time the region was created.
    + * Since we limit the plugin prune upper bound using <i>(t1, prune upper bound)</i>, there should be no invalid
    --- End diff --
    
    so, for t1, the prune upper bound = min(max(invalid list), min(in-progress list) - 1) at time t1? That is the pruneUpperBoundForTime passed to fetchPruneUpperBound is the greatest tx id for which it is guaranteed that it will never create any writes after t1? I think it would be good to find a name for that parameter that reflects that. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra issue #20: Compute global prune upper bound using compactio...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on the issue:

    https://github.com/apache/incubator-tephra/pull/20
  
    Thanks for the review @anew. I've addressed the comments, please take another look.
    
    > Regarding correctness, my major concern is that the (min(max invalid), min(inProgress)-1) is not a safe bound for transaction that are guaranteed not to write any more.
    I have filed TEPHRA-199 to address this, I'll make the changes in the next PR.
    
    Do you have a suggestion for a better name for `pruneUpperBoundTime`? Basically this is an upper bound for transactions that are not active (i.e, can be still used for writing). 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r89391878
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java ---
    @@ -58,10 +105,237 @@ public long getPruneUpperBound(byte[] regionId) throws IOException {
         }
       }
     
    +  /**
    +   * Get latest prune upper bounds for given regions
    +   *
    +   * @param regions a set of regions
    +   * @return a map containing region id and its latest prune upper bound value
    +   * @throws IOException when not able to read the data from HBase
    +   */
    +  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException {
    +    Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
    +      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          byte[] region = getRegionFromKey(next.getRow());
    +          if (regions.contains(region)) {
    +            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
    +            if (timeBytes != null) {
    +              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
    +              resultMap.put(region, pruneUpperBoundRegion);
    +            }
    +          }
    +        }
    +      }
    +      return resultMap;
    +    }
    +  }
    +
    +  /**
    +   * Delete all regions that are not in the given exclude set and whose prune upper bound is less than a given value
    +   *
    +   * @param deletionPruneUpperBound prune upper bound below which regions will be deleted
    +   * @param excludeRegions set of regions that should not be deleted
    +   * @throws IOException when not able to delete data in HBase
    +   */
    +  public void deleteRegionsWithPruneUpperBoundBefore(long deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
    +    throws IOException {
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
    +      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          byte[] region = getRegionFromKey(next.getRow());
    +          if (!excludeRegions.contains(region)) {
    +            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
    +            if (timeBytes != null) {
    +              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
    +              if (pruneUpperBoundRegion < deletionPruneUpperBound) {
    +                stateTable.delete(new Delete(next.getRow()));
    +              }
    +            }
    +          }
    +        }
    +      }
    +    }
    +  }
    +
    +  // ---------------------------------------------------
    +  // ------- Methods for regions at a given time -------
    +  // Key: 0x2<time><region-id>
    +  // Col 't': <empty byte array>
    +  // ---------------------------------------------------
    +
    +  /**
    +   * Persist the regions for a given time
    +   *
    +   * @param time timestamp in milliseconds
    +   * @param regions set of regions at the time
    +   * @throws IOException when not able to persist the data to HBase
    +   */
    +  public void saveRegionsForTime(long time, Set<byte[]> regions) throws IOException {
    +    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      for (byte[] region : regions) {
    +        Put put = new Put(makeTimeRegionKey(timeBytes, region));
    +        put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
    +        stateTable.put(put);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Return all the persisted regions for a time equal to or less than the given time
    +   *
    +   * @param time timestamp in milliseconds
    +   * @return set of regions and time at which they were recorded
    +   * @throws IOException when not able to read the data from HBase
    +   */
    +  @Nullable
    +  public TimeRegions getRegionsOnOrBeforeTime(long time) throws IOException {
    +    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, REGION_TIME_COL);
    +
    +      SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    +      long currentRegionTime = -1;
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
    +          // Stop if reached next time value
    +          if (currentRegionTime == -1) {
    +            currentRegionTime = timeRegion.getKey();
    +          } else if (timeRegion.getKey() < currentRegionTime) {
    +            break;
    +          } else if (timeRegion.getKey() > currentRegionTime) {
    +            throw new IllegalStateException(
    +              String.format("Got out of order time %d when expecting time lesser than %d",
    +                            timeRegion.getKey(), currentRegionTime));
    +          }
    +          regions.add(timeRegion.getValue());
    +        }
    +      }
    +      return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, regions);
    +    }
    +  }
    +
    +  /**
    +   * Delete all the regions that were recorded for all times equal or less than the given time
    +   *
    +   * @param time timestamp in milliseconds
    +   * @throws IOException when not able to delete data in HBase
    +   */
    +  public void deleteAllRegionsOnOrBeforeTime(long time) throws IOException {
    +    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, REGION_TIME_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          stateTable.delete(new Delete(next.getRow()));
    +        }
    +      }
    +    }
    +  }
    +
    +  // ----------------------------------------------------------------
    +  // ------- Methods for max prune upper bound for given time -------
    +  // Key: 0x3<inverted time>
    +  // Col 'p': <prune upper bound>
    +  // ----------------------------------------------------------------
    +
    +  /**
    +   * Persist prune upper bound for a given time
    +   *
    +   * @param time time in milliseconds
    +   * @param pruneUpperBoundTime prune upper bound for the given time
    +   * @throws IOException when not able to persist the data to HBase
    +   */
    +  public void savePruneUpperBoundForTime(long time, long pruneUpperBoundTime) throws IOException {
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      Put put = new Put(makePruneUpperBoundTimeKey(Bytes.toBytes(getInvertedTime(time))));
    +      put.addColumn(FAMILY, PRUNE_UPPER_BOUND_TIME_COL, Bytes.toBytes(pruneUpperBoundTime));
    +      stateTable.put(put);
    +    }
    +  }
    +
    +  /**
    +   * Return prune upper bound for the given time
    +   *
    +   * @param time time in milliseconds
    +   * @return prune upper bound for the given time
    +   * @throws IOException when not able to read the data from HBase
    +   */
    +  public long getPruneUpperBoundForTime(long time) throws IOException {
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      Get get = new Get(makePruneUpperBoundTimeKey(Bytes.toBytes(getInvertedTime(time))));
    +      get.addColumn(FAMILY, PRUNE_UPPER_BOUND_TIME_COL);
    +      byte[] result = stateTable.get(get).getValue(FAMILY, PRUNE_UPPER_BOUND_TIME_COL);
    +      return result == null ? -1 : Bytes.toLong(result);
    +    }
    +  }
    +
    +  /**
    +   * Delete all prune upper bounds recorded for a time less than the given time
    +   *
    +   * @param time time in milliseconds
    +   * @throws IOException when not able to delete data in HBase
    +   */
    +  public void deletePruneUpperBoundsOnOrBeforeTime(long time) throws IOException {
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      Scan scan = new Scan(makePruneUpperBoundTimeKey(Bytes.toBytes(getInvertedTime(time))),
    +                           PRUNE_UPPER_BOUND_TIME_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_TIME_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          stateTable.delete(new Delete(next.getRow()));
    +        }
    +      }
    +    }
    +  }
    +
       private byte[] makeRegionKey(byte[] regionId) {
         return Bytes.add(REGION_KEY_PREFIX, regionId);
       }
     
    +  private byte[] getRegionFromKey(byte[] regionKey) {
    +    return Bytes.copy(regionKey, 1, regionKey.length - 1);
    +  }
    +
    +  private byte[] makeTimeRegionKey(byte[] time, byte[] regionId) {
    +    return Bytes.add(REGION_TIME_KEY_PREFIX, time, regionId);
    +  }
    +
    +  private byte[] makePruneUpperBoundTimeKey(byte[] time) {
    +    return Bytes.add(PRUNE_UPPER_BOUND_TIME_KEY_PREFIX, time);
    +  }
    +
    +  private Map.Entry<Long, byte[]> getTimeRegion(byte[] key) {
    +    int offset = 1;
    +    long time = getInvertedTime(Bytes.toLong(key, offset));
    +    offset += Longs.BYTES;
    --- End diff --
    
    better use Bytes.SIZEOF_LONG here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r90759195
  
    --- Diff: tephra-core/src/main/java/org/apache/tephra/janitor/TransactionPruningPlugin.java ---
    @@ -0,0 +1,90 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.janitor;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import java.io.IOException;
    +
    +/**
    + * Data janitor interface to manage the invalid transaction list.
    + *
    + * <p/>
    + * An invalid transaction can only be removed from the invalid list after the data written
    + * by the invalid transactions has been removed from all the data stores.
    + * The term data store is used here to represent a set of tables in a database that have
    + * the same data clean up policy, like all Apache Phoenix tables in an HBase instance.
    + *
    + * <p/>
    + * Typically every data store will have a background job which cleans up the data written by invalid transactions.
    + * Prune upper bound for a data store is defined as the largest invalid transaction whose data has been
    + * cleaned up from that data store.
    + * <pre>
    + * prune-upper-bound = min(max(invalid list), min(in-progress list) - 1)
    + * </pre>
    + * where invalid list and in-progress list are from the transaction snapshot used to clean up the invalid data in the
    + * data store.
    + *
    + * <p/>
    + * There will be one such plugin per data store. The plugins will be executed as part of the Transaction Service.
    + * Each plugin will be invoked periodically to fetch the prune upper bound for its data store.
    + * Invalid transaction list can pruned up to the minimum of prune upper bounds returned by all the plugins.
    + */
    +public interface TransactionPruningPlugin {
    +  /**
    +   * Called once when the Transaction Service starts up.
    +   *
    +   * @param conf configuration for the plugin
    +   */
    +  void initialize(Configuration conf) throws IOException;
    +
    +  /**
    +   * Called periodically to fetch prune upper bound for a data store. The plugin examines the state of data cleanup
    +   * in the data store and determines the smallest invalid transaction whose writes no longer exist in the data
    +   * store. It then returns this smallest invalid transaction as the prune upper bound for this data store.
    +   *
    +   * @param time start time of this prune iteration in milliseconds
    +   * @param pruneUpperBoundForTime the largest invalid transaction that can be possibly removed
    +   *                               from the invalid list for the given time.
    +   *                               In terms of HBase, this is the smallest not in-progress transaction that will
    +   *                               not have writes in any HBase regions that are created after the given time.
    +   *                               The plugin will typically return a reduced upper bound based on the state of
    +   *                               the invalid transaction data clean up in the data store.
    --- End diff --
    
    I still don't understand what this is. I though this is an upper bound determined by the tx manager, based on its knowlegde of what invalid transactions may still have active processes and therefore future writes?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r90765102
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java ---
    @@ -0,0 +1,289 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.hbase.coprocessor.janitor;
    +
    +import com.google.common.base.Function;
    +import com.google.common.collect.Iterables;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hbase.HRegionInfo;
    +import org.apache.hadoop.hbase.HTableDescriptor;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.Admin;
    +import org.apache.hadoop.hbase.client.Connection;
    +import org.apache.hadoop.hbase.client.ConnectionFactory;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.tephra.TxConstants;
    +import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
    +import org.apache.tephra.janitor.TransactionPruningPlugin;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.SortedSet;
    +import java.util.TreeSet;
    +
    +/**
    + * Default implementation of the {@link TransactionPruningPlugin} for HBase.
    + *
    + * This plugin determines the prune upper bound for transactional HBase tables that use
    + * coprocessor {@link TransactionProcessor}.
    + *
    + * <h3>State storage:</h3>
    + *
    + * This plugin expects the TransactionProcessor to save the prune upper bound for invalid transactions
    + * after every major compaction of a region. Let's call this <i>(region, prune upper bound)</i>.
    + * In addition, the plugin also persists the following information on a run at time <i>t</i>
    + * <ul>
    + *   <li>
    + *     <i>(t, set of regions)</i>: Set of transactional regions at time <i>t</i>.
    + *     Transactional regions are regions of the tables that have the coprocessor TransactionProcessor
    + *     attached to them.
    + *   </li>
    + *   <li>
    + *     <i>(t, prune upper bound)</i>: This is the smallest not in-progress transaction that
    + *     will not have writes in any HBase regions that are created after time <i>t</i>.
    + *     This value is determined by the Transaction Service based on the transaction state at time <i>t</i>
    + *     and passed on to the plugin.
    + *   </li>
    + * </ul>
    + *
    + * <h3>Computing prune upper bound:</h3>
    + *
    + * In a typical HBase instance, there can be a constant change in the number of regions due to region creations,
    + * splits and merges. At any given time there can always be a region on which a major compaction has not been run.
    + * Since the prune upper bound will get recorded for a region only after a major compaction,
    + * using only the latest set of regions we may not be able to find the
    + * prune upper bounds for all the current regions. Hence we persist the set of regions that exist at that time
    + * of each run of the plugin, and use historical region set for time <i>t</i>, <i>t - 1</i>, etc.
    + * to determine the prune upper bound.
    + *
    + * From the regions saved at time <i>t</i>, <i>t - 1</i>, etc.,
    + * the plugin tries to find the latest <i>(t, set of regions)</i> where all regions have been major compacted,
    + * i.e, all regions have prune upper bound recorded in <i>(region, prune upper bound)</i>.
    + * <br/>
    + * If such a set is found for time <i>t1</i>, the prune upper bound returned by the plugin is the minimum of
    + * <ul>
    + *   <li>Prune upper bounds of regions in set <i>(t1, set of regions)</i></li>
    + *   <li>Prune upper bound from <i>(t1, prune upper bound)</i></li>
    + * </ul>
    + *
    + * <p/>
    + * Above, when we find <i>(t1, set of regions)</i>, there may a region that was created after time <i>t1</i>,
    + * but has a data write from an invalid transaction that is smaller than the prune upper bounds of all
    + * regions in <i>(t1, set of regions)</i>. This is possible because <i>(region, prune upper bound)</i> persisted by
    + * TransactionProcessor is always the latest prune upper bound for a region.
    + * <br/>
    + * However a region created after time <i>t1</i> cannot have writes from an invalid transaction that is smaller than
    + * <i>min(max(invalid list), min(in-progress list) - 1)</i> at the time the region was created.
    + * Since we limit the plugin prune upper bound using <i>(t1, prune upper bound)</i>, there should be no invalid
    + * transactions smaller than the plugin prune upper bound with writes in any transactional region of
    + * this HBase instance.
    + *
    + * <p/>
    + * Note: If your tables uses a transactional coprocessor other than TransactionProcessor,
    + * then you may need to write a new plugin to compute prune upper bound for those tables.
    + */
    +@SuppressWarnings("WeakerAccess")
    +public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
    +  public static final Logger LOG = LoggerFactory.getLogger(HBaseTransactionPruningPlugin.class);
    +
    +  protected Configuration conf;
    +  protected Connection connection;
    +  protected DataJanitorState dataJanitorState;
    +
    +  @Override
    +  public void initialize(Configuration conf) throws IOException {
    +    this.conf = conf;
    +    this.connection = ConnectionFactory.createConnection(conf);
    +
    +    final TableName stateTable = TableName.valueOf(conf.get(TxConstants.DataJanitor.PRUNE_STATE_TABLE,
    +                                                            TxConstants.DataJanitor.DEFAULT_PRUNE_STATE_TABLE));
    +    LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString());
    +    this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
    +      @Override
    +      public Table get() throws IOException {
    +        return connection.getTable(stateTable);
    +      }
    +    });
    +  }
    +
    +  /**
    +   * Determines prune upper bound for the data store as mentioned above.
    +   */
    +  @Override
    +  public long fetchPruneUpperBound(long time, long pruneUpperBoundForTime) throws IOException {
    +    LOG.debug("Fetching prune upper bound for time {} and max prune upper bound {}", time, pruneUpperBoundForTime);
    +    if (time < 0 || pruneUpperBoundForTime < 0) {
    +      return -1;
    +    }
    +
    +    // Get all the current transactional regions
    +    SortedSet<byte[]> transactionalRegions = getTransactionalRegions();
    +    if (!transactionalRegions.isEmpty()) {
    +      LOG.debug("Saving {} transactional regions for time {}", transactionalRegions.size(), time);
    +      dataJanitorState.saveRegionsForTime(time, transactionalRegions);
    +      // Save prune upper bound for time as the final step.
    +      // We can then use its existence to make sure that the data for a given time is complete or not
    +      LOG.debug("Saving max prune upper bound {} for time {}", pruneUpperBoundForTime, time);
    +      dataJanitorState.savePruneUpperBoundForTime(time, pruneUpperBoundForTime);
    +    }
    +
    +    return computePruneUpperBound(new TimeRegions(time, transactionalRegions));
    +  }
    +
    +  /**
    +   * After invalid list has been pruned, this cleans up state information that is no longer required.
    +   * This includes -
    +   * <ul>
    +   *   <li>
    +   *     <i>(region, prune upper bound)</i> - prune upper bound for regions that are older
    +   *     than maxPrunedInvalid
    +   *   </li>
    +   *   <li>
    +   *     <i>(t, set of regions) - Regions set that were recorded on or before the start time
    +   *     of maxPrunedInvalid
    +   *   </li>
    +   *   <li>
    +   *     (t, prune upper bound) - Smallest not in-progress transaction without any writes in new regions
    +   *     information recorded on or before the start time of maxPrunedInvalid
    +   *   </li>
    +   * </ul>
    +   */
    +  @Override
    +  public void pruneComplete(long time, long maxPrunedInvalid) throws IOException {
    +    LOG.debug("Prune complete for time {} and prune upper bound {}", time, maxPrunedInvalid);
    +    if (time < 0 || maxPrunedInvalid < 0) {
    +      return;
    +    }
    +
    +    // Get regions for given time, so as to not delete them
    +    TimeRegions regionsToExclude = dataJanitorState.getRegionsOnOrBeforeTime(time);
    +    if (regionsToExclude != null) {
    +      LOG.debug("Deleting stale region - prune upper bound record before {}", maxPrunedInvalid);
    --- End diff --
    
    Added clarifying comments


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra issue #20: Compute global prune upper bound using compactio...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on the issue:

    https://github.com/apache/incubator-tephra/pull/20
  
    LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r90766371
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java ---
    @@ -259,6 +262,8 @@ private long computePruneUpperBound(TimeRegions timeRegions) throws IOException
             if (pruneUpperBoundForTime != -1) {
               Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
               return Math.min(pruneUpperBoundForTime, minPruneUpperBoundRegions);
    +        } else {
    +          LOG.debug("Ignoring invalid prune upper bound -1 for time {}", time);
    --- End diff --
    
    A better message would be: Ignoring regions for time {} because no pruneUpperBound was found for that time and the data must be incomplete. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r89938020
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DefaultDataJanitorPlugin.java ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.hbase.coprocessor.janitor;
    +
    +import com.google.common.base.Function;
    +import com.google.common.collect.Iterables;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hbase.HRegionInfo;
    +import org.apache.hadoop.hbase.HTableDescriptor;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.Admin;
    +import org.apache.hadoop.hbase.client.Connection;
    +import org.apache.hadoop.hbase.client.ConnectionFactory;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.tephra.TxConstants;
    +import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
    +import org.apache.tephra.janitor.DataJanitorPlugin;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.SortedSet;
    +import java.util.TreeSet;
    +
    +/**
    + * Default implementation of the {@link DataJanitorPlugin} for HBase
    + */
    +@SuppressWarnings("WeakerAccess")
    +public class DefaultDataJanitorPlugin implements DataJanitorPlugin {
    --- End diff --
    
    I think this is the HBaseJanitorPlugin? And if you had another store, it would be entirely different?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r89377823
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java ---
    @@ -58,10 +105,237 @@ public long getPruneUpperBound(byte[] regionId) throws IOException {
         }
       }
     
    +  /**
    +   * Get latest prune upper bounds for given regions
    +   *
    +   * @param regions a set of regions
    +   * @return a map containing region id and its latest prune upper bound value
    +   * @throws IOException when not able to read the data from HBase
    +   */
    +  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException {
    +    Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
    +      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          byte[] region = getRegionFromKey(next.getRow());
    +          if (regions.contains(region)) {
    +            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
    +            if (timeBytes != null) {
    +              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
    +              resultMap.put(region, pruneUpperBoundRegion);
    +            }
    +          }
    +        }
    +      }
    +      return resultMap;
    +    }
    +  }
    +
    +  /**
    +   * Delete all regions that are not in the given exclude set and whose prune upper bound is less than a given value
    +   *
    +   * @param deletionPruneUpperBound prune upper bound below which regions will be deleted
    +   * @param excludeRegions set of regions that should not be deleted
    +   * @throws IOException when not able to delete data in HBase
    +   */
    +  public void deleteRegionsWithPruneUpperBoundBefore(long deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
    +    throws IOException {
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
    +      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          byte[] region = getRegionFromKey(next.getRow());
    +          if (!excludeRegions.contains(region)) {
    +            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
    +            if (timeBytes != null) {
    +              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
    +              if (pruneUpperBoundRegion < deletionPruneUpperBound) {
    +                stateTable.delete(new Delete(next.getRow()));
    +              }
    +            }
    +          }
    +        }
    +      }
    +    }
    +  }
    +
    +  // ---------------------------------------------------
    +  // ------- Methods for regions at a given time -------
    +  // Key: 0x2<time><region-id>
    +  // Col 't': <empty byte array>
    +  // ---------------------------------------------------
    +
    +  /**
    +   * Persist the regions for a given time
    +   *
    +   * @param time timestamp in milliseconds
    +   * @param regions set of regions at the time
    +   * @throws IOException when not able to persist the data to HBase
    +   */
    +  public void saveRegionsForTime(long time, Set<byte[]> regions) throws IOException {
    +    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      for (byte[] region : regions) {
    +        Put put = new Put(makeTimeRegionKey(timeBytes, region));
    +        put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
    +        stateTable.put(put);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Return all the persisted regions for a time equal to or less than the given time
    +   *
    --- End diff --
    
    If I read the code correctly, then this finds the greatest time that is less than the given time, and then return all regions with that exact time, but none that are older than that. Is that correct? The javadoc is not that clear about it. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra issue #20: Compute global prune upper bound using compactio...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on the issue:

    https://github.com/apache/incubator-tephra/pull/20
  
    @anew Please take a look at the last rename commit


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r89206674
  
    --- Diff: tephra-core/src/main/java/org/apache/tephra/janitor/DataJanitorPlugin.java ---
    @@ -0,0 +1,59 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.janitor;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import java.io.IOException;
    +
    +/**
    + * Data janitor interface to manage the invalid transaction list.
    + * There will be one such plugin per data store that will be invoked periodically
    + * to fetch the prune upper bound for each data store.
    + * Invalid transaction list will pruned up to the minimum of prune upper bounds returned by all the plugins.
    + */
    +public interface DataJanitorPlugin {
    +  /**
    +   * Called once at the beginning to initialize the plugin
    +   */
    +  void initialize(Configuration conf) throws IOException;
    +
    +  /**
    +   * Called periodically to fetch prune upper bound for a data store
    +   *
    +   * @param time start time of this prune iteration
    +   * @param pruneUpperBoundForTime upper bound for prune tx id for the given start time
    +   */
    +  long fetchPruneUpperBound(long time, long pruneUpperBoundForTime) throws IOException;
    +
    +  /**
    +   * Called after pruning the invalid list.
    +   * The plugin can use the pruneUpperBound passed to clean up its state
    +   *
    +   * @param time start time of this prune iteration
    +   * @param pruneUpperBound prune upper bound used to prune the invalid list
    +   */
    +  void pruneComplete(long time, long pruneUpperBound) throws IOException;
    +
    +  /**
    +   * Called once during shutdown
    +   */
    +  void destroy() throws IOException;
    --- End diff --
    
    should destroy() throw exceptions? I mean, normally it should not.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r90759970
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java ---
    @@ -0,0 +1,289 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.hbase.coprocessor.janitor;
    +
    +import com.google.common.base.Function;
    +import com.google.common.collect.Iterables;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hbase.HRegionInfo;
    +import org.apache.hadoop.hbase.HTableDescriptor;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.Admin;
    +import org.apache.hadoop.hbase.client.Connection;
    +import org.apache.hadoop.hbase.client.ConnectionFactory;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.tephra.TxConstants;
    +import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
    +import org.apache.tephra.janitor.TransactionPruningPlugin;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.SortedSet;
    +import java.util.TreeSet;
    +
    +/**
    + * Default implementation of the {@link TransactionPruningPlugin} for HBase.
    + *
    + * This plugin determines the prune upper bound for transactional HBase tables that use
    + * coprocessor {@link TransactionProcessor}.
    + *
    + * <h3>State storage:</h3>
    + *
    + * This plugin expects the TransactionProcessor to save the prune upper bound for invalid transactions
    + * after every major compaction of a region. Let's call this <i>(region, prune upper bound)</i>.
    + * In addition, the plugin also persists the following information on a run at time <i>t</i>
    + * <ul>
    + *   <li>
    + *     <i>(t, set of regions)</i>: Set of transactional regions at time <i>t</i>.
    + *     Transactional regions are regions of the tables that have the coprocessor TransactionProcessor
    + *     attached to them.
    + *   </li>
    + *   <li>
    + *     <i>(t, prune upper bound)</i>: This is the smallest not in-progress transaction that
    + *     will not have writes in any HBase regions that are created after time <i>t</i>.
    + *     This value is determined by the Transaction Service based on the transaction state at time <i>t</i>
    + *     and passed on to the plugin.
    + *   </li>
    + * </ul>
    + *
    + * <h3>Computing prune upper bound:</h3>
    + *
    + * In a typical HBase instance, there can be a constant change in the number of regions due to region creations,
    + * splits and merges. At any given time there can always be a region on which a major compaction has not been run.
    + * Since the prune upper bound will get recorded for a region only after a major compaction,
    + * using only the latest set of regions we may not be able to find the
    + * prune upper bounds for all the current regions. Hence we persist the set of regions that exist at that time
    + * of each run of the plugin, and use historical region set for time <i>t</i>, <i>t - 1</i>, etc.
    + * to determine the prune upper bound.
    + *
    + * From the regions saved at time <i>t</i>, <i>t - 1</i>, etc.,
    + * the plugin tries to find the latest <i>(t, set of regions)</i> where all regions have been major compacted,
    + * i.e, all regions have prune upper bound recorded in <i>(region, prune upper bound)</i>.
    + * <br/>
    + * If such a set is found for time <i>t1</i>, the prune upper bound returned by the plugin is the minimum of
    + * <ul>
    + *   <li>Prune upper bounds of regions in set <i>(t1, set of regions)</i></li>
    + *   <li>Prune upper bound from <i>(t1, prune upper bound)</i></li>
    + * </ul>
    + *
    + * <p/>
    + * Above, when we find <i>(t1, set of regions)</i>, there may a region that was created after time <i>t1</i>,
    + * but has a data write from an invalid transaction that is smaller than the prune upper bounds of all
    + * regions in <i>(t1, set of regions)</i>. This is possible because <i>(region, prune upper bound)</i> persisted by
    + * TransactionProcessor is always the latest prune upper bound for a region.
    + * <br/>
    + * However a region created after time <i>t1</i> cannot have writes from an invalid transaction that is smaller than
    + * <i>min(max(invalid list), min(in-progress list) - 1)</i> at the time the region was created.
    + * Since we limit the plugin prune upper bound using <i>(t1, prune upper bound)</i>, there should be no invalid
    + * transactions smaller than the plugin prune upper bound with writes in any transactional region of
    + * this HBase instance.
    + *
    + * <p/>
    + * Note: If your tables uses a transactional coprocessor other than TransactionProcessor,
    + * then you may need to write a new plugin to compute prune upper bound for those tables.
    + */
    +@SuppressWarnings("WeakerAccess")
    +public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
    +  public static final Logger LOG = LoggerFactory.getLogger(HBaseTransactionPruningPlugin.class);
    +
    +  protected Configuration conf;
    +  protected Connection connection;
    +  protected DataJanitorState dataJanitorState;
    +
    +  @Override
    +  public void initialize(Configuration conf) throws IOException {
    +    this.conf = conf;
    +    this.connection = ConnectionFactory.createConnection(conf);
    +
    +    final TableName stateTable = TableName.valueOf(conf.get(TxConstants.DataJanitor.PRUNE_STATE_TABLE,
    +                                                            TxConstants.DataJanitor.DEFAULT_PRUNE_STATE_TABLE));
    +    LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString());
    +    this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
    +      @Override
    +      public Table get() throws IOException {
    +        return connection.getTable(stateTable);
    +      }
    +    });
    +  }
    +
    +  /**
    +   * Determines prune upper bound for the data store as mentioned above.
    +   */
    +  @Override
    +  public long fetchPruneUpperBound(long time, long pruneUpperBoundForTime) throws IOException {
    +    LOG.debug("Fetching prune upper bound for time {} and max prune upper bound {}", time, pruneUpperBoundForTime);
    +    if (time < 0 || pruneUpperBoundForTime < 0) {
    +      return -1;
    +    }
    +
    +    // Get all the current transactional regions
    +    SortedSet<byte[]> transactionalRegions = getTransactionalRegions();
    +    if (!transactionalRegions.isEmpty()) {
    +      LOG.debug("Saving {} transactional regions for time {}", transactionalRegions.size(), time);
    +      dataJanitorState.saveRegionsForTime(time, transactionalRegions);
    +      // Save prune upper bound for time as the final step.
    +      // We can then use its existence to make sure that the data for a given time is complete or not
    +      LOG.debug("Saving max prune upper bound {} for time {}", pruneUpperBoundForTime, time);
    +      dataJanitorState.savePruneUpperBoundForTime(time, pruneUpperBoundForTime);
    +    }
    +
    +    return computePruneUpperBound(new TimeRegions(time, transactionalRegions));
    +  }
    +
    +  /**
    +   * After invalid list has been pruned, this cleans up state information that is no longer required.
    +   * This includes -
    +   * <ul>
    +   *   <li>
    +   *     <i>(region, prune upper bound)</i> - prune upper bound for regions that are older
    +   *     than maxPrunedInvalid
    +   *   </li>
    +   *   <li>
    +   *     <i>(t, set of regions) - Regions set that were recorded on or before the start time
    +   *     of maxPrunedInvalid
    +   *   </li>
    +   *   <li>
    +   *     (t, prune upper bound) - Smallest not in-progress transaction without any writes in new regions
    +   *     information recorded on or before the start time of maxPrunedInvalid
    +   *   </li>
    +   * </ul>
    +   */
    +  @Override
    +  public void pruneComplete(long time, long maxPrunedInvalid) throws IOException {
    +    LOG.debug("Prune complete for time {} and prune upper bound {}", time, maxPrunedInvalid);
    +    if (time < 0 || maxPrunedInvalid < 0) {
    +      return;
    +    }
    +
    +    // Get regions for given time, so as to not delete them
    +    TimeRegions regionsToExclude = dataJanitorState.getRegionsOnOrBeforeTime(time);
    +    if (regionsToExclude != null) {
    +      LOG.debug("Deleting stale region - prune upper bound record before {}", maxPrunedInvalid);
    --- End diff --
    
    So this will only delete the entries for regions that do not exist any longer? Meaning, for regions that still exist, we never clean up that state? It's not clear why we have to keep it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra issue #20: Compute global prune upper bound using compactio...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on the issue:

    https://github.com/apache/incubator-tephra/pull/20
  
    I did another pass. Only a few more comments... 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r89938559
  
    --- Diff: tephra-core/src/main/java/org/apache/tephra/janitor/DataJanitorPlugin.java ---
    @@ -0,0 +1,59 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.janitor;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import java.io.IOException;
    +
    +/**
    + * Data janitor interface to manage the invalid transaction list.
    + * There will be one such plugin per data store that will be invoked periodically
    + * to fetch the prune upper bound for each data store.
    + * Invalid transaction list will pruned up to the minimum of prune upper bounds returned by all the plugins.
    + */
    +public interface DataJanitorPlugin {
    +  /**
    +   * Called once at the beginning to initialize the plugin
    +   */
    +  void initialize(Configuration conf) throws IOException;
    +
    +  /**
    +   * Called periodically to fetch prune upper bound for a data store
    +   *
    +   * @param time start time of this prune iteration
    +   * @param pruneUpperBoundForTime upper bound for prune tx id for the given start time
    +   */
    +  long fetchPruneUpperBound(long time, long pruneUpperBoundForTime) throws IOException;
    +
    +  /**
    +   * Called after pruning the invalid list.
    +   * The plugin can use the pruneUpperBound passed to clean up its state
    --- End diff --
    
    is this the same pruneUpperBound that was returned by fetchPruneUpperBound? What is the semantic of this - does it mean it is guaranteed that all invalid transactions less than that upper bound have been removed from the invalid list?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r89207839
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java ---
    @@ -58,10 +105,237 @@ public long getPruneUpperBound(byte[] regionId) throws IOException {
         }
       }
     
    +  /**
    +   * Get latest prune upper bounds for given regions
    +   *
    +   * @param regions a set of regions
    +   * @return a map containing region id and its latest prune upper bound value
    +   * @throws IOException when not able to read the data from HBase
    +   */
    +  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException {
    +    Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
    +      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          byte[] region = getRegionFromKey(next.getRow());
    +          if (regions.contains(region)) {
    +            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
    +            if (timeBytes != null) {
    +              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
    +              resultMap.put(region, pruneUpperBoundRegion);
    +            }
    +          }
    +        }
    +      }
    +      return resultMap;
    +    }
    +  }
    +
    +  /**
    +   * Delete all regions that are not in the given exclude set and whose prune upper bound is less than a given value
    --- End diff --
    
    period.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r90766611
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java ---
    @@ -259,6 +262,8 @@ private long computePruneUpperBound(TimeRegions timeRegions) throws IOException
             if (pruneUpperBoundForTime != -1) {
               Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
               return Math.min(pruneUpperBoundForTime, minPruneUpperBoundRegions);
    +        } else {
    +          LOG.debug("Ignoring invalid prune upper bound -1 for time {}", time);
             }
           } else {
             if (LOG.isDebugEnabled()) {
    --- End diff --
    
    Sure, will update the message


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r89391984
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java ---
    @@ -58,10 +105,237 @@ public long getPruneUpperBound(byte[] regionId) throws IOException {
         }
       }
     
    +  /**
    +   * Get latest prune upper bounds for given regions
    +   *
    +   * @param regions a set of regions
    +   * @return a map containing region id and its latest prune upper bound value
    +   * @throws IOException when not able to read the data from HBase
    +   */
    +  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException {
    +    Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
    +      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          byte[] region = getRegionFromKey(next.getRow());
    +          if (regions.contains(region)) {
    +            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
    +            if (timeBytes != null) {
    +              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
    +              resultMap.put(region, pruneUpperBoundRegion);
    +            }
    +          }
    +        }
    +      }
    +      return resultMap;
    +    }
    +  }
    +
    +  /**
    +   * Delete all regions that are not in the given exclude set and whose prune upper bound is less than a given value
    +   *
    +   * @param deletionPruneUpperBound prune upper bound below which regions will be deleted
    +   * @param excludeRegions set of regions that should not be deleted
    +   * @throws IOException when not able to delete data in HBase
    +   */
    +  public void deleteRegionsWithPruneUpperBoundBefore(long deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
    +    throws IOException {
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
    +      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          byte[] region = getRegionFromKey(next.getRow());
    +          if (!excludeRegions.contains(region)) {
    +            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
    +            if (timeBytes != null) {
    +              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
    +              if (pruneUpperBoundRegion < deletionPruneUpperBound) {
    +                stateTable.delete(new Delete(next.getRow()));
    +              }
    +            }
    +          }
    +        }
    +      }
    +    }
    +  }
    +
    +  // ---------------------------------------------------
    +  // ------- Methods for regions at a given time -------
    +  // Key: 0x2<time><region-id>
    +  // Col 't': <empty byte array>
    +  // ---------------------------------------------------
    +
    +  /**
    +   * Persist the regions for a given time
    +   *
    +   * @param time timestamp in milliseconds
    +   * @param regions set of regions at the time
    +   * @throws IOException when not able to persist the data to HBase
    +   */
    +  public void saveRegionsForTime(long time, Set<byte[]> regions) throws IOException {
    +    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      for (byte[] region : regions) {
    +        Put put = new Put(makeTimeRegionKey(timeBytes, region));
    +        put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
    +        stateTable.put(put);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Return all the persisted regions for a time equal to or less than the given time
    +   *
    +   * @param time timestamp in milliseconds
    +   * @return set of regions and time at which they were recorded
    +   * @throws IOException when not able to read the data from HBase
    +   */
    +  @Nullable
    +  public TimeRegions getRegionsOnOrBeforeTime(long time) throws IOException {
    +    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, REGION_TIME_COL);
    +
    +      SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    +      long currentRegionTime = -1;
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
    +          // Stop if reached next time value
    +          if (currentRegionTime == -1) {
    +            currentRegionTime = timeRegion.getKey();
    +          } else if (timeRegion.getKey() < currentRegionTime) {
    +            break;
    +          } else if (timeRegion.getKey() > currentRegionTime) {
    +            throw new IllegalStateException(
    +              String.format("Got out of order time %d when expecting time lesser than %d",
    +                            timeRegion.getKey(), currentRegionTime));
    +          }
    +          regions.add(timeRegion.getValue());
    +        }
    +      }
    +      return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, regions);
    +    }
    +  }
    +
    +  /**
    +   * Delete all the regions that were recorded for all times equal or less than the given time
    +   *
    +   * @param time timestamp in milliseconds
    +   * @throws IOException when not able to delete data in HBase
    +   */
    +  public void deleteAllRegionsOnOrBeforeTime(long time) throws IOException {
    +    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, REGION_TIME_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          stateTable.delete(new Delete(next.getRow()));
    +        }
    +      }
    +    }
    +  }
    +
    +  // ----------------------------------------------------------------
    +  // ------- Methods for max prune upper bound for given time -------
    +  // Key: 0x3<inverted time>
    +  // Col 'p': <prune upper bound>
    +  // ----------------------------------------------------------------
    +
    +  /**
    +   * Persist prune upper bound for a given time
    +   *
    +   * @param time time in milliseconds
    +   * @param pruneUpperBoundTime prune upper bound for the given time
    +   * @throws IOException when not able to persist the data to HBase
    +   */
    +  public void savePruneUpperBoundForTime(long time, long pruneUpperBoundTime) throws IOException {
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      Put put = new Put(makePruneUpperBoundTimeKey(Bytes.toBytes(getInvertedTime(time))));
    +      put.addColumn(FAMILY, PRUNE_UPPER_BOUND_TIME_COL, Bytes.toBytes(pruneUpperBoundTime));
    +      stateTable.put(put);
    +    }
    +  }
    +
    +  /**
    +   * Return prune upper bound for the given time
    +   *
    +   * @param time time in milliseconds
    +   * @return prune upper bound for the given time
    +   * @throws IOException when not able to read the data from HBase
    +   */
    +  public long getPruneUpperBoundForTime(long time) throws IOException {
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      Get get = new Get(makePruneUpperBoundTimeKey(Bytes.toBytes(getInvertedTime(time))));
    +      get.addColumn(FAMILY, PRUNE_UPPER_BOUND_TIME_COL);
    +      byte[] result = stateTable.get(get).getValue(FAMILY, PRUNE_UPPER_BOUND_TIME_COL);
    +      return result == null ? -1 : Bytes.toLong(result);
    +    }
    +  }
    +
    +  /**
    +   * Delete all prune upper bounds recorded for a time less than the given time
    +   *
    +   * @param time time in milliseconds
    +   * @throws IOException when not able to delete data in HBase
    +   */
    +  public void deletePruneUpperBoundsOnOrBeforeTime(long time) throws IOException {
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      Scan scan = new Scan(makePruneUpperBoundTimeKey(Bytes.toBytes(getInvertedTime(time))),
    +                           PRUNE_UPPER_BOUND_TIME_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_TIME_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          stateTable.delete(new Delete(next.getRow()));
    +        }
    +      }
    +    }
    +  }
    +
       private byte[] makeRegionKey(byte[] regionId) {
         return Bytes.add(REGION_KEY_PREFIX, regionId);
       }
     
    +  private byte[] getRegionFromKey(byte[] regionKey) {
    +    return Bytes.copy(regionKey, 1, regionKey.length - 1);
    +  }
    +
    +  private byte[] makeTimeRegionKey(byte[] time, byte[] regionId) {
    +    return Bytes.add(REGION_TIME_KEY_PREFIX, time, regionId);
    +  }
    +
    +  private byte[] makePruneUpperBoundTimeKey(byte[] time) {
    +    return Bytes.add(PRUNE_UPPER_BOUND_TIME_KEY_PREFIX, time);
    +  }
    +
    +  private Map.Entry<Long, byte[]> getTimeRegion(byte[] key) {
    +    int offset = 1;
    --- End diff --
    
    REGION_TIME_KEY_PREFIX.length


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r89206584
  
    --- Diff: tephra-core/src/main/java/org/apache/tephra/janitor/DataJanitorPlugin.java ---
    @@ -0,0 +1,59 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.janitor;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import java.io.IOException;
    +
    +/**
    + * Data janitor interface to manage the invalid transaction list.
    + * There will be one such plugin per data store that will be invoked periodically
    --- End diff --
    
    Is there one per each HBase instance? Or one per HBase table?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r89939212
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DefaultDataJanitorPlugin.java ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.hbase.coprocessor.janitor;
    +
    +import com.google.common.base.Function;
    +import com.google.common.collect.Iterables;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hbase.HRegionInfo;
    +import org.apache.hadoop.hbase.HTableDescriptor;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.Admin;
    +import org.apache.hadoop.hbase.client.Connection;
    +import org.apache.hadoop.hbase.client.ConnectionFactory;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.tephra.TxConstants;
    +import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
    +import org.apache.tephra.janitor.DataJanitorPlugin;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.SortedSet;
    +import java.util.TreeSet;
    +
    +/**
    + * Default implementation of the {@link DataJanitorPlugin} for HBase
    + */
    +@SuppressWarnings("WeakerAccess")
    +public class DefaultDataJanitorPlugin implements DataJanitorPlugin {
    +  public static final Logger LOG = LoggerFactory.getLogger(DefaultDataJanitorPlugin.class);
    +
    +  protected Configuration conf;
    +  protected Connection connection;
    +  protected DataJanitorState dataJanitorState;
    +
    +  @Override
    +  public void initialize(Configuration conf) throws IOException {
    +    this.conf = conf;
    +    this.connection = ConnectionFactory.createConnection(conf);
    +
    +    final TableName stateTable = TableName.valueOf(conf.get(TxConstants.DataJanitor.PRUNE_STATE_TABLE,
    +                                                            TxConstants.DataJanitor.DEFAULT_PRUNE_STATE_TABLE));
    +    LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString());
    +    this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
    +      @Override
    +      public Table get() throws IOException {
    +        return connection.getTable(stateTable);
    +      }
    +    });
    +  }
    +
    +  @Override
    +  public long fetchPruneUpperBound(long time, long pruneUpperBoundForTime) throws IOException {
    +    LOG.debug("Fetching prune upper bound for time {} and max prune upper bound {}", time, pruneUpperBoundForTime);
    +    if (time < 0 || pruneUpperBoundForTime < 0) {
    +      return -1;
    +    }
    +
    +    SortedSet<byte[]> transactionalRegions = getTransactionalRegions();
    +    if (!transactionalRegions.isEmpty()) {
    +      LOG.debug("Saving {} transactional regions for time {}", transactionalRegions.size(), time);
    +      dataJanitorState.saveRegionsForTime(time, transactionalRegions);
    +      // Save prune upper bound for time as the final step.
    +      // We can then use its existence to make sure that the data for a given time is complete or not
    +      LOG.debug("Saving max prune upper bound {} for time {}", pruneUpperBoundForTime, time);
    +      dataJanitorState.savePruneUpperBoundForTime(time, pruneUpperBoundForTime);
    +    }
    +
    +    return computePruneUpperBound(new TimeRegions(time, transactionalRegions));
    +  }
    +
    +  @Override
    +  public void pruneComplete(long time, long pruneUpperBound) throws IOException {
    +    LOG.debug("Prune complete for time {} and prune upper bound {}", time, pruneUpperBound);
    +    // Get regions for given time, so as to not delete them
    +    TimeRegions regionsToExclude = dataJanitorState.getRegionsOnOrBeforeTime(time);
    +    if (regionsToExclude != null) {
    +      LOG.debug("Deleting stale region - prune upper bound record before {}", pruneUpperBound);
    +      dataJanitorState.deleteRegionsWithPruneUpperBoundBefore(pruneUpperBound, regionsToExclude.getRegions());
    +    } else {
    +      LOG.warn("Cannot find saved regions on or before time {}", time);
    +    }
    +    long pruneTime = pruneUpperBound / TxConstants.MAX_TX_PER_MS;
    +    LOG.debug("Deleting regions recorded before time {}", pruneTime);
    +    dataJanitorState.deleteAllRegionsOnOrBeforeTime(pruneTime);
    +    LOG.debug("Deleting prune upper bounds recorded on or before time {}", pruneTime);
    +    dataJanitorState.deletePruneUpperBoundsOnOrBeforeTime(pruneTime);
    +  }
    +
    +  @Override
    +  public void destroy() throws IOException {
    +    LOG.info("Stopping plugin...");
    +    connection.close();
    +  }
    +
    +  protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
    +    return tableDescriptor.hasCoprocessor(TransactionProcessor.class.getName());
    +  }
    +
    +  protected SortedSet<byte[]> getTransactionalRegions() throws IOException {
    +    SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    +    try (Admin admin = connection.getAdmin()) {
    +      HTableDescriptor[] tableDescriptors = admin.listTables();
    +      LOG.debug("Got {} tables to process", tableDescriptors == null ? 0 : tableDescriptors.length);
    +      if (tableDescriptors != null) {
    +        for (HTableDescriptor tableDescriptor : tableDescriptors) {
    +          if (isTransactionalTable(tableDescriptor)) {
    +            List<HRegionInfo> tableRegions = admin.getTableRegions(tableDescriptor.getTableName());
    +            LOG.debug("Regions for table {}: {}", tableDescriptor.getTableName(), tableRegions);
    +            if (tableRegions != null) {
    +              for (HRegionInfo region : tableRegions) {
    +                regions.add(region.getRegionName());
    +              }
    +            }
    +          } else {
    +            LOG.debug("{} is not a transactional table", tableDescriptor.getTableName());
    +          }
    +        }
    +      }
    +    }
    +    return regions;
    +  }
    +
    +  private long computePruneUpperBound(TimeRegions timeRegions) throws IOException {
    --- End diff --
    
    It would really help to explain this method in comments. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r90759085
  
    --- Diff: tephra-core/src/main/java/org/apache/tephra/janitor/TransactionPruningPlugin.java ---
    @@ -0,0 +1,90 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.janitor;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import java.io.IOException;
    +
    +/**
    + * Data janitor interface to manage the invalid transaction list.
    + *
    + * <p/>
    + * An invalid transaction can only be removed from the invalid list after the data written
    + * by the invalid transactions has been removed from all the data stores.
    + * The term data store is used here to represent a set of tables in a database that have
    + * the same data clean up policy, like all Apache Phoenix tables in an HBase instance.
    + *
    + * <p/>
    + * Typically every data store will have a background job which cleans up the data written by invalid transactions.
    + * Prune upper bound for a data store is defined as the largest invalid transaction whose data has been
    + * cleaned up from that data store.
    + * <pre>
    + * prune-upper-bound = min(max(invalid list), min(in-progress list) - 1)
    + * </pre>
    + * where invalid list and in-progress list are from the transaction snapshot used to clean up the invalid data in the
    + * data store.
    + *
    + * <p/>
    + * There will be one such plugin per data store. The plugins will be executed as part of the Transaction Service.
    + * Each plugin will be invoked periodically to fetch the prune upper bound for its data store.
    + * Invalid transaction list can pruned up to the minimum of prune upper bounds returned by all the plugins.
    + */
    +public interface TransactionPruningPlugin {
    +  /**
    +   * Called once when the Transaction Service starts up.
    +   *
    +   * @param conf configuration for the plugin
    +   */
    +  void initialize(Configuration conf) throws IOException;
    +
    +  /**
    +   * Called periodically to fetch prune upper bound for a data store. The plugin examines the state of data cleanup
    +   * in the data store and determines the smallest invalid transaction whose writes no longer exist in the data
    --- End diff --
    
    or a greatest lower bound for transaction ids that may not be pruned?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r90766319
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java ---
    @@ -179,15 +180,17 @@ public void pruneComplete(long time, long maxPrunedInvalid) throws IOException {
           return;
         }
     
    -    // Get regions for given time, so as to not delete them
    +    // Get regions for the given time, so as to not delete them. The prune upper bounds for regions are recorded
    +    // by TransactionProcessor and the deletion is done by this class. To avoid update/delete race condition,
    +    // we only delete stale regions.
    --- End diff --
    
    OK, the comment makes this clear that it is because of race condition. Not sure what kind of race conditions? Also, I believe my comment still holds that this means, as long as a region exists, we will never delete its entries here. Do we really have to keep a record of a region's existence for every single pruneInterval since the beginning of time? That can be a lot of records over time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r89938666
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java ---
    @@ -19,20 +19,46 @@
     
     package org.apache.tephra.hbase.coprocessor.janitor;
     
    +import com.google.common.collect.Maps;
    +import com.google.common.primitives.Longs;
    +import org.apache.hadoop.hbase.client.Delete;
     import org.apache.hadoop.hbase.client.Get;
     import org.apache.hadoop.hbase.client.Put;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.ResultScanner;
    +import org.apache.hadoop.hbase.client.Scan;
     import org.apache.hadoop.hbase.client.Table;
     import org.apache.hadoop.hbase.util.Bytes;
     
     import java.io.IOException;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.SortedSet;
    +import java.util.TreeMap;
    +import java.util.TreeSet;
    +import javax.annotation.Nullable;
     
     /**
      * Persist data janitor state into an HBase table.
    --- End diff --
    
    Is it true that this class is shared between the DataJanitor coprocessor (for saving prune state) and the DataJanitorPlugin in the tx manager (for reading the state)? If so, then this deserves mentioning in the javadocs


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r90759660
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java ---
    @@ -0,0 +1,289 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.hbase.coprocessor.janitor;
    +
    +import com.google.common.base.Function;
    +import com.google.common.collect.Iterables;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hbase.HRegionInfo;
    +import org.apache.hadoop.hbase.HTableDescriptor;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.Admin;
    +import org.apache.hadoop.hbase.client.Connection;
    +import org.apache.hadoop.hbase.client.ConnectionFactory;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.tephra.TxConstants;
    +import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
    +import org.apache.tephra.janitor.TransactionPruningPlugin;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.SortedSet;
    +import java.util.TreeSet;
    +
    +/**
    + * Default implementation of the {@link TransactionPruningPlugin} for HBase.
    + *
    + * This plugin determines the prune upper bound for transactional HBase tables that use
    + * coprocessor {@link TransactionProcessor}.
    + *
    + * <h3>State storage:</h3>
    + *
    + * This plugin expects the TransactionProcessor to save the prune upper bound for invalid transactions
    + * after every major compaction of a region. Let's call this <i>(region, prune upper bound)</i>.
    + * In addition, the plugin also persists the following information on a run at time <i>t</i>
    + * <ul>
    + *   <li>
    + *     <i>(t, set of regions)</i>: Set of transactional regions at time <i>t</i>.
    + *     Transactional regions are regions of the tables that have the coprocessor TransactionProcessor
    + *     attached to them.
    + *   </li>
    + *   <li>
    + *     <i>(t, prune upper bound)</i>: This is the smallest not in-progress transaction that
    + *     will not have writes in any HBase regions that are created after time <i>t</i>.
    + *     This value is determined by the Transaction Service based on the transaction state at time <i>t</i>
    + *     and passed on to the plugin.
    + *   </li>
    + * </ul>
    + *
    + * <h3>Computing prune upper bound:</h3>
    + *
    + * In a typical HBase instance, there can be a constant change in the number of regions due to region creations,
    + * splits and merges. At any given time there can always be a region on which a major compaction has not been run.
    + * Since the prune upper bound will get recorded for a region only after a major compaction,
    + * using only the latest set of regions we may not be able to find the
    + * prune upper bounds for all the current regions. Hence we persist the set of regions that exist at that time
    + * of each run of the plugin, and use historical region set for time <i>t</i>, <i>t - 1</i>, etc.
    + * to determine the prune upper bound.
    + *
    + * From the regions saved at time <i>t</i>, <i>t - 1</i>, etc.,
    + * the plugin tries to find the latest <i>(t, set of regions)</i> where all regions have been major compacted,
    + * i.e, all regions have prune upper bound recorded in <i>(region, prune upper bound)</i>.
    + * <br/>
    + * If such a set is found for time <i>t1</i>, the prune upper bound returned by the plugin is the minimum of
    + * <ul>
    + *   <li>Prune upper bounds of regions in set <i>(t1, set of regions)</i></li>
    + *   <li>Prune upper bound from <i>(t1, prune upper bound)</i></li>
    + * </ul>
    + *
    + * <p/>
    + * Above, when we find <i>(t1, set of regions)</i>, there may a region that was created after time <i>t1</i>,
    + * but has a data write from an invalid transaction that is smaller than the prune upper bounds of all
    + * regions in <i>(t1, set of regions)</i>. This is possible because <i>(region, prune upper bound)</i> persisted by
    + * TransactionProcessor is always the latest prune upper bound for a region.
    + * <br/>
    + * However a region created after time <i>t1</i> cannot have writes from an invalid transaction that is smaller than
    + * <i>min(max(invalid list), min(in-progress list) - 1)</i> at the time the region was created.
    + * Since we limit the plugin prune upper bound using <i>(t1, prune upper bound)</i>, there should be no invalid
    --- End diff --
    
    Also, I am not sure whether min(max(invalid list), min(in-progress list) - 1) is correct. A transaction may still generate writes after it became invalid. For example, if the transaction timeout is 30 seconds, the client may perform a write after 60 seconds, and then crash and never commit or rollback. If we prune this transaction between the 30 and 60 seconds, then we will have an invalid write in the store. 
    
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r90766397
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java ---
    @@ -259,6 +262,8 @@ private long computePruneUpperBound(TimeRegions timeRegions) throws IOException
             if (pruneUpperBoundForTime != -1) {
               Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
               return Math.min(pruneUpperBoundForTime, minPruneUpperBoundRegions);
    +        } else {
    +          LOG.debug("Ignoring invalid prune upper bound -1 for time {}", time);
             }
           } else {
             if (LOG.isDebugEnabled()) {
    --- End diff --
    
    Maybe the message here should also be rephrased: "Ignoring regions for time {} because not all of the regions recorded a pruneUpperBound."
    
    This may seem picky, but remember that these messages will be seen by devops who does not understand the internals. If you rephrase it this way, they will understand better what is gong on. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r90760015
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java ---
    @@ -0,0 +1,289 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.hbase.coprocessor.janitor;
    +
    +import com.google.common.base.Function;
    +import com.google.common.collect.Iterables;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hbase.HRegionInfo;
    +import org.apache.hadoop.hbase.HTableDescriptor;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.Admin;
    +import org.apache.hadoop.hbase.client.Connection;
    +import org.apache.hadoop.hbase.client.ConnectionFactory;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.tephra.TxConstants;
    +import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
    +import org.apache.tephra.janitor.TransactionPruningPlugin;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.SortedSet;
    +import java.util.TreeSet;
    +
    +/**
    + * Default implementation of the {@link TransactionPruningPlugin} for HBase.
    + *
    + * This plugin determines the prune upper bound for transactional HBase tables that use
    + * coprocessor {@link TransactionProcessor}.
    + *
    + * <h3>State storage:</h3>
    + *
    + * This plugin expects the TransactionProcessor to save the prune upper bound for invalid transactions
    + * after every major compaction of a region. Let's call this <i>(region, prune upper bound)</i>.
    + * In addition, the plugin also persists the following information on a run at time <i>t</i>
    + * <ul>
    + *   <li>
    + *     <i>(t, set of regions)</i>: Set of transactional regions at time <i>t</i>.
    + *     Transactional regions are regions of the tables that have the coprocessor TransactionProcessor
    + *     attached to them.
    + *   </li>
    + *   <li>
    + *     <i>(t, prune upper bound)</i>: This is the smallest not in-progress transaction that
    + *     will not have writes in any HBase regions that are created after time <i>t</i>.
    + *     This value is determined by the Transaction Service based on the transaction state at time <i>t</i>
    + *     and passed on to the plugin.
    + *   </li>
    + * </ul>
    + *
    + * <h3>Computing prune upper bound:</h3>
    + *
    + * In a typical HBase instance, there can be a constant change in the number of regions due to region creations,
    + * splits and merges. At any given time there can always be a region on which a major compaction has not been run.
    + * Since the prune upper bound will get recorded for a region only after a major compaction,
    + * using only the latest set of regions we may not be able to find the
    + * prune upper bounds for all the current regions. Hence we persist the set of regions that exist at that time
    + * of each run of the plugin, and use historical region set for time <i>t</i>, <i>t - 1</i>, etc.
    + * to determine the prune upper bound.
    + *
    + * From the regions saved at time <i>t</i>, <i>t - 1</i>, etc.,
    + * the plugin tries to find the latest <i>(t, set of regions)</i> where all regions have been major compacted,
    + * i.e, all regions have prune upper bound recorded in <i>(region, prune upper bound)</i>.
    + * <br/>
    + * If such a set is found for time <i>t1</i>, the prune upper bound returned by the plugin is the minimum of
    + * <ul>
    + *   <li>Prune upper bounds of regions in set <i>(t1, set of regions)</i></li>
    + *   <li>Prune upper bound from <i>(t1, prune upper bound)</i></li>
    + * </ul>
    + *
    + * <p/>
    + * Above, when we find <i>(t1, set of regions)</i>, there may a region that was created after time <i>t1</i>,
    + * but has a data write from an invalid transaction that is smaller than the prune upper bounds of all
    + * regions in <i>(t1, set of regions)</i>. This is possible because <i>(region, prune upper bound)</i> persisted by
    + * TransactionProcessor is always the latest prune upper bound for a region.
    + * <br/>
    + * However a region created after time <i>t1</i> cannot have writes from an invalid transaction that is smaller than
    + * <i>min(max(invalid list), min(in-progress list) - 1)</i> at the time the region was created.
    + * Since we limit the plugin prune upper bound using <i>(t1, prune upper bound)</i>, there should be no invalid
    + * transactions smaller than the plugin prune upper bound with writes in any transactional region of
    + * this HBase instance.
    + *
    + * <p/>
    + * Note: If your tables uses a transactional coprocessor other than TransactionProcessor,
    + * then you may need to write a new plugin to compute prune upper bound for those tables.
    + */
    +@SuppressWarnings("WeakerAccess")
    +public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
    +  public static final Logger LOG = LoggerFactory.getLogger(HBaseTransactionPruningPlugin.class);
    +
    +  protected Configuration conf;
    +  protected Connection connection;
    +  protected DataJanitorState dataJanitorState;
    +
    +  @Override
    +  public void initialize(Configuration conf) throws IOException {
    +    this.conf = conf;
    +    this.connection = ConnectionFactory.createConnection(conf);
    +
    +    final TableName stateTable = TableName.valueOf(conf.get(TxConstants.DataJanitor.PRUNE_STATE_TABLE,
    +                                                            TxConstants.DataJanitor.DEFAULT_PRUNE_STATE_TABLE));
    +    LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString());
    +    this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
    +      @Override
    +      public Table get() throws IOException {
    +        return connection.getTable(stateTable);
    +      }
    +    });
    +  }
    +
    +  /**
    +   * Determines prune upper bound for the data store as mentioned above.
    +   */
    +  @Override
    +  public long fetchPruneUpperBound(long time, long pruneUpperBoundForTime) throws IOException {
    +    LOG.debug("Fetching prune upper bound for time {} and max prune upper bound {}", time, pruneUpperBoundForTime);
    +    if (time < 0 || pruneUpperBoundForTime < 0) {
    +      return -1;
    +    }
    +
    +    // Get all the current transactional regions
    +    SortedSet<byte[]> transactionalRegions = getTransactionalRegions();
    +    if (!transactionalRegions.isEmpty()) {
    +      LOG.debug("Saving {} transactional regions for time {}", transactionalRegions.size(), time);
    +      dataJanitorState.saveRegionsForTime(time, transactionalRegions);
    +      // Save prune upper bound for time as the final step.
    +      // We can then use its existence to make sure that the data for a given time is complete or not
    +      LOG.debug("Saving max prune upper bound {} for time {}", pruneUpperBoundForTime, time);
    +      dataJanitorState.savePruneUpperBoundForTime(time, pruneUpperBoundForTime);
    +    }
    +
    +    return computePruneUpperBound(new TimeRegions(time, transactionalRegions));
    +  }
    +
    +  /**
    +   * After invalid list has been pruned, this cleans up state information that is no longer required.
    +   * This includes -
    +   * <ul>
    +   *   <li>
    +   *     <i>(region, prune upper bound)</i> - prune upper bound for regions that are older
    +   *     than maxPrunedInvalid
    +   *   </li>
    +   *   <li>
    +   *     <i>(t, set of regions) - Regions set that were recorded on or before the start time
    +   *     of maxPrunedInvalid
    +   *   </li>
    +   *   <li>
    +   *     (t, prune upper bound) - Smallest not in-progress transaction without any writes in new regions
    +   *     information recorded on or before the start time of maxPrunedInvalid
    +   *   </li>
    +   * </ul>
    +   */
    +  @Override
    +  public void pruneComplete(long time, long maxPrunedInvalid) throws IOException {
    +    LOG.debug("Prune complete for time {} and prune upper bound {}", time, maxPrunedInvalid);
    +    if (time < 0 || maxPrunedInvalid < 0) {
    +      return;
    +    }
    +
    +    // Get regions for given time, so as to not delete them
    +    TimeRegions regionsToExclude = dataJanitorState.getRegionsOnOrBeforeTime(time);
    +    if (regionsToExclude != null) {
    +      LOG.debug("Deleting stale region - prune upper bound record before {}", maxPrunedInvalid);
    +      dataJanitorState.deleteRegionsWithPruneUpperBoundBefore(maxPrunedInvalid, regionsToExclude.getRegions());
    +    } else {
    +      LOG.warn("Cannot find saved regions on or before time {}", time);
    +    }
    +    long pruneTime = maxPrunedInvalid / TxConstants.MAX_TX_PER_MS;
    --- End diff --
    
    It seems that there should be a helper method to extract the time stamp from a transaction id?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r89207961
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java ---
    @@ -58,10 +105,237 @@ public long getPruneUpperBound(byte[] regionId) throws IOException {
         }
       }
     
    +  /**
    +   * Get latest prune upper bounds for given regions
    +   *
    +   * @param regions a set of regions
    +   * @return a map containing region id and its latest prune upper bound value
    +   * @throws IOException when not able to read the data from HBase
    +   */
    +  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException {
    +    Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
    +      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          byte[] region = getRegionFromKey(next.getRow());
    +          if (regions.contains(region)) {
    +            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
    +            if (timeBytes != null) {
    +              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
    +              resultMap.put(region, pruneUpperBoundRegion);
    +            }
    +          }
    +        }
    +      }
    +      return resultMap;
    +    }
    +  }
    +
    +  /**
    +   * Delete all regions that are not in the given exclude set and whose prune upper bound is less than a given value
    --- End diff --
    
    what is this used for? Might be useful to say that in the javadoc


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r90766057
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java ---
    @@ -0,0 +1,289 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.hbase.coprocessor.janitor;
    +
    +import com.google.common.base.Function;
    +import com.google.common.collect.Iterables;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hbase.HRegionInfo;
    +import org.apache.hadoop.hbase.HTableDescriptor;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.Admin;
    +import org.apache.hadoop.hbase.client.Connection;
    +import org.apache.hadoop.hbase.client.ConnectionFactory;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.tephra.TxConstants;
    +import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
    +import org.apache.tephra.janitor.TransactionPruningPlugin;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.SortedSet;
    +import java.util.TreeSet;
    +
    +/**
    + * Default implementation of the {@link TransactionPruningPlugin} for HBase.
    + *
    + * This plugin determines the prune upper bound for transactional HBase tables that use
    + * coprocessor {@link TransactionProcessor}.
    + *
    + * <h3>State storage:</h3>
    + *
    + * This plugin expects the TransactionProcessor to save the prune upper bound for invalid transactions
    + * after every major compaction of a region. Let's call this <i>(region, prune upper bound)</i>.
    + * In addition, the plugin also persists the following information on a run at time <i>t</i>
    + * <ul>
    + *   <li>
    + *     <i>(t, set of regions)</i>: Set of transactional regions at time <i>t</i>.
    + *     Transactional regions are regions of the tables that have the coprocessor TransactionProcessor
    + *     attached to them.
    + *   </li>
    + *   <li>
    + *     <i>(t, prune upper bound)</i>: This is the smallest not in-progress transaction that
    + *     will not have writes in any HBase regions that are created after time <i>t</i>.
    + *     This value is determined by the Transaction Service based on the transaction state at time <i>t</i>
    + *     and passed on to the plugin.
    + *   </li>
    + * </ul>
    + *
    + * <h3>Computing prune upper bound:</h3>
    + *
    + * In a typical HBase instance, there can be a constant change in the number of regions due to region creations,
    + * splits and merges. At any given time there can always be a region on which a major compaction has not been run.
    + * Since the prune upper bound will get recorded for a region only after a major compaction,
    + * using only the latest set of regions we may not be able to find the
    + * prune upper bounds for all the current regions. Hence we persist the set of regions that exist at that time
    + * of each run of the plugin, and use historical region set for time <i>t</i>, <i>t - 1</i>, etc.
    + * to determine the prune upper bound.
    + *
    + * From the regions saved at time <i>t</i>, <i>t - 1</i>, etc.,
    + * the plugin tries to find the latest <i>(t, set of regions)</i> where all regions have been major compacted,
    + * i.e, all regions have prune upper bound recorded in <i>(region, prune upper bound)</i>.
    + * <br/>
    + * If such a set is found for time <i>t1</i>, the prune upper bound returned by the plugin is the minimum of
    + * <ul>
    + *   <li>Prune upper bounds of regions in set <i>(t1, set of regions)</i></li>
    + *   <li>Prune upper bound from <i>(t1, prune upper bound)</i></li>
    + * </ul>
    + *
    + * <p/>
    + * Above, when we find <i>(t1, set of regions)</i>, there may a region that was created after time <i>t1</i>,
    + * but has a data write from an invalid transaction that is smaller than the prune upper bounds of all
    + * regions in <i>(t1, set of regions)</i>. This is possible because <i>(region, prune upper bound)</i> persisted by
    + * TransactionProcessor is always the latest prune upper bound for a region.
    + * <br/>
    + * However a region created after time <i>t1</i> cannot have writes from an invalid transaction that is smaller than
    + * <i>min(max(invalid list), min(in-progress list) - 1)</i> at the time the region was created.
    + * Since we limit the plugin prune upper bound using <i>(t1, prune upper bound)</i>, there should be no invalid
    + * transactions smaller than the plugin prune upper bound with writes in any transactional region of
    + * this HBase instance.
    + *
    + * <p/>
    + * Note: If your tables uses a transactional coprocessor other than TransactionProcessor,
    + * then you may need to write a new plugin to compute prune upper bound for those tables.
    + */
    +@SuppressWarnings("WeakerAccess")
    +public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
    +  public static final Logger LOG = LoggerFactory.getLogger(HBaseTransactionPruningPlugin.class);
    +
    +  protected Configuration conf;
    +  protected Connection connection;
    +  protected DataJanitorState dataJanitorState;
    +
    +  @Override
    +  public void initialize(Configuration conf) throws IOException {
    +    this.conf = conf;
    +    this.connection = ConnectionFactory.createConnection(conf);
    +
    +    final TableName stateTable = TableName.valueOf(conf.get(TxConstants.DataJanitor.PRUNE_STATE_TABLE,
    +                                                            TxConstants.DataJanitor.DEFAULT_PRUNE_STATE_TABLE));
    +    LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString());
    +    this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
    +      @Override
    +      public Table get() throws IOException {
    +        return connection.getTable(stateTable);
    +      }
    +    });
    +  }
    +
    +  /**
    +   * Determines prune upper bound for the data store as mentioned above.
    +   */
    +  @Override
    +  public long fetchPruneUpperBound(long time, long pruneUpperBoundForTime) throws IOException {
    +    LOG.debug("Fetching prune upper bound for time {} and max prune upper bound {}", time, pruneUpperBoundForTime);
    +    if (time < 0 || pruneUpperBoundForTime < 0) {
    +      return -1;
    +    }
    +
    +    // Get all the current transactional regions
    +    SortedSet<byte[]> transactionalRegions = getTransactionalRegions();
    +    if (!transactionalRegions.isEmpty()) {
    +      LOG.debug("Saving {} transactional regions for time {}", transactionalRegions.size(), time);
    +      dataJanitorState.saveRegionsForTime(time, transactionalRegions);
    +      // Save prune upper bound for time as the final step.
    +      // We can then use its existence to make sure that the data for a given time is complete or not
    +      LOG.debug("Saving max prune upper bound {} for time {}", pruneUpperBoundForTime, time);
    +      dataJanitorState.savePruneUpperBoundForTime(time, pruneUpperBoundForTime);
    +    }
    +
    +    return computePruneUpperBound(new TimeRegions(time, transactionalRegions));
    +  }
    +
    +  /**
    +   * After invalid list has been pruned, this cleans up state information that is no longer required.
    +   * This includes -
    +   * <ul>
    +   *   <li>
    +   *     <i>(region, prune upper bound)</i> - prune upper bound for regions that are older
    +   *     than maxPrunedInvalid
    +   *   </li>
    +   *   <li>
    +   *     <i>(t, set of regions) - Regions set that were recorded on or before the start time
    +   *     of maxPrunedInvalid
    +   *   </li>
    +   *   <li>
    +   *     (t, prune upper bound) - Smallest not in-progress transaction without any writes in new regions
    +   *     information recorded on or before the start time of maxPrunedInvalid
    +   *   </li>
    +   * </ul>
    +   */
    +  @Override
    +  public void pruneComplete(long time, long maxPrunedInvalid) throws IOException {
    +    LOG.debug("Prune complete for time {} and prune upper bound {}", time, maxPrunedInvalid);
    +    if (time < 0 || maxPrunedInvalid < 0) {
    +      return;
    +    }
    +
    +    // Get regions for given time, so as to not delete them
    +    TimeRegions regionsToExclude = dataJanitorState.getRegionsOnOrBeforeTime(time);
    +    if (regionsToExclude != null) {
    +      LOG.debug("Deleting stale region - prune upper bound record before {}", maxPrunedInvalid);
    +      dataJanitorState.deleteRegionsWithPruneUpperBoundBefore(maxPrunedInvalid, regionsToExclude.getRegions());
    +    } else {
    +      LOG.warn("Cannot find saved regions on or before time {}", time);
    +    }
    +    long pruneTime = maxPrunedInvalid / TxConstants.MAX_TX_PER_MS;
    +    LOG.debug("Deleting regions recorded before time {}", pruneTime);
    +    dataJanitorState.deleteAllRegionsOnOrBeforeTime(pruneTime);
    +    LOG.debug("Deleting prune upper bounds recorded on or before time {}", pruneTime);
    +    dataJanitorState.deletePruneUpperBoundsOnOrBeforeTime(pruneTime);
    +  }
    +
    +  @Override
    +  public void destroy() {
    +    LOG.info("Stopping plugin...");
    +    try {
    +      connection.close();
    +    } catch (IOException e) {
    +      LOG.error("Got exception while closing HBase connection", e);
    +    }
    +  }
    +
    +  protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
    +    return tableDescriptor.hasCoprocessor(TransactionProcessor.class.getName());
    +  }
    +
    +  protected SortedSet<byte[]> getTransactionalRegions() throws IOException {
    +    SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    +    try (Admin admin = connection.getAdmin()) {
    +      HTableDescriptor[] tableDescriptors = admin.listTables();
    +      LOG.debug("Got {} tables to process", tableDescriptors == null ? 0 : tableDescriptors.length);
    +      if (tableDescriptors != null) {
    +        for (HTableDescriptor tableDescriptor : tableDescriptors) {
    +          if (isTransactionalTable(tableDescriptor)) {
    +            List<HRegionInfo> tableRegions = admin.getTableRegions(tableDescriptor.getTableName());
    +            LOG.debug("Regions for table {}: {}", tableDescriptor.getTableName(), tableRegions);
    +            if (tableRegions != null) {
    +              for (HRegionInfo region : tableRegions) {
    +                regions.add(region.getRegionName());
    +              }
    +            }
    +          } else {
    +            LOG.debug("{} is not a transactional table", tableDescriptor.getTableName());
    +          }
    +        }
    +      }
    +    }
    +    return regions;
    +  }
    +
    +  /**
    +   * Try to find the latest set of regions in which all regions have been major compacted, and
    +   * compute prune upper bound from them. Starting from newest to oldest, this looks into the
    +   * region set that has been saved periodically, and joins it with the prune upper bound data
    +   * for a region recorded after a major compaction.
    +   *
    +   * @param timeRegions the latest set of regions
    +   * @return prune upper bound
    +   * @throws IOException when not able to talk to HBase
    +   */
    +  private long computePruneUpperBound(TimeRegions timeRegions) throws IOException {
    +    do {
    +      LOG.debug("Computing prune upper bound for {}", timeRegions);
    +      SortedSet<byte[]> transactionalRegions = timeRegions.getRegions();
    +      long time = timeRegions.getTime();
    +
    +      Map<byte[], Long> pruneUpperBoundRegions = dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
    +      logPruneUpperBoundRegions(pruneUpperBoundRegions);
    +      // If prune upper bounds are found for all the transactional regions, then compute the prune upper bound
    +      // across all regions
    +      if (!transactionalRegions.isEmpty() && pruneUpperBoundRegions.size() == transactionalRegions.size()) {
    +        long pruneUpperBoundForTime = dataJanitorState.getPruneUpperBoundForTime(time);
    +        LOG.debug("Found max prune upper bound {} for time {}", pruneUpperBoundForTime, time);
    +        // If pruneUpperBoundForTime is not recorded then that means the data is not complete for these regions
    +        if (pruneUpperBoundForTime != -1) {
    --- End diff --
    
    Done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r90759960
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java ---
    @@ -0,0 +1,289 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.hbase.coprocessor.janitor;
    +
    +import com.google.common.base.Function;
    +import com.google.common.collect.Iterables;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hbase.HRegionInfo;
    +import org.apache.hadoop.hbase.HTableDescriptor;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.Admin;
    +import org.apache.hadoop.hbase.client.Connection;
    +import org.apache.hadoop.hbase.client.ConnectionFactory;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.tephra.TxConstants;
    +import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
    +import org.apache.tephra.janitor.TransactionPruningPlugin;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.SortedSet;
    +import java.util.TreeSet;
    +
    +/**
    + * Default implementation of the {@link TransactionPruningPlugin} for HBase.
    + *
    + * This plugin determines the prune upper bound for transactional HBase tables that use
    + * coprocessor {@link TransactionProcessor}.
    + *
    + * <h3>State storage:</h3>
    + *
    + * This plugin expects the TransactionProcessor to save the prune upper bound for invalid transactions
    + * after every major compaction of a region. Let's call this <i>(region, prune upper bound)</i>.
    + * In addition, the plugin also persists the following information on a run at time <i>t</i>
    + * <ul>
    + *   <li>
    + *     <i>(t, set of regions)</i>: Set of transactional regions at time <i>t</i>.
    + *     Transactional regions are regions of the tables that have the coprocessor TransactionProcessor
    + *     attached to them.
    + *   </li>
    + *   <li>
    + *     <i>(t, prune upper bound)</i>: This is the smallest not in-progress transaction that
    + *     will not have writes in any HBase regions that are created after time <i>t</i>.
    + *     This value is determined by the Transaction Service based on the transaction state at time <i>t</i>
    + *     and passed on to the plugin.
    + *   </li>
    + * </ul>
    + *
    + * <h3>Computing prune upper bound:</h3>
    + *
    + * In a typical HBase instance, there can be a constant change in the number of regions due to region creations,
    + * splits and merges. At any given time there can always be a region on which a major compaction has not been run.
    + * Since the prune upper bound will get recorded for a region only after a major compaction,
    + * using only the latest set of regions we may not be able to find the
    + * prune upper bounds for all the current regions. Hence we persist the set of regions that exist at that time
    + * of each run of the plugin, and use historical region set for time <i>t</i>, <i>t - 1</i>, etc.
    + * to determine the prune upper bound.
    + *
    + * From the regions saved at time <i>t</i>, <i>t - 1</i>, etc.,
    + * the plugin tries to find the latest <i>(t, set of regions)</i> where all regions have been major compacted,
    + * i.e, all regions have prune upper bound recorded in <i>(region, prune upper bound)</i>.
    + * <br/>
    + * If such a set is found for time <i>t1</i>, the prune upper bound returned by the plugin is the minimum of
    + * <ul>
    + *   <li>Prune upper bounds of regions in set <i>(t1, set of regions)</i></li>
    + *   <li>Prune upper bound from <i>(t1, prune upper bound)</i></li>
    + * </ul>
    + *
    + * <p/>
    + * Above, when we find <i>(t1, set of regions)</i>, there may a region that was created after time <i>t1</i>,
    + * but has a data write from an invalid transaction that is smaller than the prune upper bounds of all
    + * regions in <i>(t1, set of regions)</i>. This is possible because <i>(region, prune upper bound)</i> persisted by
    + * TransactionProcessor is always the latest prune upper bound for a region.
    + * <br/>
    + * However a region created after time <i>t1</i> cannot have writes from an invalid transaction that is smaller than
    + * <i>min(max(invalid list), min(in-progress list) - 1)</i> at the time the region was created.
    + * Since we limit the plugin prune upper bound using <i>(t1, prune upper bound)</i>, there should be no invalid
    + * transactions smaller than the plugin prune upper bound with writes in any transactional region of
    + * this HBase instance.
    + *
    + * <p/>
    + * Note: If your tables uses a transactional coprocessor other than TransactionProcessor,
    + * then you may need to write a new plugin to compute prune upper bound for those tables.
    + */
    +@SuppressWarnings("WeakerAccess")
    +public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
    +  public static final Logger LOG = LoggerFactory.getLogger(HBaseTransactionPruningPlugin.class);
    +
    +  protected Configuration conf;
    +  protected Connection connection;
    +  protected DataJanitorState dataJanitorState;
    +
    +  @Override
    +  public void initialize(Configuration conf) throws IOException {
    +    this.conf = conf;
    +    this.connection = ConnectionFactory.createConnection(conf);
    +
    +    final TableName stateTable = TableName.valueOf(conf.get(TxConstants.DataJanitor.PRUNE_STATE_TABLE,
    +                                                            TxConstants.DataJanitor.DEFAULT_PRUNE_STATE_TABLE));
    +    LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString());
    +    this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
    +      @Override
    +      public Table get() throws IOException {
    +        return connection.getTable(stateTable);
    +      }
    +    });
    +  }
    +
    +  /**
    +   * Determines prune upper bound for the data store as mentioned above.
    +   */
    +  @Override
    +  public long fetchPruneUpperBound(long time, long pruneUpperBoundForTime) throws IOException {
    +    LOG.debug("Fetching prune upper bound for time {} and max prune upper bound {}", time, pruneUpperBoundForTime);
    +    if (time < 0 || pruneUpperBoundForTime < 0) {
    +      return -1;
    +    }
    +
    +    // Get all the current transactional regions
    +    SortedSet<byte[]> transactionalRegions = getTransactionalRegions();
    +    if (!transactionalRegions.isEmpty()) {
    +      LOG.debug("Saving {} transactional regions for time {}", transactionalRegions.size(), time);
    +      dataJanitorState.saveRegionsForTime(time, transactionalRegions);
    +      // Save prune upper bound for time as the final step.
    +      // We can then use its existence to make sure that the data for a given time is complete or not
    +      LOG.debug("Saving max prune upper bound {} for time {}", pruneUpperBoundForTime, time);
    +      dataJanitorState.savePruneUpperBoundForTime(time, pruneUpperBoundForTime);
    +    }
    +
    +    return computePruneUpperBound(new TimeRegions(time, transactionalRegions));
    +  }
    +
    +  /**
    +   * After invalid list has been pruned, this cleans up state information that is no longer required.
    +   * This includes -
    +   * <ul>
    +   *   <li>
    +   *     <i>(region, prune upper bound)</i> - prune upper bound for regions that are older
    +   *     than maxPrunedInvalid
    +   *   </li>
    +   *   <li>
    +   *     <i>(t, set of regions) - Regions set that were recorded on or before the start time
    +   *     of maxPrunedInvalid
    +   *   </li>
    +   *   <li>
    +   *     (t, prune upper bound) - Smallest not in-progress transaction without any writes in new regions
    +   *     information recorded on or before the start time of maxPrunedInvalid
    +   *   </li>
    +   * </ul>
    +   */
    +  @Override
    +  public void pruneComplete(long time, long maxPrunedInvalid) throws IOException {
    +    LOG.debug("Prune complete for time {} and prune upper bound {}", time, maxPrunedInvalid);
    +    if (time < 0 || maxPrunedInvalid < 0) {
    +      return;
    +    }
    +
    +    // Get regions for given time, so as to not delete them
    +    TimeRegions regionsToExclude = dataJanitorState.getRegionsOnOrBeforeTime(time);
    +    if (regionsToExclude != null) {
    +      LOG.debug("Deleting stale region - prune upper bound record before {}", maxPrunedInvalid);
    --- End diff --
    
    regions with p.u.b. recorded before


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r90759897
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java ---
    @@ -0,0 +1,289 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.hbase.coprocessor.janitor;
    +
    +import com.google.common.base.Function;
    +import com.google.common.collect.Iterables;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hbase.HRegionInfo;
    +import org.apache.hadoop.hbase.HTableDescriptor;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.Admin;
    +import org.apache.hadoop.hbase.client.Connection;
    +import org.apache.hadoop.hbase.client.ConnectionFactory;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.tephra.TxConstants;
    +import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
    +import org.apache.tephra.janitor.TransactionPruningPlugin;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.SortedSet;
    +import java.util.TreeSet;
    +
    +/**
    + * Default implementation of the {@link TransactionPruningPlugin} for HBase.
    + *
    + * This plugin determines the prune upper bound for transactional HBase tables that use
    + * coprocessor {@link TransactionProcessor}.
    + *
    + * <h3>State storage:</h3>
    + *
    + * This plugin expects the TransactionProcessor to save the prune upper bound for invalid transactions
    + * after every major compaction of a region. Let's call this <i>(region, prune upper bound)</i>.
    + * In addition, the plugin also persists the following information on a run at time <i>t</i>
    + * <ul>
    + *   <li>
    + *     <i>(t, set of regions)</i>: Set of transactional regions at time <i>t</i>.
    + *     Transactional regions are regions of the tables that have the coprocessor TransactionProcessor
    + *     attached to them.
    + *   </li>
    + *   <li>
    + *     <i>(t, prune upper bound)</i>: This is the smallest not in-progress transaction that
    + *     will not have writes in any HBase regions that are created after time <i>t</i>.
    + *     This value is determined by the Transaction Service based on the transaction state at time <i>t</i>
    + *     and passed on to the plugin.
    + *   </li>
    + * </ul>
    + *
    + * <h3>Computing prune upper bound:</h3>
    + *
    + * In a typical HBase instance, there can be a constant change in the number of regions due to region creations,
    + * splits and merges. At any given time there can always be a region on which a major compaction has not been run.
    + * Since the prune upper bound will get recorded for a region only after a major compaction,
    + * using only the latest set of regions we may not be able to find the
    + * prune upper bounds for all the current regions. Hence we persist the set of regions that exist at that time
    + * of each run of the plugin, and use historical region set for time <i>t</i>, <i>t - 1</i>, etc.
    + * to determine the prune upper bound.
    + *
    + * From the regions saved at time <i>t</i>, <i>t - 1</i>, etc.,
    + * the plugin tries to find the latest <i>(t, set of regions)</i> where all regions have been major compacted,
    + * i.e, all regions have prune upper bound recorded in <i>(region, prune upper bound)</i>.
    + * <br/>
    + * If such a set is found for time <i>t1</i>, the prune upper bound returned by the plugin is the minimum of
    + * <ul>
    + *   <li>Prune upper bounds of regions in set <i>(t1, set of regions)</i></li>
    + *   <li>Prune upper bound from <i>(t1, prune upper bound)</i></li>
    + * </ul>
    + *
    + * <p/>
    + * Above, when we find <i>(t1, set of regions)</i>, there may a region that was created after time <i>t1</i>,
    + * but has a data write from an invalid transaction that is smaller than the prune upper bounds of all
    + * regions in <i>(t1, set of regions)</i>. This is possible because <i>(region, prune upper bound)</i> persisted by
    + * TransactionProcessor is always the latest prune upper bound for a region.
    + * <br/>
    + * However a region created after time <i>t1</i> cannot have writes from an invalid transaction that is smaller than
    + * <i>min(max(invalid list), min(in-progress list) - 1)</i> at the time the region was created.
    + * Since we limit the plugin prune upper bound using <i>(t1, prune upper bound)</i>, there should be no invalid
    + * transactions smaller than the plugin prune upper bound with writes in any transactional region of
    + * this HBase instance.
    + *
    + * <p/>
    + * Note: If your tables uses a transactional coprocessor other than TransactionProcessor,
    + * then you may need to write a new plugin to compute prune upper bound for those tables.
    + */
    +@SuppressWarnings("WeakerAccess")
    +public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
    +  public static final Logger LOG = LoggerFactory.getLogger(HBaseTransactionPruningPlugin.class);
    +
    +  protected Configuration conf;
    +  protected Connection connection;
    +  protected DataJanitorState dataJanitorState;
    +
    +  @Override
    +  public void initialize(Configuration conf) throws IOException {
    +    this.conf = conf;
    +    this.connection = ConnectionFactory.createConnection(conf);
    +
    +    final TableName stateTable = TableName.valueOf(conf.get(TxConstants.DataJanitor.PRUNE_STATE_TABLE,
    +                                                            TxConstants.DataJanitor.DEFAULT_PRUNE_STATE_TABLE));
    +    LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString());
    +    this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
    +      @Override
    +      public Table get() throws IOException {
    +        return connection.getTable(stateTable);
    +      }
    +    });
    +  }
    +
    +  /**
    +   * Determines prune upper bound for the data store as mentioned above.
    +   */
    +  @Override
    +  public long fetchPruneUpperBound(long time, long pruneUpperBoundForTime) throws IOException {
    +    LOG.debug("Fetching prune upper bound for time {} and max prune upper bound {}", time, pruneUpperBoundForTime);
    +    if (time < 0 || pruneUpperBoundForTime < 0) {
    +      return -1;
    +    }
    +
    +    // Get all the current transactional regions
    +    SortedSet<byte[]> transactionalRegions = getTransactionalRegions();
    +    if (!transactionalRegions.isEmpty()) {
    +      LOG.debug("Saving {} transactional regions for time {}", transactionalRegions.size(), time);
    +      dataJanitorState.saveRegionsForTime(time, transactionalRegions);
    +      // Save prune upper bound for time as the final step.
    +      // We can then use its existence to make sure that the data for a given time is complete or not
    +      LOG.debug("Saving max prune upper bound {} for time {}", pruneUpperBoundForTime, time);
    +      dataJanitorState.savePruneUpperBoundForTime(time, pruneUpperBoundForTime);
    +    }
    +
    +    return computePruneUpperBound(new TimeRegions(time, transactionalRegions));
    +  }
    +
    +  /**
    +   * After invalid list has been pruned, this cleans up state information that is no longer required.
    +   * This includes -
    +   * <ul>
    +   *   <li>
    +   *     <i>(region, prune upper bound)</i> - prune upper bound for regions that are older
    +   *     than maxPrunedInvalid
    +   *   </li>
    +   *   <li>
    +   *     <i>(t, set of regions) - Regions set that were recorded on or before the start time
    +   *     of maxPrunedInvalid
    +   *   </li>
    +   *   <li>
    +   *     (t, prune upper bound) - Smallest not in-progress transaction without any writes in new regions
    +   *     information recorded on or before the start time of maxPrunedInvalid
    +   *   </li>
    +   * </ul>
    +   */
    +  @Override
    +  public void pruneComplete(long time, long maxPrunedInvalid) throws IOException {
    +    LOG.debug("Prune complete for time {} and prune upper bound {}", time, maxPrunedInvalid);
    +    if (time < 0 || maxPrunedInvalid < 0) {
    +      return;
    +    }
    +
    +    // Get regions for given time, so as to not delete them
    +    TimeRegions regionsToExclude = dataJanitorState.getRegionsOnOrBeforeTime(time);
    +    if (regionsToExclude != null) {
    +      LOG.debug("Deleting stale region - prune upper bound record before {}", maxPrunedInvalid);
    +      dataJanitorState.deleteRegionsWithPruneUpperBoundBefore(maxPrunedInvalid, regionsToExclude.getRegions());
    +    } else {
    +      LOG.warn("Cannot find saved regions on or before time {}", time);
    +    }
    +    long pruneTime = maxPrunedInvalid / TxConstants.MAX_TX_PER_MS;
    +    LOG.debug("Deleting regions recorded before time {}", pruneTime);
    +    dataJanitorState.deleteAllRegionsOnOrBeforeTime(pruneTime);
    +    LOG.debug("Deleting prune upper bounds recorded on or before time {}", pruneTime);
    +    dataJanitorState.deletePruneUpperBoundsOnOrBeforeTime(pruneTime);
    +  }
    +
    +  @Override
    +  public void destroy() {
    +    LOG.info("Stopping plugin...");
    +    try {
    +      connection.close();
    +    } catch (IOException e) {
    +      LOG.error("Got exception while closing HBase connection", e);
    +    }
    +  }
    +
    +  protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
    +    return tableDescriptor.hasCoprocessor(TransactionProcessor.class.getName());
    +  }
    +
    +  protected SortedSet<byte[]> getTransactionalRegions() throws IOException {
    +    SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    +    try (Admin admin = connection.getAdmin()) {
    +      HTableDescriptor[] tableDescriptors = admin.listTables();
    +      LOG.debug("Got {} tables to process", tableDescriptors == null ? 0 : tableDescriptors.length);
    +      if (tableDescriptors != null) {
    +        for (HTableDescriptor tableDescriptor : tableDescriptors) {
    +          if (isTransactionalTable(tableDescriptor)) {
    +            List<HRegionInfo> tableRegions = admin.getTableRegions(tableDescriptor.getTableName());
    +            LOG.debug("Regions for table {}: {}", tableDescriptor.getTableName(), tableRegions);
    +            if (tableRegions != null) {
    +              for (HRegionInfo region : tableRegions) {
    +                regions.add(region.getRegionName());
    +              }
    +            }
    +          } else {
    +            LOG.debug("{} is not a transactional table", tableDescriptor.getTableName());
    +          }
    +        }
    +      }
    +    }
    +    return regions;
    +  }
    +
    +  /**
    +   * Try to find the latest set of regions in which all regions have been major compacted, and
    +   * compute prune upper bound from them. Starting from newest to oldest, this looks into the
    +   * region set that has been saved periodically, and joins it with the prune upper bound data
    +   * for a region recorded after a major compaction.
    +   *
    +   * @param timeRegions the latest set of regions
    +   * @return prune upper bound
    +   * @throws IOException when not able to talk to HBase
    +   */
    +  private long computePruneUpperBound(TimeRegions timeRegions) throws IOException {
    +    do {
    +      LOG.debug("Computing prune upper bound for {}", timeRegions);
    +      SortedSet<byte[]> transactionalRegions = timeRegions.getRegions();
    +      long time = timeRegions.getTime();
    +
    +      Map<byte[], Long> pruneUpperBoundRegions = dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
    +      logPruneUpperBoundRegions(pruneUpperBoundRegions);
    +      // If prune upper bounds are found for all the transactional regions, then compute the prune upper bound
    +      // across all regions
    +      if (!transactionalRegions.isEmpty() && pruneUpperBoundRegions.size() == transactionalRegions.size()) {
    +        long pruneUpperBoundForTime = dataJanitorState.getPruneUpperBoundForTime(time);
    +        LOG.debug("Found max prune upper bound {} for time {}", pruneUpperBoundForTime, time);
    +        // If pruneUpperBoundForTime is not recorded then that means the data is not complete for these regions
    +        if (pruneUpperBoundForTime != -1) {
    --- End diff --
    
    It would be good to debug-log if it is -1. That would indicate the reason for the debug message in line 264.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r89938969
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DefaultDataJanitorPlugin.java ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.hbase.coprocessor.janitor;
    +
    +import com.google.common.base.Function;
    +import com.google.common.collect.Iterables;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hbase.HRegionInfo;
    +import org.apache.hadoop.hbase.HTableDescriptor;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.Admin;
    +import org.apache.hadoop.hbase.client.Connection;
    +import org.apache.hadoop.hbase.client.ConnectionFactory;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.tephra.TxConstants;
    +import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
    +import org.apache.tephra.janitor.DataJanitorPlugin;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.SortedSet;
    +import java.util.TreeSet;
    +
    +/**
    + * Default implementation of the {@link DataJanitorPlugin} for HBase
    + */
    +@SuppressWarnings("WeakerAccess")
    +public class DefaultDataJanitorPlugin implements DataJanitorPlugin {
    +  public static final Logger LOG = LoggerFactory.getLogger(DefaultDataJanitorPlugin.class);
    +
    +  protected Configuration conf;
    +  protected Connection connection;
    +  protected DataJanitorState dataJanitorState;
    +
    +  @Override
    +  public void initialize(Configuration conf) throws IOException {
    +    this.conf = conf;
    +    this.connection = ConnectionFactory.createConnection(conf);
    +
    +    final TableName stateTable = TableName.valueOf(conf.get(TxConstants.DataJanitor.PRUNE_STATE_TABLE,
    +                                                            TxConstants.DataJanitor.DEFAULT_PRUNE_STATE_TABLE));
    +    LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString());
    +    this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
    +      @Override
    +      public Table get() throws IOException {
    +        return connection.getTable(stateTable);
    +      }
    +    });
    +  }
    +
    +  @Override
    +  public long fetchPruneUpperBound(long time, long pruneUpperBoundForTime) throws IOException {
    +    LOG.debug("Fetching prune upper bound for time {} and max prune upper bound {}", time, pruneUpperBoundForTime);
    +    if (time < 0 || pruneUpperBoundForTime < 0) {
    +      return -1;
    +    }
    +
    +    SortedSet<byte[]> transactionalRegions = getTransactionalRegions();
    +    if (!transactionalRegions.isEmpty()) {
    +      LOG.debug("Saving {} transactional regions for time {}", transactionalRegions.size(), time);
    +      dataJanitorState.saveRegionsForTime(time, transactionalRegions);
    --- End diff --
    
    I am not sure I understand. Why does fetch() have to save anything? Shouldn't it just read (=fetch)?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r89391681
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java ---
    @@ -58,10 +105,237 @@ public long getPruneUpperBound(byte[] regionId) throws IOException {
         }
       }
     
    +  /**
    +   * Get latest prune upper bounds for given regions
    +   *
    +   * @param regions a set of regions
    +   * @return a map containing region id and its latest prune upper bound value
    +   * @throws IOException when not able to read the data from HBase
    +   */
    +  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException {
    +    Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
    +      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          byte[] region = getRegionFromKey(next.getRow());
    +          if (regions.contains(region)) {
    +            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
    +            if (timeBytes != null) {
    +              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
    +              resultMap.put(region, pruneUpperBoundRegion);
    +            }
    +          }
    +        }
    +      }
    +      return resultMap;
    +    }
    +  }
    +
    +  /**
    +   * Delete all regions that are not in the given exclude set and whose prune upper bound is less than a given value
    +   *
    +   * @param deletionPruneUpperBound prune upper bound below which regions will be deleted
    +   * @param excludeRegions set of regions that should not be deleted
    +   * @throws IOException when not able to delete data in HBase
    +   */
    +  public void deleteRegionsWithPruneUpperBoundBefore(long deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
    +    throws IOException {
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
    +      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          byte[] region = getRegionFromKey(next.getRow());
    +          if (!excludeRegions.contains(region)) {
    +            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
    +            if (timeBytes != null) {
    +              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
    +              if (pruneUpperBoundRegion < deletionPruneUpperBound) {
    +                stateTable.delete(new Delete(next.getRow()));
    +              }
    +            }
    +          }
    +        }
    +      }
    +    }
    +  }
    +
    +  // ---------------------------------------------------
    +  // ------- Methods for regions at a given time -------
    +  // Key: 0x2<time><region-id>
    +  // Col 't': <empty byte array>
    +  // ---------------------------------------------------
    +
    +  /**
    +   * Persist the regions for a given time
    +   *
    +   * @param time timestamp in milliseconds
    +   * @param regions set of regions at the time
    +   * @throws IOException when not able to persist the data to HBase
    +   */
    +  public void saveRegionsForTime(long time, Set<byte[]> regions) throws IOException {
    +    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      for (byte[] region : regions) {
    +        Put put = new Put(makeTimeRegionKey(timeBytes, region));
    +        put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
    +        stateTable.put(put);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Return all the persisted regions for a time equal to or less than the given time
    +   *
    +   * @param time timestamp in milliseconds
    +   * @return set of regions and time at which they were recorded
    +   * @throws IOException when not able to read the data from HBase
    +   */
    +  @Nullable
    +  public TimeRegions getRegionsOnOrBeforeTime(long time) throws IOException {
    +    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, REGION_TIME_COL);
    +
    +      SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    +      long currentRegionTime = -1;
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
    +          // Stop if reached next time value
    +          if (currentRegionTime == -1) {
    +            currentRegionTime = timeRegion.getKey();
    +          } else if (timeRegion.getKey() < currentRegionTime) {
    +            break;
    +          } else if (timeRegion.getKey() > currentRegionTime) {
    +            throw new IllegalStateException(
    +              String.format("Got out of order time %d when expecting time lesser than %d",
    +                            timeRegion.getKey(), currentRegionTime));
    +          }
    +          regions.add(timeRegion.getValue());
    +        }
    +      }
    +      return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, regions);
    +    }
    +  }
    +
    +  /**
    +   * Delete all the regions that were recorded for all times equal or less than the given time
    +   *
    +   * @param time timestamp in milliseconds
    +   * @throws IOException when not able to delete data in HBase
    +   */
    +  public void deleteAllRegionsOnOrBeforeTime(long time) throws IOException {
    +    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, REGION_TIME_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          stateTable.delete(new Delete(next.getRow()));
    +        }
    +      }
    +    }
    +  }
    +
    +  // ----------------------------------------------------------------
    +  // ------- Methods for max prune upper bound for given time -------
    +  // Key: 0x3<inverted time>
    +  // Col 'p': <prune upper bound>
    +  // ----------------------------------------------------------------
    +
    +  /**
    +   * Persist prune upper bound for a given time
    +   *
    +   * @param time time in milliseconds
    +   * @param pruneUpperBoundTime prune upper bound for the given time
    +   * @throws IOException when not able to persist the data to HBase
    +   */
    +  public void savePruneUpperBoundForTime(long time, long pruneUpperBoundTime) throws IOException {
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      Put put = new Put(makePruneUpperBoundTimeKey(Bytes.toBytes(getInvertedTime(time))));
    +      put.addColumn(FAMILY, PRUNE_UPPER_BOUND_TIME_COL, Bytes.toBytes(pruneUpperBoundTime));
    +      stateTable.put(put);
    +    }
    +  }
    +
    +  /**
    +   * Return prune upper bound for the given time
    +   *
    +   * @param time time in milliseconds
    +   * @return prune upper bound for the given time
    +   * @throws IOException when not able to read the data from HBase
    +   */
    +  public long getPruneUpperBoundForTime(long time) throws IOException {
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      Get get = new Get(makePruneUpperBoundTimeKey(Bytes.toBytes(getInvertedTime(time))));
    +      get.addColumn(FAMILY, PRUNE_UPPER_BOUND_TIME_COL);
    +      byte[] result = stateTable.get(get).getValue(FAMILY, PRUNE_UPPER_BOUND_TIME_COL);
    +      return result == null ? -1 : Bytes.toLong(result);
    +    }
    +  }
    +
    +  /**
    +   * Delete all prune upper bounds recorded for a time less than the given time
    +   *
    +   * @param time time in milliseconds
    +   * @throws IOException when not able to delete data in HBase
    +   */
    +  public void deletePruneUpperBoundsOnOrBeforeTime(long time) throws IOException {
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      Scan scan = new Scan(makePruneUpperBoundTimeKey(Bytes.toBytes(getInvertedTime(time))),
    +                           PRUNE_UPPER_BOUND_TIME_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_TIME_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          stateTable.delete(new Delete(next.getRow()));
    +        }
    +      }
    +    }
    +  }
    +
       private byte[] makeRegionKey(byte[] regionId) {
         return Bytes.add(REGION_KEY_PREFIX, regionId);
       }
     
    +  private byte[] getRegionFromKey(byte[] regionKey) {
    +    return Bytes.copy(regionKey, 1, regionKey.length - 1);
    --- End diff --
    
    strictly speaking, this should be REGION_KEY_PREFIX.length instead of 1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r89938828
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java ---
    @@ -58,10 +105,237 @@ public long getPruneUpperBound(byte[] regionId) throws IOException {
         }
       }
     
    +  /**
    +   * Get latest prune upper bounds for given regions
    +   *
    +   * @param regions a set of regions
    +   * @return a map containing region id and its latest prune upper bound value
    +   * @throws IOException when not able to read the data from HBase
    +   */
    +  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException {
    +    Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
    +      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          byte[] region = getRegionFromKey(next.getRow());
    +          if (regions.contains(region)) {
    +            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
    +            if (timeBytes != null) {
    +              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
    +              resultMap.put(region, pruneUpperBoundRegion);
    +            }
    +          }
    +        }
    +      }
    +      return resultMap;
    +    }
    +  }
    +
    +  /**
    +   * Delete all regions that are not in the given exclude set and whose prune upper bound is less than a given value
    +   *
    +   * @param deletionPruneUpperBound prune upper bound below which regions will be deleted
    +   * @param excludeRegions set of regions that should not be deleted
    +   * @throws IOException when not able to delete data in HBase
    +   */
    +  public void deleteRegionsWithPruneUpperBoundBefore(long deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
    +    throws IOException {
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
    +      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          byte[] region = getRegionFromKey(next.getRow());
    +          if (!excludeRegions.contains(region)) {
    +            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
    +            if (timeBytes != null) {
    +              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
    +              if (pruneUpperBoundRegion < deletionPruneUpperBound) {
    +                stateTable.delete(new Delete(next.getRow()));
    +              }
    +            }
    +          }
    +        }
    +      }
    +    }
    +  }
    +
    +  // ---------------------------------------------------
    +  // ------- Methods for regions at a given time -------
    +  // Key: 0x2<time><region-id>
    +  // Col 't': <empty byte array>
    +  // ---------------------------------------------------
    +
    +  /**
    +   * Persist the regions for a given time
    --- End diff --
    
    what exactly does this persist? Simply what regions existed at that time?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r89938230
  
    --- Diff: tephra-core/src/main/java/org/apache/tephra/janitor/DataJanitorPlugin.java ---
    @@ -0,0 +1,59 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.janitor;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import java.io.IOException;
    +
    +/**
    + * Data janitor interface to manage the invalid transaction list.
    + * There will be one such plugin per data store that will be invoked periodically
    + * to fetch the prune upper bound for each data store.
    + * Invalid transaction list will pruned up to the minimum of prune upper bounds returned by all the plugins.
    + */
    +public interface DataJanitorPlugin {
    +  /**
    +   * Called once at the beginning to initialize the plugin
    +   */
    +  void initialize(Configuration conf) throws IOException;
    --- End diff --
    
    at the beginning of what? The lifetime of a transaction manager? Or the beginning of a prune operation?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---