You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@tephra.apache.org by go...@apache.org on 2017/03/06 21:47:30 UTC
incubator-tephra git commit: (TEPHRA-227) Add new command to get the set of regions that have not been compacted
Repository: incubator-tephra
Updated Branches:
refs/heads/release/0.11.0-incubating 95c6bfb6b -> 66a2fce79
(TEPHRA-227) Add new command to get the set of regions that have not been compacted
This closes #40 from GitHub.
Signed-off-by: Gokul Gunasekaran <go...@cask.co>
Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/66a2fce7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/66a2fce7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/66a2fce7
Branch: refs/heads/release/0.11.0-incubating
Commit: 66a2fce795dec34da7b992aeda883345fbbf3083
Parents: 95c6bfb
Author: Gokul Gunasekaran <go...@cask.co>
Authored: Wed Mar 1 14:54:40 2017 -0800
Committer: Gokul Gunasekaran <go...@cask.co>
Committed: Mon Mar 6 13:47:14 2017 -0800
----------------------------------------------------------------------
tephra-hbase-compat-0.96/.../hbase/txprune/InvalidListPruningDebug.java    | 105 +++++++++++++++++--
tephra-hbase-compat-0.98/.../hbase/txprune/InvalidListPruningDebug.java    | 105 +++++++++++++++++--
tephra-hbase-compat-1.0-cdh/.../hbase/txprune/InvalidListPruningDebug.java | 105 +++++++++++++++++--
tephra-hbase-compat-1.0/.../hbase/txprune/InvalidListPruningDebug.java     | 105 +++++++++++++++++--
tephra-hbase-compat-1.1-base/.../hbase/txprune/InvalidListPruningDebug.java | 105 +++++++++++++++++--
5 files changed, 495 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/66a2fce7/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
index d48e48d..620885b 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
@@ -21,6 +21,7 @@ package org.apache.tephra.hbase.txprune;
import com.google.common.collect.Iterables;
import com.google.common.collect.MinMaxPriorityQueue;
+import com.google.common.collect.Sets;
import com.google.gson.Gson;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -36,12 +37,15 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.PrintWriter;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import javax.annotation.Nullable;
@@ -53,6 +57,8 @@ public class InvalidListPruningDebug {
private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebug.class);
private static final Gson GSON = new Gson();
private DataJanitorState dataJanitorState;
+ private HConnection connection;
+ private TableName tableName;
/**
* Initialize the Invalid List Debug Tool.
@@ -61,20 +67,76 @@ public class InvalidListPruningDebug {
*/
public void initialize(final Configuration conf) throws IOException {
LOG.debug("InvalidListPruningDebugMain : initialize method called");
- final HConnection connection = new HBaseAdmin(conf).getConnection();
+ connection = new HBaseAdmin(conf).getConnection();
+ tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
@Override
public HTableInterface get() throws IOException {
- return connection.getTable(TableName.valueOf(
- conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)));
+ return connection.getTable(tableName);
}
});
}
+ public void destroy() throws IOException {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+
+ /**
+ * Returns a set of regions that are live but are not empty nor have a prune upper bound recorded. These regions
+ * will stop the progress of pruning.
+ *
+ * @param numRegions number of regions
+ * @return {@link Set} of regions that needs to be compacted and flushed
+ */
+ public Set<String> getRegionsToBeCompacted(Integer numRegions) throws IOException {
+ // Fetch the live regions
+ Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
+ if (latestTimeRegion.isEmpty()) {
+ return new HashSet<>();
+ }
+
+ Long timestamp = latestTimeRegion.keySet().iterator().next();
+ SortedSet<String> liveRegions = latestTimeRegion.get(timestamp);
+
+ SortedSet<byte[]> emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(timestamp, null);
+ SortedSet<String> emptyRegionNames = new TreeSet<>();
+ Iterable<String> regionStrings = Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN);
+ for (String regionString : regionStrings) {
+ emptyRegionNames.add(regionString);
+ }
+
+ Set<String> nonEmptyRegions = Sets.newHashSet(Sets.difference(liveRegions, emptyRegionNames));
+
+ // Get all pruned regions and remove them from the nonEmptyRegions, resulting in a set of regions that are
+ // not empty and have not been registered prune upper bound
+ Queue<RegionPruneInfo> prunedRegions = getIdleRegions(-1);
+ for (RegionPruneInfo prunedRegion : prunedRegions) {
+ if (nonEmptyRegions.contains(prunedRegion.getRegionNameAsString())) {
+ nonEmptyRegions.remove(prunedRegion.getRegionNameAsString());
+ }
+ }
+
+ if ((numRegions < 0) || (numRegions >= nonEmptyRegions.size())) {
+ return nonEmptyRegions;
+ }
+
+ Set<String> subsetRegions = new HashSet<>(numRegions);
+ for (String regionName : nonEmptyRegions) {
+ if (subsetRegions.size() == numRegions) {
+ break;
+ }
+ subsetRegions.add(regionName);
+ }
+ return subsetRegions;
+ }
+
/**
* Return a list of RegionPruneInfo. These regions are the ones that have the lowest prune upper bounds.
- * If -1 is passed in, all the regions and their prune upper bound will be returned.
+ * If -1 is passed in, all the regions and their prune upper bound will be returned. Note that only the regions
+ * that are known to be live will be returned.
*
* @param numRegions number of regions
* @return Map of region name and its prune upper bound
@@ -85,10 +147,32 @@ public class InvalidListPruningDebug {
return new LinkedList<>();
}
+ // Create a set with region names
+ Set<String> pruneRegionNameSet = new HashSet<>();
+ for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+ pruneRegionNameSet.add(regionPruneInfo.getRegionNameAsString());
+ }
+
+ // Fetch the live regions
+ Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
+ if (!latestTimeRegion.isEmpty()) {
+ SortedSet<String> liveRegions = latestTimeRegion.values().iterator().next();
+ Set<String> liveRegionsWithPruneInfo = Sets.intersection(liveRegions, pruneRegionNameSet);
+ List<RegionPruneInfo> liveRegionWithPruneInfoList = new ArrayList<>();
+ for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+ if (liveRegionsWithPruneInfo.contains(regionPruneInfo.getRegionNameAsString())) {
+ liveRegionWithPruneInfoList.add(regionPruneInfo);
+ }
+ }
+
+ // Use the subset of live regions and prune regions
+ regionPruneInfos = liveRegionWithPruneInfoList;
+ }
+
if (numRegions < 0) {
numRegions = regionPruneInfos.size();
}
-
+
Queue<RegionPruneInfo> lowestPrunes = MinMaxPriorityQueue.orderedBy(new Comparator<RegionPruneInfo>() {
@Override
public int compare(RegionPruneInfo o1, RegionPruneInfo o2) {
@@ -148,6 +232,9 @@ public class InvalidListPruningDebug {
"provided as the limit, prune upper bounds of all regions are returned.");
pw.println("prune-info region-name-as-string");
pw.println("Desc: Prints out the Pruning information for the region 'region-name-as-string'");
+ pw.println("to-compact-regions limit");
+ pw.println("Desc: Prints out 'limit' number of regions that are active, but are not empty, " +
+ "and have not registered a prune upper bound.");
}
private boolean execute(String[] args) throws IOException {
@@ -177,6 +264,11 @@ public class InvalidListPruningDebug {
pw.println(String.format("No prune info found for the region %s.", parameter));
}
return true;
+ } else if ("to-compact-regions".equals(command)) {
+ Integer numRegions = Integer.parseInt(parameter);
+ Set<String> toBeCompactedRegions = getRegionsToBeCompacted(numRegions);
+ pw.println(GSON.toJson(toBeCompactedRegions));
+ return true;
} else {
pw.println(String.format("%s is not a valid command.", command));
printUsage(pw);
@@ -191,6 +283,7 @@ public class InvalidListPruningDebug {
try {
pruningDebug.initialize(hConf);
boolean success = pruningDebug.execute(args);
+ pruningDebug.destroy();
if (!success) {
System.exit(1);
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/66a2fce7/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
index d48e48d..620885b 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
@@ -21,6 +21,7 @@ package org.apache.tephra.hbase.txprune;
import com.google.common.collect.Iterables;
import com.google.common.collect.MinMaxPriorityQueue;
+import com.google.common.collect.Sets;
import com.google.gson.Gson;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -36,12 +37,15 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.PrintWriter;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import javax.annotation.Nullable;
@@ -53,6 +57,8 @@ public class InvalidListPruningDebug {
private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebug.class);
private static final Gson GSON = new Gson();
private DataJanitorState dataJanitorState;
+ private HConnection connection;
+ private TableName tableName;
/**
* Initialize the Invalid List Debug Tool.
@@ -61,20 +67,76 @@ public class InvalidListPruningDebug {
*/
public void initialize(final Configuration conf) throws IOException {
LOG.debug("InvalidListPruningDebugMain : initialize method called");
- final HConnection connection = new HBaseAdmin(conf).getConnection();
+ connection = new HBaseAdmin(conf).getConnection();
+ tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
@Override
public HTableInterface get() throws IOException {
- return connection.getTable(TableName.valueOf(
- conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)));
+ return connection.getTable(tableName);
}
});
}
+ public void destroy() throws IOException {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+
+ /**
+ * Returns a set of regions that are live but are not empty nor have a prune upper bound recorded. These regions
+ * will stop the progress of pruning.
+ *
+ * @param numRegions number of regions
+ * @return {@link Set} of regions that needs to be compacted and flushed
+ */
+ public Set<String> getRegionsToBeCompacted(Integer numRegions) throws IOException {
+ // Fetch the live regions
+ Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
+ if (latestTimeRegion.isEmpty()) {
+ return new HashSet<>();
+ }
+
+ Long timestamp = latestTimeRegion.keySet().iterator().next();
+ SortedSet<String> liveRegions = latestTimeRegion.get(timestamp);
+
+ SortedSet<byte[]> emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(timestamp, null);
+ SortedSet<String> emptyRegionNames = new TreeSet<>();
+ Iterable<String> regionStrings = Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN);
+ for (String regionString : regionStrings) {
+ emptyRegionNames.add(regionString);
+ }
+
+ Set<String> nonEmptyRegions = Sets.newHashSet(Sets.difference(liveRegions, emptyRegionNames));
+
+ // Get all pruned regions and remove them from the nonEmptyRegions, resulting in a set of regions that are
+ // not empty and have not been registered prune upper bound
+ Queue<RegionPruneInfo> prunedRegions = getIdleRegions(-1);
+ for (RegionPruneInfo prunedRegion : prunedRegions) {
+ if (nonEmptyRegions.contains(prunedRegion.getRegionNameAsString())) {
+ nonEmptyRegions.remove(prunedRegion.getRegionNameAsString());
+ }
+ }
+
+ if ((numRegions < 0) || (numRegions >= nonEmptyRegions.size())) {
+ return nonEmptyRegions;
+ }
+
+ Set<String> subsetRegions = new HashSet<>(numRegions);
+ for (String regionName : nonEmptyRegions) {
+ if (subsetRegions.size() == numRegions) {
+ break;
+ }
+ subsetRegions.add(regionName);
+ }
+ return subsetRegions;
+ }
+
/**
* Return a list of RegionPruneInfo. These regions are the ones that have the lowest prune upper bounds.
- * If -1 is passed in, all the regions and their prune upper bound will be returned.
+ * If -1 is passed in, all the regions and their prune upper bound will be returned. Note that only the regions
+ * that are known to be live will be returned.
*
* @param numRegions number of regions
* @return Map of region name and its prune upper bound
@@ -85,10 +147,32 @@ public class InvalidListPruningDebug {
return new LinkedList<>();
}
+ // Create a set with region names
+ Set<String> pruneRegionNameSet = new HashSet<>();
+ for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+ pruneRegionNameSet.add(regionPruneInfo.getRegionNameAsString());
+ }
+
+ // Fetch the live regions
+ Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
+ if (!latestTimeRegion.isEmpty()) {
+ SortedSet<String> liveRegions = latestTimeRegion.values().iterator().next();
+ Set<String> liveRegionsWithPruneInfo = Sets.intersection(liveRegions, pruneRegionNameSet);
+ List<RegionPruneInfo> liveRegionWithPruneInfoList = new ArrayList<>();
+ for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+ if (liveRegionsWithPruneInfo.contains(regionPruneInfo.getRegionNameAsString())) {
+ liveRegionWithPruneInfoList.add(regionPruneInfo);
+ }
+ }
+
+ // Use the subset of live regions and prune regions
+ regionPruneInfos = liveRegionWithPruneInfoList;
+ }
+
if (numRegions < 0) {
numRegions = regionPruneInfos.size();
}
-
+
Queue<RegionPruneInfo> lowestPrunes = MinMaxPriorityQueue.orderedBy(new Comparator<RegionPruneInfo>() {
@Override
public int compare(RegionPruneInfo o1, RegionPruneInfo o2) {
@@ -148,6 +232,9 @@ public class InvalidListPruningDebug {
"provided as the limit, prune upper bounds of all regions are returned.");
pw.println("prune-info region-name-as-string");
pw.println("Desc: Prints out the Pruning information for the region 'region-name-as-string'");
+ pw.println("to-compact-regions limit");
+ pw.println("Desc: Prints out 'limit' number of regions that are active, but are not empty, " +
+ "and have not registered a prune upper bound.");
}
private boolean execute(String[] args) throws IOException {
@@ -177,6 +264,11 @@ public class InvalidListPruningDebug {
pw.println(String.format("No prune info found for the region %s.", parameter));
}
return true;
+ } else if ("to-compact-regions".equals(command)) {
+ Integer numRegions = Integer.parseInt(parameter);
+ Set<String> toBeCompactedRegions = getRegionsToBeCompacted(numRegions);
+ pw.println(GSON.toJson(toBeCompactedRegions));
+ return true;
} else {
pw.println(String.format("%s is not a valid command.", command));
printUsage(pw);
@@ -191,6 +283,7 @@ public class InvalidListPruningDebug {
try {
pruningDebug.initialize(hConf);
boolean success = pruningDebug.execute(args);
+ pruningDebug.destroy();
if (!success) {
System.exit(1);
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/66a2fce7/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
index e748d90..443c998 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
@@ -21,6 +21,7 @@ package org.apache.tephra.hbase.txprune;
import com.google.common.collect.Iterables;
import com.google.common.collect.MinMaxPriorityQueue;
+import com.google.common.collect.Sets;
import com.google.gson.Gson;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -36,12 +37,15 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.PrintWriter;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import javax.annotation.Nullable;
@@ -53,6 +57,8 @@ public class InvalidListPruningDebug {
private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebug.class);
private static final Gson GSON = new Gson();
private DataJanitorState dataJanitorState;
+ private Connection connection;
+ private TableName tableName;
/**
* Initialize the Invalid List Debug Tool.
@@ -61,20 +67,76 @@ public class InvalidListPruningDebug {
*/
public void initialize(final Configuration conf) throws IOException {
LOG.debug("InvalidListPruningDebugMain : initialize method called");
- final Connection connection = ConnectionFactory.createConnection(conf);
+ connection = ConnectionFactory.createConnection(conf);
+ tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
@Override
public Table get() throws IOException {
- return connection.getTable(TableName.valueOf(
- conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)));
+ return connection.getTable(tableName);
}
});
}
+ public void destroy() throws IOException {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+
+ /**
+ * Returns a set of regions that are live but are not empty nor have a prune upper bound recorded. These regions
+ * will stop the progress of pruning.
+ *
+ * @param numRegions number of regions
+ * @return {@link Set} of regions that needs to be compacted and flushed
+ */
+ public Set<String> getRegionsToBeCompacted(Integer numRegions) throws IOException {
+ // Fetch the live regions
+ Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
+ if (latestTimeRegion.isEmpty()) {
+ return new HashSet<>();
+ }
+
+ Long timestamp = latestTimeRegion.keySet().iterator().next();
+ SortedSet<String> liveRegions = latestTimeRegion.get(timestamp);
+
+ SortedSet<byte[]> emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(timestamp, null);
+ SortedSet<String> emptyRegionNames = new TreeSet<>();
+ Iterable<String> regionStrings = Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN);
+ for (String regionString : regionStrings) {
+ emptyRegionNames.add(regionString);
+ }
+
+ Set<String> nonEmptyRegions = Sets.newHashSet(Sets.difference(liveRegions, emptyRegionNames));
+
+ // Get all pruned regions and remove them from the nonEmptyRegions, resulting in a set of regions that are
+ // not empty and have not been registered prune upper bound
+ Queue<RegionPruneInfo> prunedRegions = getIdleRegions(-1);
+ for (RegionPruneInfo prunedRegion : prunedRegions) {
+ if (nonEmptyRegions.contains(prunedRegion.getRegionNameAsString())) {
+ nonEmptyRegions.remove(prunedRegion.getRegionNameAsString());
+ }
+ }
+
+ if ((numRegions < 0) || (numRegions >= nonEmptyRegions.size())) {
+ return nonEmptyRegions;
+ }
+
+ Set<String> subsetRegions = new HashSet<>(numRegions);
+ for (String regionName : nonEmptyRegions) {
+ if (subsetRegions.size() == numRegions) {
+ break;
+ }
+ subsetRegions.add(regionName);
+ }
+ return subsetRegions;
+ }
+
/**
* Return a list of RegionPruneInfo. These regions are the ones that have the lowest prune upper bounds.
- * If -1 is passed in, all the regions and their prune upper bound will be returned.
+ * If -1 is passed in, all the regions and their prune upper bound will be returned. Note that only the regions
+ * that are known to be live will be returned.
*
* @param numRegions number of regions
* @return Map of region name and its prune upper bound
@@ -85,10 +147,32 @@ public class InvalidListPruningDebug {
return new LinkedList<>();
}
+ // Create a set with region names
+ Set<String> pruneRegionNameSet = new HashSet<>();
+ for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+ pruneRegionNameSet.add(regionPruneInfo.getRegionNameAsString());
+ }
+
+ // Fetch the live regions
+ Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
+ if (!latestTimeRegion.isEmpty()) {
+ SortedSet<String> liveRegions = latestTimeRegion.values().iterator().next();
+ Set<String> liveRegionsWithPruneInfo = Sets.intersection(liveRegions, pruneRegionNameSet);
+ List<RegionPruneInfo> liveRegionWithPruneInfoList = new ArrayList<>();
+ for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+ if (liveRegionsWithPruneInfo.contains(regionPruneInfo.getRegionNameAsString())) {
+ liveRegionWithPruneInfoList.add(regionPruneInfo);
+ }
+ }
+
+ // Use the subset of live regions and prune regions
+ regionPruneInfos = liveRegionWithPruneInfoList;
+ }
+
if (numRegions < 0) {
numRegions = regionPruneInfos.size();
}
-
+
Queue<RegionPruneInfo> lowestPrunes = MinMaxPriorityQueue.orderedBy(new Comparator<RegionPruneInfo>() {
@Override
public int compare(RegionPruneInfo o1, RegionPruneInfo o2) {
@@ -148,6 +232,9 @@ public class InvalidListPruningDebug {
"provided as the limit, prune upper bounds of all regions are returned.");
pw.println("prune-info region-name-as-string");
pw.println("Desc: Prints out the Pruning information for the region 'region-name-as-string'");
+ pw.println("to-compact-regions limit");
+ pw.println("Desc: Prints out 'limit' number of regions that are active, but are not empty, " +
+ "and have not registered a prune upper bound.");
}
private boolean execute(String[] args) throws IOException {
@@ -177,6 +264,11 @@ public class InvalidListPruningDebug {
pw.println(String.format("No prune info found for the region %s.", parameter));
}
return true;
+ } else if ("to-compact-regions".equals(command)) {
+ Integer numRegions = Integer.parseInt(parameter);
+ Set<String> toBeCompactedRegions = getRegionsToBeCompacted(numRegions);
+ pw.println(GSON.toJson(toBeCompactedRegions));
+ return true;
} else {
pw.println(String.format("%s is not a valid command.", command));
printUsage(pw);
@@ -191,6 +283,7 @@ public class InvalidListPruningDebug {
try {
pruningDebug.initialize(hConf);
boolean success = pruningDebug.execute(args);
+ pruningDebug.destroy();
if (!success) {
System.exit(1);
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/66a2fce7/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
index e748d90..443c998 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
@@ -21,6 +21,7 @@ package org.apache.tephra.hbase.txprune;
import com.google.common.collect.Iterables;
import com.google.common.collect.MinMaxPriorityQueue;
+import com.google.common.collect.Sets;
import com.google.gson.Gson;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -36,12 +37,15 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.PrintWriter;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import javax.annotation.Nullable;
@@ -53,6 +57,8 @@ public class InvalidListPruningDebug {
private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebug.class);
private static final Gson GSON = new Gson();
private DataJanitorState dataJanitorState;
+ private Connection connection;
+ private TableName tableName;
/**
* Initialize the Invalid List Debug Tool.
@@ -61,20 +67,76 @@ public class InvalidListPruningDebug {
*/
public void initialize(final Configuration conf) throws IOException {
LOG.debug("InvalidListPruningDebugMain : initialize method called");
- final Connection connection = ConnectionFactory.createConnection(conf);
+ connection = ConnectionFactory.createConnection(conf);
+ tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
@Override
public Table get() throws IOException {
- return connection.getTable(TableName.valueOf(
- conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)));
+ return connection.getTable(tableName);
}
});
}
+ public void destroy() throws IOException {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+
+ /**
+ * Returns a set of regions that are live but are not empty nor have a prune upper bound recorded. These regions
+ * will stop the progress of pruning.
+ *
+ * @param numRegions number of regions
+ * @return {@link Set} of regions that needs to be compacted and flushed
+ */
+ public Set<String> getRegionsToBeCompacted(Integer numRegions) throws IOException {
+ // Fetch the live regions
+ Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
+ if (latestTimeRegion.isEmpty()) {
+ return new HashSet<>();
+ }
+
+ Long timestamp = latestTimeRegion.keySet().iterator().next();
+ SortedSet<String> liveRegions = latestTimeRegion.get(timestamp);
+
+ SortedSet<byte[]> emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(timestamp, null);
+ SortedSet<String> emptyRegionNames = new TreeSet<>();
+ Iterable<String> regionStrings = Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN);
+ for (String regionString : regionStrings) {
+ emptyRegionNames.add(regionString);
+ }
+
+ Set<String> nonEmptyRegions = Sets.newHashSet(Sets.difference(liveRegions, emptyRegionNames));
+
+ // Get all pruned regions and remove them from the nonEmptyRegions, resulting in a set of regions that are
+ // not empty and have not been registered prune upper bound
+ Queue<RegionPruneInfo> prunedRegions = getIdleRegions(-1);
+ for (RegionPruneInfo prunedRegion : prunedRegions) {
+ if (nonEmptyRegions.contains(prunedRegion.getRegionNameAsString())) {
+ nonEmptyRegions.remove(prunedRegion.getRegionNameAsString());
+ }
+ }
+
+ if ((numRegions < 0) || (numRegions >= nonEmptyRegions.size())) {
+ return nonEmptyRegions;
+ }
+
+ Set<String> subsetRegions = new HashSet<>(numRegions);
+ for (String regionName : nonEmptyRegions) {
+ if (subsetRegions.size() == numRegions) {
+ break;
+ }
+ subsetRegions.add(regionName);
+ }
+ return subsetRegions;
+ }
+
/**
* Return a list of RegionPruneInfo. These regions are the ones that have the lowest prune upper bounds.
- * If -1 is passed in, all the regions and their prune upper bound will be returned.
+ * If -1 is passed in, all the regions and their prune upper bound will be returned. Note that only the regions
+ * that are known to be live will be returned.
*
* @param numRegions number of regions
* @return Map of region name and its prune upper bound
@@ -85,10 +147,32 @@ public class InvalidListPruningDebug {
return new LinkedList<>();
}
+ // Create a set with region names
+ Set<String> pruneRegionNameSet = new HashSet<>();
+ for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+ pruneRegionNameSet.add(regionPruneInfo.getRegionNameAsString());
+ }
+
+ // Fetch the live regions
+ Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
+ if (!latestTimeRegion.isEmpty()) {
+ SortedSet<String> liveRegions = latestTimeRegion.values().iterator().next();
+ Set<String> liveRegionsWithPruneInfo = Sets.intersection(liveRegions, pruneRegionNameSet);
+ List<RegionPruneInfo> liveRegionWithPruneInfoList = new ArrayList<>();
+ for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+ if (liveRegionsWithPruneInfo.contains(regionPruneInfo.getRegionNameAsString())) {
+ liveRegionWithPruneInfoList.add(regionPruneInfo);
+ }
+ }
+
+ // Use the subset of live regions and prune regions
+ regionPruneInfos = liveRegionWithPruneInfoList;
+ }
+
if (numRegions < 0) {
numRegions = regionPruneInfos.size();
}
-
+
Queue<RegionPruneInfo> lowestPrunes = MinMaxPriorityQueue.orderedBy(new Comparator<RegionPruneInfo>() {
@Override
public int compare(RegionPruneInfo o1, RegionPruneInfo o2) {
@@ -148,6 +232,9 @@ public class InvalidListPruningDebug {
"provided as the limit, prune upper bounds of all regions are returned.");
pw.println("prune-info region-name-as-string");
pw.println("Desc: Prints out the Pruning information for the region 'region-name-as-string'");
+ pw.println("to-compact-regions limit");
+ pw.println("Desc: Prints out 'limit' number of regions that are active, but are not empty, " +
+ "and have not registered a prune upper bound.");
}
private boolean execute(String[] args) throws IOException {
@@ -177,6 +264,11 @@ public class InvalidListPruningDebug {
pw.println(String.format("No prune info found for the region %s.", parameter));
}
return true;
+ } else if ("to-compact-regions".equals(command)) {
+ Integer numRegions = Integer.parseInt(parameter);
+ Set<String> toBeCompactedRegions = getRegionsToBeCompacted(numRegions);
+ pw.println(GSON.toJson(toBeCompactedRegions));
+ return true;
} else {
pw.println(String.format("%s is not a valid command.", command));
printUsage(pw);
@@ -191,6 +283,12 @@ public class InvalidListPruningDebug {
try {
- pruningDebug.initialize(hConf);
- boolean success = pruningDebug.execute(args);
+ boolean success;
+ try {
+   pruningDebug.initialize(hConf);
+   success = pruningDebug.execute(args);
+ } finally {
+   // Runs even when initialize() or execute() throws, so the cleanup in destroy() is never skipped
+   pruningDebug.destroy();
+ }
if (!success) {
System.exit(1);
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/66a2fce7/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
index e748d90..443c998 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
@@ -21,6 +21,7 @@ package org.apache.tephra.hbase.txprune;
import com.google.common.collect.Iterables;
import com.google.common.collect.MinMaxPriorityQueue;
+import com.google.common.collect.Sets;
import com.google.gson.Gson;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -36,12 +37,15 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.PrintWriter;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import javax.annotation.Nullable;
@@ -53,6 +57,8 @@ public class InvalidListPruningDebug {
private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebug.class);
private static final Gson GSON = new Gson();
private DataJanitorState dataJanitorState;
+ // HBase connection created by initialize() and closed by destroy()
+ private Connection connection;
+ // Prune state table name resolved from the configuration in initialize(); the table handed to dataJanitorState
+ private TableName tableName;
/**
* Initialize the Invalid List Debug Tool.
@@ -61,20 +67,76 @@ public class InvalidListPruningDebug {
*/
public void initialize(final Configuration conf) throws IOException {
LOG.debug("InvalidListPruningDebugMain : initialize method called");
- final Connection connection = ConnectionFactory.createConnection(conf);
+ // Keep the connection in a field so that destroy() can close it
+ connection = ConnectionFactory.createConnection(conf);
+ // Resolve the prune state table name once, instead of on every TableSupplier.get() call
+ tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
@Override
public Table get() throws IOException {
- return connection.getTable(TableName.valueOf(
- conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)));
+ return connection.getTable(tableName);
}
});
}
+ /**
+  * Releases the HBase connection opened by {@link #initialize(Configuration)}. Does nothing when the tool
+  * was never initialized.
+  *
+  * @throws IOException if closing the connection fails
+  */
+ public void destroy() throws IOException {
+   if (connection == null) {
+     return;
+   }
+   connection.close();
+ }
+
+ /**
+  * Returns the live regions that are neither empty nor have a prune upper bound recorded. These regions
+  * stop the progress of pruning until they are flushed and compacted.
+  *
+  * @param numRegions maximum number of regions to return; a negative value (or {@code null}) returns all of them
+  * @return {@link Set} of region names, in sorted order, that need to be compacted and flushed
+  * @throws IOException if the prune state cannot be read
+  */
+ public Set<String> getRegionsToBeCompacted(Integer numRegions) throws IOException {
+   // Fetch the live regions
+   Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
+   if (latestTimeRegion.isEmpty()) {
+     return new TreeSet<>();
+   }
+
+   Map.Entry<Long, SortedSet<String>> latest = latestTimeRegion.entrySet().iterator().next();
+   Long timestamp = latest.getKey();
+   SortedSet<String> liveRegions = latest.getValue();
+
+   // Exclude the regions that are reported as empty after the time the live regions were recorded
+   SortedSet<byte[]> emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(timestamp, null);
+   Set<String> emptyRegionNames =
+     Sets.newTreeSet(Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
+
+   // A sorted set keeps the output, and the subset chosen below, deterministic across runs
+   SortedSet<String> nonEmptyRegions = Sets.newTreeSet(Sets.difference(liveRegions, emptyRegionNames));
+
+   // Drop the regions that have registered a prune upper bound, leaving the regions that are
+   // not empty and have not registered one
+   for (RegionPruneInfo prunedRegion : getIdleRegions(-1)) {
+     nonEmptyRegions.remove(prunedRegion.getRegionNameAsString());
+   }
+
+   if (numRegions == null || numRegions < 0 || numRegions >= nonEmptyRegions.size()) {
+     return nonEmptyRegions;
+   }
+   return Sets.newTreeSet(Iterables.limit(nonEmptyRegions, numRegions));
+ }
+
/**
* Return a list of RegionPruneInfo. These regions are the ones that have the lowest prune upper bounds.
- * If -1 is passed in, all the regions and their prune upper bound will be returned.
+ * If -1 is passed in, all the regions and their prune upper bound will be returned. Note that only the regions
+ * that are known to be live will be returned.
*
* @param numRegions number of regions
* @return Map of region name and its prune upper bound
@@ -85,10 +147,32 @@ public class InvalidListPruningDebug {
return new LinkedList<>();
}
+ // Names of every region that has recorded prune information
+ Set<String> pruneRegionNameSet = new HashSet<>();
+ for (RegionPruneInfo info : regionPruneInfos) {
+   pruneRegionNameSet.add(info.getRegionNameAsString());
+ }
+
+ // Keep only the entries whose region is among the live regions; when no live-region
+ // information is available, every entry is kept
+ Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
+ if (!latestTimeRegion.isEmpty()) {
+   Set<String> liveWithPruneInfo =
+     Sets.intersection(latestTimeRegion.values().iterator().next(), pruneRegionNameSet);
+   List<RegionPruneInfo> filtered = new ArrayList<>();
+   for (RegionPruneInfo info : regionPruneInfos) {
+     String regionName = info.getRegionNameAsString();
+     if (liveWithPruneInfo.contains(regionName)) {
+       filtered.add(info);
+     }
+   }
+   regionPruneInfos = filtered;
+ }
+
if (numRegions < 0) {
numRegions = regionPruneInfos.size();
}
-
+
Queue<RegionPruneInfo> lowestPrunes = MinMaxPriorityQueue.orderedBy(new Comparator<RegionPruneInfo>() {
@Override
public int compare(RegionPruneInfo o1, RegionPruneInfo o2) {
@@ -148,6 +232,9 @@ public class InvalidListPruningDebug {
"provided as the limit, prune upper bounds of all regions are returned.");
pw.println("prune-info region-name-as-string");
pw.println("Desc: Prints out the Pruning information for the region 'region-name-as-string'");
+ pw.println("to-compact-regions limit");
+ pw.println("Desc: Prints out 'limit' number of regions that are active, but are not empty, " +
+   "and have not registered a prune upper bound. If -1 is provided as the limit, all such regions " +
+   "are returned.");
}
private boolean execute(String[] args) throws IOException {
@@ -177,6 +264,11 @@ public class InvalidListPruningDebug {
pw.println(String.format("No prune info found for the region %s.", parameter));
}
return true;
+ } else if ("to-compact-regions".equals(command)) {
+   // Print the regions returned by getRegionsToBeCompacted as a JSON array
+   Integer limit = Integer.valueOf(parameter);
+   pw.println(GSON.toJson(getRegionsToBeCompacted(limit)));
+   return true;
} else {
pw.println(String.format("%s is not a valid command.", command));
printUsage(pw);
@@ -191,6 +283,12 @@ public class InvalidListPruningDebug {
try {
- pruningDebug.initialize(hConf);
- boolean success = pruningDebug.execute(args);
+ boolean success;
+ try {
+   pruningDebug.initialize(hConf);
+   success = pruningDebug.execute(args);
+ } finally {
+   // Runs even when initialize() or execute() throws, so the HBase connection is never leaked
+   pruningDebug.destroy();
+ }
if (!success) {
System.exit(1);
}