You are viewing a plain-text version of this content. (The link to the canonical version was lost when this page was converted to text.)
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/11/13 06:55:30 UTC

[5/9] phoenix git commit: PHOENIX-1427 Reduce work in StatsCollector

PHOENIX-1427 Reduce work in StatsCollector


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1e12e122
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1e12e122
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1e12e122

Branch: refs/heads/master
Commit: 1e12e122cace349a94bfee0b998242b505de33b3
Parents: 9ee5d01
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Nov 12 13:55:29 2014 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Nov 12 21:50:51 2014 -0800

----------------------------------------------------------------------
 .../schema/stats/StatisticsCollector.java       | 66 ++++++--------------
 .../phoenix/schema/stats/StatisticsScanner.java | 10 +--
 .../phoenix/schema/stats/StatisticsUtil.java    | 11 ++--
 .../phoenix/schema/stats/StatisticsWriter.java  | 40 +++++-------
 4 files changed, 44 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/1e12e122/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
index 9c85e63..4123ebe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
@@ -59,11 +59,9 @@ public class StatisticsCollector {
     private static final Logger logger = LoggerFactory.getLogger(StatisticsCollector.class);
     public static final long NO_TIMESTAMP = -1;
 
-    private Map<String, byte[]> minMap = Maps.newHashMap();
-    private Map<String, byte[]> maxMap = Maps.newHashMap();
     private long guidepostDepth;
     private long maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
-    private Map<String, Pair<Long,GuidePostsInfo>> guidePostsMap = Maps.newHashMap();
+    private Map<ImmutableBytesPtr, Pair<Long,GuidePostsInfo>> guidePostsMap = Maps.newHashMap();
     // Tracks the bytecount per family if it has reached the guidePostsDepth
     private Map<ImmutableBytesPtr, Boolean> familyMap = Maps.newHashMap();
     protected StatisticsWriter statsTable;
@@ -112,13 +110,13 @@ public class StatisticsCollector {
                     if(logger.isDebugEnabled()) {
                         logger.debug("Deleting the stats for the region "+region.getRegionInfo());
                     }
-                    statsTable.deleteStats(region.getRegionInfo().getRegionName(), this, Bytes.toString(fam.copyBytesIfNecessary()),
+                    statsTable.deleteStats(region.getRegionInfo().getRegionName(), this, fam,
                             mutations);
                 }
                 if(logger.isDebugEnabled()) {
                     logger.debug("Adding new stats for the region "+region.getRegionInfo());
                 }
-                statsTable.addStats((region.getRegionInfo().getRegionName()), this, Bytes.toString(fam.copyBytesIfNecessary()),
+                statsTable.addStats((region.getRegionInfo().getRegionName()), this, fam,
                         mutations);
             }
         } catch (IOException e) {
@@ -149,7 +147,9 @@ public class StatisticsCollector {
         if (logger.isDebugEnabled()) {
             logger.debug("Compaction scanner created for stats");
         }
-        return getInternalScanner(region, store, s, store.getColumnFamilyName());
+        // FIXME: no way to get cf as byte[] ?
+        ImmutableBytesPtr cfKey = new ImmutableBytesPtr(Bytes.toBytes(store.getColumnFamilyName()));
+        return getInternalScanner(region, store, s, cfKey);
     }
 
     public void splitStats(HRegion parent, HRegion left, HRegion right) {
@@ -159,7 +159,7 @@ public class StatisticsCollector {
             }
             List<Mutation> mutations = Lists.newArrayListWithExpectedSize(3);
             for (byte[] fam : parent.getStores().keySet()) {
-            	statsTable.splitStats(parent, left, right, this, Bytes.toString(fam), mutations);
+            	statsTable.splitStats(parent, left, right, this, new ImmutableBytesPtr(fam), mutations);
             }
             if (logger.isDebugEnabled()) {
                 logger.debug("Committing stats for the daughter regions as part of split " + parent.getRegionInfo());
@@ -172,22 +172,19 @@ public class StatisticsCollector {
     }
 
     protected InternalScanner getInternalScanner(HRegion region, Store store,
-            InternalScanner internalScan, String family) {
-        return new StatisticsScanner(this, statsTable, region, internalScan,
-                Bytes.toBytes(family));
+            InternalScanner internalScan, ImmutableBytesPtr family) {
+        return new StatisticsScanner(this, statsTable, region, internalScan, family);
     }
 
     public void clear() {
-        this.maxMap.clear();
-        this.minMap.clear();
         this.guidePostsMap.clear();
         this.familyMap.clear();
         maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
     }
 
-    public void addGuidePost(String fam, GuidePostsInfo info, long byteSize, long timestamp) {
+    public void addGuidePost(ImmutableBytesPtr cfKey, GuidePostsInfo info, long byteSize, long timestamp) {
     	Pair<Long,GuidePostsInfo> newInfo = new Pair<Long,GuidePostsInfo>(byteSize,info);
-    	Pair<Long,GuidePostsInfo> oldInfo = guidePostsMap.put(fam, newInfo);
+    	Pair<Long,GuidePostsInfo> oldInfo = guidePostsMap.put(cfKey, newInfo);
     	if (oldInfo != null) {
     		info.combine(oldInfo.getSecond());
     		newInfo.setFirst(oldInfo.getFirst() + newInfo.getFirst());
@@ -195,56 +192,31 @@ public class StatisticsCollector {
         maxTimeStamp = Math.max(maxTimeStamp, timestamp);
     }
     
+    @SuppressWarnings("deprecation")
     public void updateStatistic(KeyValue kv) {
-        @SuppressWarnings("deprecation")
-        byte[] cf = kv.getFamily();
-        familyMap.put(new ImmutableBytesPtr(cf), true);
+        ImmutableBytesPtr cfKey = new ImmutableBytesPtr(kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength());
+        familyMap.put(cfKey, true);
         
-        String fam = Bytes.toString(cf);
-        byte[] row = ByteUtil.copyKeyBytesIfNecessary(
-                new ImmutableBytesWritable(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
-        if (!minMap.containsKey(fam) && !maxMap.containsKey(fam)) {
-            minMap.put(fam, row);
-            // Ideally the max key also should be added in this case
-            maxMap.put(fam, row);
-        } else {
-            if (Bytes.compareTo(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), minMap.get(fam), 0,
-                    minMap.get(fam).length) < 0) {
-                minMap.put(fam, row);
-            }
-            if (Bytes.compareTo(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), maxMap.get(fam), 0,
-                    maxMap.get(fam).length) > 0) {
-                maxMap.put(fam, row);
-            }
-        }
         maxTimeStamp = Math.max(maxTimeStamp, kv.getTimestamp());
         // TODO : This can be moved to an interface so that we could collect guide posts in different ways
-        Pair<Long,GuidePostsInfo> gps = guidePostsMap.get(fam);
+        Pair<Long,GuidePostsInfo> gps = guidePostsMap.get(cfKey);
         if (gps == null) {
             gps = new Pair<Long,GuidePostsInfo>(0L,new GuidePostsInfo(0, Collections.<byte[]>emptyList()));
-            guidePostsMap.put(fam, gps);
+            guidePostsMap.put(cfKey, gps);
         }
         int kvLength = kv.getLength();
         long byteCount = gps.getFirst() + kvLength;
         gps.setFirst(byteCount);
         if (byteCount >= guidepostDepth) {
+            byte[] row = ByteUtil.copyKeyBytesIfNecessary(
+                    new ImmutableBytesWritable(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
             if (gps.getSecond().addGuidePost(row, byteCount)) {
                 gps.setFirst(0L);
             }
         }
     }
 
-    public byte[] getMaxKey(String fam) {
-        if (maxMap.get(fam) != null) { return maxMap.get(fam); }
-        return null;
-    }
-
-    public byte[] getMinKey(String fam) {
-        if (minMap.get(fam) != null) { return minMap.get(fam); }
-        return null;
-    }
-
-    public GuidePostsInfo getGuidePosts(String fam) {
+    public GuidePostsInfo getGuidePosts(ImmutableBytesPtr fam) {
         Pair<Long,GuidePostsInfo> pair = guidePostsMap.get(fam);
         if (pair != null) {
             return pair.getSecond();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1e12e122/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
index fa3930d..51b6a6b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
@@ -20,7 +20,7 @@ import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 
 /**
  * The scanner that does the scanning to collect the stats during major compaction.{@link StatisticsCollector}
@@ -31,10 +31,10 @@ public class StatisticsScanner implements InternalScanner {
     private StatisticsWriter stats;
     private HRegion region;
     private StatisticsCollector tracker;
-    private byte[] family;
+    private ImmutableBytesPtr family;
 
     public StatisticsScanner(StatisticsCollector tracker, StatisticsWriter stats, HRegion region,
-            InternalScanner delegate, byte[] family) {
+            InternalScanner delegate, ImmutableBytesPtr family) {
         this.tracker = tracker;
         this.stats = stats;
         this.delegate = delegate;
@@ -83,12 +83,12 @@ public class StatisticsScanner implements InternalScanner {
                 LOG.debug("Deleting the stats for the region " + region.getRegionNameAsString()
                         + " as part of major compaction");
             }
-            stats.deleteStats(region.getRegionName(), this.tracker, Bytes.toString(family), mutations);
+            stats.deleteStats(region.getRegionName(), this.tracker, family, mutations);
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Adding new stats for the region " + region.getRegionNameAsString()
                         + " as part of major compaction");
             }
-            stats.addStats(region.getRegionName(), this.tracker, Bytes.toString(family), mutations);
+            stats.addStats(region.getRegionName(), this.tracker, family, mutations);
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Committing new stats for the region " + region.getRegionNameAsString()
                         + " as part of major compaction");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1e12e122/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
index 2a7047f..bf9d80e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.util.ByteUtil;
@@ -50,15 +51,15 @@ public class StatisticsUtil {
     /** Number of parts in our complex key */
     protected static final int NUM_KEY_PARTS = 3;
 
-    public static byte[] getRowKey(byte[] table, byte[] fam, byte[] region) {
+    public static byte[] getRowKey(byte[] table, ImmutableBytesPtr fam, byte[] region) {
         // always starts with the source table
-        byte[] rowKey = new byte[table.length + fam.length + region.length + 2];
+        byte[] rowKey = new byte[table.length + fam.getLength() + region.length + 2];
         int offset = 0;
         System.arraycopy(table, 0, rowKey, offset, table.length);
         offset += table.length;
         rowKey[offset++] = QueryConstants.SEPARATOR_BYTE;
-        System.arraycopy(fam, 0, rowKey, offset, fam.length);
-        offset += fam.length;
+        System.arraycopy(fam.get(), fam.getOffset(), rowKey, offset, fam.getLength());
+        offset += fam.getLength();
         rowKey[offset++] = QueryConstants.SEPARATOR_BYTE;
         System.arraycopy(region, 0, rowKey, offset, region.length);
         return rowKey;
@@ -68,7 +69,7 @@ public class StatisticsUtil {
         return Arrays.copyOfRange(kv.getRowArray(), kv.getRowOffset(), kv.getRowOffset() + kv.getRowLength());
     }
 
-    public static Result readRegionStatistics(HTableInterface statsHTable, byte[] tableNameBytes, byte[] cf, byte[] regionName, long clientTimeStamp)
+    public static Result readRegionStatistics(HTableInterface statsHTable, byte[] tableNameBytes, ImmutableBytesPtr cf, byte[] regionName, long clientTimeStamp)
             throws IOException {
         byte[] prefix = StatisticsUtil.getRowKey(tableNameBytes, cf, regionName);
         Get get = new Get(prefix);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1e12e122/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
index f70c327..9b6efc9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRo
 import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PDataType;
@@ -102,15 +103,14 @@ public class StatisticsWriter implements Closeable {
         statsWriterTable.close();
     }
 
-    public void splitStats(HRegion p, HRegion l, HRegion r, StatisticsCollector tracker, String fam, List<Mutation> mutations) throws IOException {
+    public void splitStats(HRegion p, HRegion l, HRegion r, StatisticsCollector tracker, ImmutableBytesPtr cfKey, List<Mutation> mutations) throws IOException {
         if (tracker == null) { return; }
         boolean useMaxTimeStamp = clientTimeStamp == StatisticsCollector.NO_TIMESTAMP;
         if (!useMaxTimeStamp) {
             mutations.add(getLastStatsUpdatedTimePut(clientTimeStamp));
         }
         long readTimeStamp = useMaxTimeStamp ? HConstants.LATEST_TIMESTAMP : clientTimeStamp;
-        byte[] famBytes = PDataType.VARCHAR.toBytes(fam);
-        Result result = StatisticsUtil.readRegionStatistics(statsReaderTable, tableName, famBytes, p.getRegionName(), readTimeStamp);
+        Result result = StatisticsUtil.readRegionStatistics(statsReaderTable, tableName, cfKey, p.getRegionName(), readTimeStamp);
         if (result != null && !result.isEmpty()) {
         	Cell cell = result.getColumnLatestCell(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES);
 
@@ -118,7 +118,7 @@ public class StatisticsWriter implements Closeable {
                 long writeTimeStamp = useMaxTimeStamp ? cell.getTimestamp() : clientTimeStamp;
 
                 GuidePostsInfo guidePosts = GuidePostsInfo.fromBytes(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
-                byte[] pPrefix = StatisticsUtil.getRowKey(tableName, famBytes, p.getRegionName());
+                byte[] pPrefix = StatisticsUtil.getRowKey(tableName, cfKey, p.getRegionName());
                 mutations.add(new Delete(pPrefix, writeTimeStamp));
                 
 	        	long byteSize = 0;
@@ -139,14 +139,14 @@ public class StatisticsWriter implements Closeable {
 	            if (midEndIndex > 0) {
 	                GuidePostsInfo lguidePosts = new GuidePostsInfo(byteSize, guidePosts.getGuidePosts().subList(0, midEndIndex));
 	                tracker.clear();
-	                tracker.addGuidePost(fam, lguidePosts, byteSize, cell.getTimestamp());
-	                addStats(l.getRegionName(), tracker, fam, mutations);
+	                tracker.addGuidePost(cfKey, lguidePosts, byteSize, cell.getTimestamp());
+	                addStats(l.getRegionName(), tracker, cfKey, mutations);
 	            }
 	            if (midStartIndex < guidePosts.getGuidePosts().size()) {
 	                GuidePostsInfo rguidePosts = new GuidePostsInfo(byteSize, guidePosts.getGuidePosts().subList(midStartIndex, guidePosts.getGuidePosts().size()));
 	                tracker.clear();
-	                tracker.addGuidePost(fam, rguidePosts, byteSize, cell.getTimestamp());
-	                addStats(r.getRegionName(), tracker, fam, mutations);
+	                tracker.addGuidePost(cfKey, rguidePosts, byteSize, cell.getTimestamp());
+	                addStats(r.getRegionName(), tracker, cfKey, mutations);
 	            }
         	}
         }
@@ -157,7 +157,7 @@ public class StatisticsWriter implements Closeable {
      * then we use Upsert queries to update the table
      * If the region gets splitted or the major compaction happens we update using HTable.put()
      * @param tracker - the statistics tracker
-     * @param fam -  the family for which the stats is getting collected.
+     * @param cfKey -  the family for which the stats is getting collected.
      * @param mutations - list of mutations that collects all the mutations to commit in a batch
      * @param tablekey - The table name
      * @param schemaName - the schema name associated with the table          
@@ -167,7 +167,7 @@ public class StatisticsWriter implements Closeable {
      *             if we fail to do any of the puts. Any single failure will prevent any future attempts for the remaining list of stats to
      *             update
      */
-    public void addStats(byte[] regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations) throws IOException {
+    public void addStats(byte[] regionName, StatisticsCollector tracker, ImmutableBytesPtr cfKey, List<Mutation> mutations) throws IOException {
         if (tracker == null) { return; }
         boolean useMaxTimeStamp = clientTimeStamp == StatisticsCollector.NO_TIMESTAMP;
         long timeStamp = clientTimeStamp;
@@ -175,10 +175,9 @@ public class StatisticsWriter implements Closeable {
             timeStamp = tracker.getMaxTimeStamp();
             mutations.add(getLastStatsUpdatedTimePut(timeStamp));
         }
-        byte[] prefix = StatisticsUtil.getRowKey(tableName, PDataType.VARCHAR.toBytes(fam),
-                regionName);
+        byte[] prefix = StatisticsUtil.getRowKey(tableName, cfKey, regionName);
         Put put = new Put(prefix);
-        GuidePostsInfo gp = tracker.getGuidePosts(fam);
+        GuidePostsInfo gp = tracker.getGuidePosts(cfKey);
         if (gp != null) {
             put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_COUNT_BYTES,
                     timeStamp, PDataType.LONG.toBytes((gp.getGuidePosts().size())));
@@ -187,16 +186,6 @@ public class StatisticsWriter implements Closeable {
             put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES,
                     timeStamp, PDataType.LONG.toBytes(gp.getByteCount()));
         }
-        byte[] minKey = tracker.getMinKey(fam);
-        if (minKey != null) {
-	        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MIN_KEY_BYTES,
-	                timeStamp, PDataType.VARBINARY.toBytes(minKey));
-        }
-        byte[] maxKey = tracker.getMaxKey(fam);
-        if (maxKey != null) {
-	        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MAX_KEY_BYTES,
-	                timeStamp, PDataType.VARBINARY.toBytes(maxKey));
-        }
         // Add our empty column value so queries behave correctly
         put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES,
                 timeStamp, ByteUtil.EMPTY_BYTE_ARRAY);
@@ -248,11 +237,10 @@ public class StatisticsWriter implements Closeable {
         statsWriterTable.put(put);
     }
     
-    public void deleteStats(byte[] regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations)
+    public void deleteStats(byte[] regionName, StatisticsCollector tracker, ImmutableBytesPtr fam, List<Mutation> mutations)
             throws IOException {
         long timeStamp = clientTimeStamp == StatisticsCollector.NO_TIMESTAMP ? tracker.getMaxTimeStamp() : clientTimeStamp;
-        byte[] prefix = StatisticsUtil.getRowKey(tableName, PDataType.VARCHAR.toBytes(fam),
-                regionName);
+        byte[] prefix = StatisticsUtil.getRowKey(tableName, fam, regionName);
         mutations.add(new Delete(prefix, timeStamp - 1));
     }
 }
\ No newline at end of file