You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/02/02 04:59:48 UTC
[14/50] [abbrv] phoenix git commit: PHOENIX-2417 Compress memory used
by row key byte[] of guideposts (Ankit Singhal)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 1611466..f5c9295 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -75,6 +75,7 @@ import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.protobuf.HBaseZeroCopyByteString;
import com.sun.istack.NotNull;
import co.cask.tephra.TxConstants;
@@ -1016,27 +1017,17 @@ public class PTableImpl implements PTable {
boolean isImmutableRows = table.getIsImmutableRows();
SortedMap<byte[], GuidePostsInfo> tableGuidePosts = new TreeMap<byte[], GuidePostsInfo>(Bytes.BYTES_COMPARATOR);
- for (PTableProtos.PTableStats pTableStatsProto : table.getGuidePostsList()) {
- List<byte[]> value = Lists.newArrayListWithExpectedSize(pTableStatsProto.getValuesCount());
- for (int j = 0; j < pTableStatsProto.getValuesCount(); j++) {
- value.add(pTableStatsProto.getValues(j).toByteArray());
- }
- // No op
- pTableStatsProto.getGuidePostsByteCount();
- value = Lists.newArrayListWithExpectedSize(pTableStatsProto.getValuesCount());
- PGuidePosts pGuidePosts = pTableStatsProto.getPGuidePosts();
- for(int j = 0; j < pGuidePosts.getGuidePostsCount(); j++) {
- value.add(pGuidePosts.getGuidePosts(j).toByteArray());
- }
- long guidePostsByteCount = pGuidePosts.getByteCount();
- long rowCount = pGuidePosts.getRowCount();
- // TODO : Not exposing MIN/MAX key outside to client
- GuidePostsInfo info =
- new GuidePostsInfo(guidePostsByteCount, value, rowCount);
- tableGuidePosts.put(pTableStatsProto.getKey().toByteArray(), info);
+ for (PTableProtos.PTableStats pTableStatsProto : table.getGuidePostsList()) {
+ PGuidePosts pGuidePosts = pTableStatsProto.getPGuidePosts();
+ long guidePostsByteCount = pGuidePosts.getByteCount();
+ long rowCount = pGuidePosts.getRowCount();
+ int maxLength = pGuidePosts.getMaxLength();
+ int guidePostsCount = pGuidePosts.getEncodedGuidePostsCount();
+ GuidePostsInfo info = new GuidePostsInfo(guidePostsByteCount,
+ new ImmutableBytesWritable(HBaseZeroCopyByteString.zeroCopyGetBytes(pGuidePosts.getEncodedGuidePosts())), rowCount, maxLength, guidePostsCount);
+ tableGuidePosts.put(pTableStatsProto.getKey().toByteArray(), info);
}
PTableStats stats = new PTableStatsImpl(tableGuidePosts, table.getStatsTimeStamp());
-
PName dataTableName = null;
if (table.hasDataTableNameBytes()) {
dataTableName = PNameFactory.newName(table.getDataTableNameBytes().toByteArray());
@@ -1141,16 +1132,14 @@ public class PTableImpl implements PTable {
for (Map.Entry<byte[], GuidePostsInfo> entry : table.getTableStats().getGuidePosts().entrySet()) {
PTableProtos.PTableStats.Builder statsBuilder = PTableProtos.PTableStats.newBuilder();
statsBuilder.setKey(ByteStringer.wrap(entry.getKey()));
- for (byte[] stat : entry.getValue().getGuidePosts()) {
- statsBuilder.addValues(ByteStringer.wrap(stat));
- }
statsBuilder.setGuidePostsByteCount(entry.getValue().getByteCount());
+ statsBuilder.setGuidePostsCount(entry.getValue().getGuidePostsCount());
PGuidePostsProtos.PGuidePosts.Builder guidePstsBuilder = PGuidePostsProtos.PGuidePosts.newBuilder();
- for (byte[] stat : entry.getValue().getGuidePosts()) {
- guidePstsBuilder.addGuidePosts(ByteStringer.wrap(stat));
- }
+ guidePstsBuilder.setEncodedGuidePosts(ByteStringer.wrap(entry.getValue().getGuidePosts().get()));
guidePstsBuilder.setByteCount(entry.getValue().getByteCount());
guidePstsBuilder.setRowCount(entry.getValue().getRowCount());
+ guidePstsBuilder.setMaxLength(entry.getValue().getMaxLength());
+ guidePstsBuilder.setEncodedGuidePostsCount(entry.getValue().getGuidePostsCount());
statsBuilder.setPGuidePosts(guidePstsBuilder);
builder.addGuidePosts(statsBuilder.build());
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfo.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfo.java
index 0f1dbeb..da7d3a5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfo.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfo.java
@@ -17,14 +17,8 @@
*/
package org.apache.phoenix.schema.stats;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.util.ByteUtil;
/**
* A class that holds the guidePosts of a region and also allows combining the
* guidePosts of different regions when the GuidePostsInfo is formed for a table.
@@ -34,7 +28,7 @@ public class GuidePostsInfo {
/**
* the total number of guidePosts for the table combining all the guidePosts per region per cf.
*/
- private List<byte[]> guidePosts;
+ private ImmutableBytesWritable guidePosts;
/**
* The bytecount that is flattened across the total number of guide posts.
*/
@@ -45,7 +39,19 @@ public class GuidePostsInfo {
*/
private long rowCount = 0;
- private long keyByteSize; // Total number of bytes in keys stored in guidePosts
+ /**
+ * Maximum length of a guidePost collected
+ */
+ private int maxLength;
+
+ public final static GuidePostsInfo EMPTY_GUIDEPOST = new GuidePostsInfo(0,
+ new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY), 0, 0, 0);
+
+ public int getMaxLength() {
+ return maxLength;
+ }
+
+ private int guidePostsCount;
/**
* Constructor that creates GuidePostsInfo per region
@@ -53,22 +59,20 @@ public class GuidePostsInfo {
* @param guidePosts
* @param rowCount
*/
- public GuidePostsInfo(long byteCount, List<byte[]> guidePosts, long rowCount) {
- this.guidePosts = ImmutableList.copyOf(guidePosts);
- int size = 0;
- for (byte[] key : guidePosts) {
- size += key.length;
- }
- this.keyByteSize = size;
+ public GuidePostsInfo(long byteCount, ImmutableBytesWritable guidePosts, long rowCount, int maxLength, int guidePostsCount) {
+ this.guidePosts = new ImmutableBytesWritable(guidePosts);
+ this.maxLength = maxLength;
this.byteCount = byteCount;
this.rowCount = rowCount;
+ this.guidePostsCount = guidePostsCount;
}
+
public long getByteCount() {
return byteCount;
}
- public List<byte[]> getGuidePosts() {
+ public ImmutableBytesWritable getGuidePosts() {
return guidePosts;
}
@@ -76,70 +80,8 @@ public class GuidePostsInfo {
return this.rowCount;
}
- public void incrementRowCount() {
- this.rowCount++;
- }
-
- /**
- * Combines the GuidePosts per region into one.
- * @param oldInfo
- */
- public void combine(GuidePostsInfo oldInfo) {
- if (!oldInfo.getGuidePosts().isEmpty()) {
- byte[] newFirstKey = oldInfo.getGuidePosts().get(0);
- byte[] existingLastKey;
- if (!this.getGuidePosts().isEmpty()) {
- existingLastKey = this.getGuidePosts().get(this.getGuidePosts().size() - 1);
- } else {
- existingLastKey = HConstants.EMPTY_BYTE_ARRAY;
- }
- int size = oldInfo.getGuidePosts().size();
- // If the existing guidePosts is lesser than the new RegionInfo that we are combining
- // then add the new Region info to the end of the current GuidePosts.
- // If the new region info is smaller than the existing guideposts then add the existing
- // guide posts after the new guideposts.
- List<byte[]> newTotalGuidePosts = new ArrayList<byte[]>(this.getGuidePosts().size() + size);
- if (Bytes.compareTo(existingLastKey, newFirstKey) <= 0) {
- newTotalGuidePosts.addAll(this.getGuidePosts());
- newTotalGuidePosts.addAll(oldInfo.getGuidePosts());
- } else {
- newTotalGuidePosts.addAll(oldInfo.getGuidePosts());
- newTotalGuidePosts.addAll(this.getGuidePosts());
- }
- this.guidePosts = ImmutableList.copyOf(newTotalGuidePosts);
- }
- this.byteCount += oldInfo.getByteCount();
- this.keyByteSize += oldInfo.keyByteSize;
- this.rowCount += oldInfo.getRowCount();
- }
-
- /**
- * The guide posts, rowCount and byteCount are accumulated every time a guidePosts depth is
- * reached while collecting stats.
- * @param row
- * @param byteCount
- * @return
- */
- public boolean addGuidePost(byte[] row, long byteCount, long rowCount) {
- if (guidePosts.isEmpty() || Bytes.compareTo(row, guidePosts.get(guidePosts.size() - 1)) > 0) {
- List<byte[]> newGuidePosts = Lists.newArrayListWithExpectedSize(this.getGuidePosts().size() + 1);
- newGuidePosts.addAll(guidePosts);
- newGuidePosts.add(row);
- this.guidePosts = ImmutableList.copyOf(newGuidePosts);
- this.byteCount += byteCount;
- this.keyByteSize += row.length;
- this.rowCount+=rowCount;
- return true;
- }
- return false;
- }
-
- public boolean addGuidePost(byte[] row) {
- return addGuidePost(row, 0, 0);
- }
-
- public boolean addGuidePost(byte[] row, long byteCount) {
- return addGuidePost(row, byteCount, 0);
+ public int getGuidePostsCount() {
+ return guidePostsCount;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfoBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfoBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfoBuilder.java
new file mode 100644
index 0000000..f3ada82
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfoBuilder.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.stats;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.PrefixByteCodec;
+import org.apache.phoenix.util.PrefixByteEncoder;
+import org.apache.phoenix.util.TrustedByteArrayOutputStream;
+
+/*
+ * Builder that accumulates guide posts, prefix-encoding each row key as it is added, and builds the resulting GuidePostsInfo. It is used both when collecting stats and when reading stats for a table.
+ */
+
+public class GuidePostsInfoBuilder {
+ private PrefixByteEncoder encoder;
+ private byte[] lastRow;
+ private ImmutableBytesWritable guidePosts=new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
+ private long byteCount = 0;
+ private int guidePostsCount;
+
+ /**
+ * The total row count accumulated across all of the guide posts added so far.
+ */
+ private long rowCount = 0;
+
+ /**
+ * Maximum length of a guidePost collected
+ */
+ private int maxLength;
+ private DataOutputStream output;
+ private TrustedByteArrayOutputStream stream;
+
+ public final static GuidePostsInfo EMPTY_GUIDEPOST = new GuidePostsInfo(0,
+ new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY), 0, 0, 0);
+
+ public int getMaxLength() {
+ return maxLength;
+ }
+ public GuidePostsInfoBuilder(){
+ this.stream = new TrustedByteArrayOutputStream(1);
+ this.output = new DataOutputStream(stream);
+ this.encoder=new PrefixByteEncoder();
+ lastRow = ByteUtil.EMPTY_BYTE_ARRAY;
+ }
+
+ /**
+ * The guide posts, rowCount and byteCount are accumulated every time a guidePosts depth is
+ * reached while collecting stats.
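+ * @param row row key of the candidate guide post; it is added only if it is non-empty and sorts after the last row added
+ * @param byteCount number of bytes to add to the accumulated byte count when the guide post is added
+ * @param rowCount number of rows to add to the accumulated row count when the guide post is added
+ * @return true if the guide post was added; false if it was skipped or if encoding failed with an IOException (which is caught here, not thrown)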
+ */
+ public boolean addGuidePosts( byte[] row, long byteCount, long rowCount) {
+ if (row.length != 0 && Bytes.compareTo(lastRow, row) < 0) {
+ try {
+ encoder.encode(output, row, 0, row.length);
+ this.byteCount += byteCount;
+ this.guidePostsCount++;
+ this.maxLength = encoder.getMaxLength();
+ this.rowCount += rowCount;
+ lastRow = row;
+ return true;
+ } catch (IOException e) {
+ return false;
+ }
+ }
+ return false;
+ }
+
+ public boolean addGuidePosts(byte[] row){
+ return addGuidePosts(row, 0, 0);
+ }
+
+ public boolean addGuidePosts(byte[] row, long byteCount){
+ return addGuidePosts(row, byteCount, 0);
+ }
+
+ private void close() {
+ PrefixByteCodec.close(stream);
+ }
+
+ public GuidePostsInfo build() {
+ this.guidePosts.set(stream.getBuffer(), 0, stream.size());
+ GuidePostsInfo guidePostsInfo = new GuidePostsInfo(this.byteCount, this.guidePosts, this.rowCount, this.maxLength, this.guidePostsCount);
+ this.close();
+ return guidePostsInfo;
+ }
+ public void incrementRowCount() {
+ this.rowCount++;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java
index dc70e86..dacc213 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java
@@ -17,13 +17,19 @@
*/
package org.apache.phoenix.schema.stats;
-import java.util.List;
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.EOFException;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.util.PrefixByteCodec;
+import org.apache.phoenix.util.PrefixByteDecoder;
import org.apache.phoenix.util.SizedUtil;
import com.sun.istack.NotNull;
@@ -47,12 +53,10 @@ public class PTableStatsImpl implements PTableStats {
for (Map.Entry<byte[], GuidePostsInfo> entry : guidePosts.entrySet()) {
byte[] cf = entry.getKey();
estimatedSize += SizedUtil.ARRAY_SIZE + cf.length;
- List<byte[]> keys = entry.getValue().getGuidePosts();
- estimatedSize += SizedUtil.sizeOfArrayList(keys.size());
- for (byte[] key : keys) {
- estimatedSize += SizedUtil.ARRAY_SIZE + key.length;
- }
+ estimatedSize += entry.getValue().getGuidePosts().getLength();
estimatedSize += SizedUtil.LONG_SIZE;
+ estimatedSize += SizedUtil.INT_SIZE;
+ estimatedSize += SizedUtil.INT_SIZE;
}
this.estimatedSize = estimatedSize;
}
@@ -65,19 +69,34 @@ public class PTableStatsImpl implements PTableStats {
@Override
public String toString() {
StringBuilder buf = new StringBuilder();
+
buf.append("PTableStats [");
for (Map.Entry<byte[], GuidePostsInfo> entry : guidePosts.entrySet()) {
buf.append(Bytes.toStringBinary(entry.getKey()));
buf.append(":(");
- List<byte[]> keys = entry.getValue().getGuidePosts();
- if (!keys.isEmpty()) {
- for (byte[] key : keys) {
- buf.append(Bytes.toStringBinary(key));
- buf.append(",");
+ ImmutableBytesWritable keys = entry.getValue().getGuidePosts();
+ ByteArrayInputStream stream = new ByteArrayInputStream(keys.get(), keys.getOffset(), keys.getLength());
+ try {
+ if (keys.getLength() != 0) {
+ DataInput input = new DataInputStream(stream);
+ PrefixByteDecoder decoder = new PrefixByteDecoder(entry.getValue().getMaxLength());
+ try {
+ while (true) {
+ ImmutableBytesWritable ptr = PrefixByteCodec.decode(decoder, input);
+ buf.append(Bytes.toStringBinary(ptr.get()));
+ buf.append(",");
+ }
+ } catch (EOFException e) { // Ignore as this signifies we're done
+
+ } finally {
+ PrefixByteCodec.close(stream);
+ }
+ buf.setLength(buf.length() - 1);
}
- buf.setLength(buf.length()-1);
+ buf.append(")");
+ } finally {
+ PrefixByteCodec.close(stream);
}
- buf.append(")");
}
buf.append("]");
return buf.toString();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
index fc8d8bd..3462f22 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.schema.stats;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -61,9 +60,9 @@ public class StatisticsCollector {
private long guidepostDepth;
private long maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
- private Map<ImmutableBytesPtr, Pair<Long, GuidePostsInfo>> guidePostsMap = Maps.newHashMap();
+ private Map<ImmutableBytesPtr, Pair<Long, GuidePostsInfoBuilder>> guidePostsInfoWriterMap = Maps.newHashMap();
protected StatisticsWriter statsTable;
- private Pair<Long, GuidePostsInfo> cachedGps = null;
+ private Pair<Long, GuidePostsInfoBuilder> cachedGps = null;
public StatisticsCollector(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp)
throws IOException {
@@ -99,8 +98,8 @@ public class StatisticsCollector {
// in a compaction we know the one family ahead of time
if (family != null) {
ImmutableBytesPtr cfKey = new ImmutableBytesPtr(family);
- cachedGps = new Pair<Long, GuidePostsInfo>(0l, new GuidePostsInfo(0, Collections.<byte[]> emptyList(), 0l));
- guidePostsMap.put(cfKey, cachedGps);
+ cachedGps = new Pair<Long, GuidePostsInfoBuilder>(0l, new GuidePostsInfoBuilder());
+ guidePostsInfoWriterMap.put(cfKey, cachedGps);
}
}
@@ -131,7 +130,7 @@ public class StatisticsCollector {
throws IOException {
try {
// update the statistics table
- for (ImmutableBytesPtr fam : guidePostsMap.keySet()) {
+ for (ImmutableBytesPtr fam : guidePostsInfoWriterMap.keySet()) {
if (delete) {
if (logger.isDebugEnabled()) {
logger.debug("Deleting the stats for the region " + region.getRegionInfo());
@@ -161,22 +160,22 @@ public class StatisticsCollector {
*/
public void collectStatistics(final List<Cell> results) {
Map<ImmutableBytesPtr, Boolean> famMap = Maps.newHashMap();
- List<GuidePostsInfo> rowTracker = null;
+ List<GuidePostsInfoBuilder> rowTracker = null;
if (cachedGps == null) {
- rowTracker = new ArrayList<GuidePostsInfo>();
+ rowTracker = new ArrayList<GuidePostsInfoBuilder>();
}
for (Cell cell : results) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
maxTimeStamp = Math.max(maxTimeStamp, kv.getTimestamp());
- Pair<Long, GuidePostsInfo> gps;
+ Pair<Long, GuidePostsInfoBuilder> gps;
if (cachedGps == null) {
ImmutableBytesPtr cfKey = new ImmutableBytesPtr(kv.getFamilyArray(), kv.getFamilyOffset(),
kv.getFamilyLength());
- gps = guidePostsMap.get(cfKey);
+ gps = guidePostsInfoWriterMap.get(cfKey);
if (gps == null) {
- gps = new Pair<Long, GuidePostsInfo>(0l,
- new GuidePostsInfo(0, Collections.<byte[]> emptyList(), 0l));
- guidePostsMap.put(cfKey, gps);
+ gps = new Pair<Long, GuidePostsInfoBuilder>(0l,
+ new GuidePostsInfoBuilder());
+ guidePostsInfoWriterMap.put(cfKey, gps);
}
if (famMap.get(cfKey) == null) {
famMap.put(cfKey, true);
@@ -191,13 +190,13 @@ public class StatisticsCollector {
if (byteCount >= guidepostDepth) {
byte[] row = ByteUtil.copyKeyBytesIfNecessary(
new ImmutableBytesWritable(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
- if (gps.getSecond().addGuidePost(row, byteCount)) {
+ if (gps.getSecond().addGuidePosts(row, byteCount)) {
gps.setFirst(0l);
}
}
}
if (cachedGps == null) {
- for (GuidePostsInfo s : rowTracker) {
+ for (GuidePostsInfoBuilder s : rowTracker) {
s.incrementRowCount();
}
} else {
@@ -221,24 +220,13 @@ public class StatisticsCollector {
}
public void clear() {
- this.guidePostsMap.clear();
+ this.guidePostsInfoWriterMap.clear();
maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
}
- public void addGuidePost(ImmutableBytesPtr cfKey, GuidePostsInfo info, long byteSize, long timestamp,
- byte[] minKey) {
- Pair<Long, GuidePostsInfo> newInfo = new Pair<Long, GuidePostsInfo>(byteSize, info);
- Pair<Long, GuidePostsInfo> oldInfo = guidePostsMap.put(cfKey, newInfo);
- if (oldInfo != null) {
- info.combine(oldInfo.getSecond());
- newInfo.setFirst(oldInfo.getFirst() + newInfo.getFirst());
- }
- maxTimeStamp = Math.max(maxTimeStamp, timestamp);
- }
-
public GuidePostsInfo getGuidePosts(ImmutableBytesPtr fam) {
- Pair<Long, GuidePostsInfo> pair = guidePostsMap.get(fam);
- if (pair != null) { return pair.getSecond(); }
+ Pair<Long, GuidePostsInfoBuilder> pair = guidePostsInfoWriterMap.get(fam);
+ if (pair != null) { return pair.getSecond().build(); }
return null;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
index d6f0bf1..13e9491 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
@@ -134,7 +134,8 @@ public class StatisticsScanner implements InternalScanner {
} finally {
try {
collectionTracker.removeCompactingRegion(regionInfo);
- stats.close();
+ stats.close();// close the writer
+ tracker.close();// close the tracker
} catch (IOException e) {
if (toThrow == null) toThrow = e;
LOG.error("Error while closing the stats table", e);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
index 16f249c..5b47104 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
@@ -21,8 +21,8 @@ import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
+import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.hadoop.hbase.Cell;
@@ -55,10 +55,14 @@ public class StatisticsUtil {
/** Number of parts in our complex key */
protected static final int NUM_KEY_PARTS = 3;
-
+
public static byte[] getRowKey(byte[] table, ImmutableBytesPtr fam, byte[] guidePostStartKey) {
+ return getRowKey(table, fam, new ImmutableBytesWritable(guidePostStartKey,0,guidePostStartKey.length));
+ }
+
+ public static byte[] getRowKey(byte[] table, ImmutableBytesPtr fam, ImmutableBytesWritable guidePostStartKey) {
// always starts with the source table
- byte[] rowKey = new byte[table.length + fam.getLength() + guidePostStartKey.length + 2];
+ byte[] rowKey = new byte[table.length + fam.getLength() + guidePostStartKey.getLength() + 2];
int offset = 0;
System.arraycopy(table, 0, rowKey, offset, table.length);
offset += table.length;
@@ -66,7 +70,7 @@ public class StatisticsUtil {
System.arraycopy(fam.get(), fam.getOffset(), rowKey, offset, fam.getLength());
offset += fam.getLength();
rowKey[offset++] = QueryConstants.SEPARATOR_BYTE; // assumes stats table columns not DESC
- System.arraycopy(guidePostStartKey, 0, rowKey, offset, guidePostStartKey.length);
+ System.arraycopy(guidePostStartKey.get(), 0, rowKey, offset, guidePostStartKey.getLength());
return rowKey;
}
@@ -126,7 +130,7 @@ public class StatisticsUtil {
s.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES);
ResultScanner scanner = null;
long timeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
- TreeMap<byte[], GuidePostsInfo> guidePostsPerCf = new TreeMap<byte[], GuidePostsInfo>(Bytes.BYTES_COMPARATOR);
+ TreeMap<byte[], GuidePostsInfoBuilder> guidePostsInfoWriterPerCf = new TreeMap<byte[], GuidePostsInfoBuilder>(Bytes.BYTES_COMPARATOR);
try {
scanner = statsHTable.getScanner(s);
Result result = null;
@@ -168,23 +172,33 @@ public class StatisticsUtil {
}
if (cfName != null) {
byte[] newGPStartKey = getGuidePostsInfoFromRowKey(tableNameBytes, cfName, result.getRow());
- GuidePostsInfo guidePosts = guidePostsPerCf.get(cfName);
- if (guidePosts == null) {
- guidePosts = new GuidePostsInfo(0l, Collections.<byte[]> emptyList(), 0l);
- guidePostsPerCf.put(cfName, guidePosts);
+ GuidePostsInfoBuilder guidePostsInfoWriter = guidePostsInfoWriterPerCf.get(cfName);
+ if (guidePostsInfoWriter == null) {
+ guidePostsInfoWriter = new GuidePostsInfoBuilder();
+ guidePostsInfoWriterPerCf.put(cfName, guidePostsInfoWriter);
}
- guidePosts.addGuidePost(newGPStartKey, byteCount, rowCount);
+ guidePostsInfoWriter.addGuidePosts(newGPStartKey, byteCount, rowCount);
}
}
+ if (!guidePostsInfoWriterPerCf.isEmpty()) { return new PTableStatsImpl(
+ getGuidePostsPerCf(guidePostsInfoWriterPerCf), timeStamp); }
} finally {
if (scanner != null) {
scanner.close();
}
}
- if (!guidePostsPerCf.isEmpty()) { return new PTableStatsImpl(guidePostsPerCf, timeStamp); }
return PTableStats.EMPTY_STATS;
}
+ private static SortedMap<byte[], GuidePostsInfo> getGuidePostsPerCf(
+ TreeMap<byte[], GuidePostsInfoBuilder> guidePostsWriterPerCf) {
+ TreeMap<byte[], GuidePostsInfo> guidePostsPerCf = new TreeMap<byte[], GuidePostsInfo>(Bytes.BYTES_COMPARATOR);
+ for (byte[] key : guidePostsWriterPerCf.keySet()) {
+ guidePostsPerCf.put(key, guidePostsWriterPerCf.get(key).build());
+ }
+ return guidePostsPerCf;
+ }
+
public static long getGuidePostDepth(int guidepostPerRegion, long guidepostWidth, HTableDescriptor tableDesc) {
if (guidepostPerRegion > 0) {
long maxFileSize = HConstants.DEFAULT_MAX_FILE_SIZE;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
index b8bd064..d03af7a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
@@ -17,7 +17,11 @@
*/
package org.apache.phoenix.schema.stats;
+import java.io.ByteArrayInputStream;
import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.EOFException;
import java.io.IOException;
import java.sql.Date;
import java.util.List;
@@ -45,6 +49,8 @@ import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.types.PDate;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.PrefixByteCodec;
+import org.apache.phoenix.util.PrefixByteDecoder;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.TimeKeeper;
@@ -103,6 +109,7 @@ public class StatisticsWriter implements Closeable {
@Override
public void close() throws IOException {
statsWriterTable.close();
+ statsReaderTable.close();
}
/**
@@ -134,21 +141,32 @@ public class StatisticsWriter implements Closeable {
GuidePostsInfo gps = tracker.getGuidePosts(cfKey);
if (gps != null) {
boolean rowColumnAdded = false;
- for (byte[] gp : gps.getGuidePosts()) {
- byte[] prefix = StatisticsUtil.getRowKey(tableName, cfKey, gp);
- Put put = new Put(prefix);
- if (!rowColumnAdded) {
- put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES,
- timeStamp, PLong.INSTANCE.toBytes(gps.getByteCount()));
- put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
- PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT_BYTES, timeStamp,
- PLong.INSTANCE.toBytes(gps.getRowCount()));
- rowColumnAdded = true;
+ ImmutableBytesWritable keys = gps.getGuidePosts();
+ ByteArrayInputStream stream = new ByteArrayInputStream(keys.get(), keys.getOffset(), keys.getLength());
+ DataInput input = new DataInputStream(stream);
+ PrefixByteDecoder decoder = new PrefixByteDecoder(gps.getMaxLength());
+ try {
+ while (true) {
+ ImmutableBytesWritable ptr = decoder.decode(input);
+ byte[] prefix = StatisticsUtil.getRowKey(tableName, cfKey, ptr);
+ Put put = new Put(prefix);
+ if (!rowColumnAdded) {
+ put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES,
+ timeStamp, PLong.INSTANCE.toBytes(gps.getByteCount()));
+ put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+ PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT_BYTES, timeStamp,
+ PLong.INSTANCE.toBytes(gps.getRowCount()));
+ rowColumnAdded = true;
+ }
+ // Add our empty column value so queries behave correctly
+ put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timeStamp,
+ ByteUtil.EMPTY_BYTE_ARRAY);
+ mutations.add(put);
}
- // Add our empty column value so queries behave correctly
- put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timeStamp,
- ByteUtil.EMPTY_BYTE_ARRAY);
- mutations.add(put);
+ } catch (EOFException e) { // Ignore as this signifies we're done
+
+ } finally {
+ PrefixByteCodec.close(stream);
}
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
index 64d064a..44502a5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
@@ -49,6 +49,8 @@ public class ByteUtil {
public static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
public static final ImmutableBytesPtr EMPTY_BYTE_ARRAY_PTR = new ImmutableBytesPtr(
EMPTY_BYTE_ARRAY);
+ public static final ImmutableBytesWritable EMPTY_IMMUTABLE_BYTE_ARRAY = new ImmutableBytesWritable(
+ EMPTY_BYTE_ARRAY);
public static final Comparator<ImmutableBytesPtr> BYTES_PTR_COMPARATOR = new Comparator<ImmutableBytesPtr>() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/main/java/org/apache/phoenix/util/PrefixByteCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PrefixByteCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PrefixByteCodec.java
new file mode 100644
index 0000000..8c3aa80
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PrefixByteCodec.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Static helpers for prefix encoding a list of byte arrays into a single
+ * buffer and for decoding such a buffer back into individual byte arrays.
+ * See {@link PrefixByteEncoder} and {@link PrefixByteDecoder}.
+ */
+public class PrefixByteCodec {
+
+    /**
+     * Decodes every prefix encoded entry found in the given pointer.
+     * @param encodedBytes pointer to the prefix encoded bytes
+     * @param maxLength maximum decoded length, handed to
+     * {@link PrefixByteDecoder#PrefixByteDecoder(int)}
+     * @return a copy of each decoded byte array, in the order they were read
+     * @throws IOException if decoding fails for a reason other than reaching
+     * the end of the encoded bytes
+     */
+    public static List<byte[]> decodeBytes(ImmutableBytesWritable encodedBytes, int maxLength) throws IOException {
+        ByteArrayInputStream rawInput = new ByteArrayInputStream(
+                encodedBytes.get(), encodedBytes.getOffset(), encodedBytes.getLength());
+        DataInput in = new DataInputStream(rawInput);
+        PrefixByteDecoder prefixDecoder = new PrefixByteDecoder(maxLength);
+        List<byte[]> decoded = Lists.newArrayList();
+        try {
+            for (;;) {
+                // The decoder hands back the same pointer on every call, so
+                // each value is copied before the next one is decoded.
+                decoded.add(prefixDecoder.decode(in).copyBytes());
+            }
+        } catch (EOFException endOfInput) {
+            // Expected: running off the end of the input marks the last entry.
+        }
+        return decoded;
+    }
+
+    /**
+     * Prefix encodes the byte arrays, in list order, and points <code>ptr</code>
+     * at the encoded result.
+     * @param listOfBytes byte arrays to encode
+     * @param ptr pointer that is set to the encoded bytes
+     * @return the maximum length reported by the encoder
+     * @throws IOException if writing to the underlying stream fails
+     */
+    public static int encodeBytes(List<byte[]> listOfBytes, ImmutableBytesWritable ptr) throws IOException {
+        int initialCapacity = calculateSize(listOfBytes);
+        TrustedByteArrayOutputStream buffer = new TrustedByteArrayOutputStream(initialCapacity);
+        DataOutput out = new DataOutputStream(buffer);
+        PrefixByteEncoder prefixEncoder = new PrefixByteEncoder();
+        for (byte[] value : listOfBytes) {
+            prefixEncoder.encode(out, value);
+        }
+        close(buffer);
+        ptr.set(buffer.getBuffer(), 0, buffer.size());
+        return prefixEncoder.getMaxLength();
+    }
+
+    /**
+     * @param listOfBytes byte arrays to measure
+     * @return the sum of the lengths of all byte arrays in the list
+     */
+    public static int calculateSize(List<byte[]> listOfBytes) {
+        int total = 0;
+        for (byte[] value : listOfBytes) {
+            total = total + value.length;
+        }
+        return total;
+    }
+
+    /**
+     * Decodes the next entry, surfacing only the end-of-input condition as a
+     * checked exception.
+     * @param decoder decoder to use
+     * @param input input to read from
+     * @return the pointer returned by the decoder
+     * @throws EOFException when the input is exhausted
+     * @throws RuntimeException wrapping any other {@link IOException}
+     */
+    public static ImmutableBytesWritable decode(PrefixByteDecoder decoder, DataInput input) throws EOFException {
+        try {
+            return decoder.decode(input);
+        } catch (EOFException endOfInput) {
+            throw endOfInput;
+        } catch (IOException failure) {
+            throw new RuntimeException(failure);
+        }
+    }
+
+    /**
+     * Closes the stream if it is not null, rethrowing any {@link IOException}
+     * as a {@link RuntimeException}.
+     */
+    public static void close(ByteArrayInputStream stream) {
+        closeUnchecked(stream);
+    }
+
+    /**
+     * Closes the stream if it is not null, rethrowing any {@link IOException}
+     * as a {@link RuntimeException}.
+     */
+    public static void close(ByteArrayOutputStream stream) {
+        closeUnchecked(stream);
+    }
+
+    // Shared null-safe close used by both public overloads.
+    private static void closeUnchecked(java.io.Closeable resource) {
+        if (resource == null) {
+            return;
+        }
+        try {
+            resource.close();
+        } catch (IOException failure) {
+            throw new RuntimeException(failure);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/main/java/org/apache/phoenix/util/PrefixByteDecoder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PrefixByteDecoder.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PrefixByteDecoder.java
new file mode 100644
index 0000000..c34bda8
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PrefixByteDecoder.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ *
+ * Prefix decoder for byte arrays. For encoding, see {@link PrefixByteEncoder}.
+ * <p>
+ * Each entry is read as a vint prefix length, a vint suffix length and then
+ * the suffix bytes. The decoded value is the first <code>prefixLen</code>
+ * bytes of the previously decoded value followed by the suffix.
+ *
+ */
+public class PrefixByteDecoder {
+    // -1 means "unknown": a fresh array is allocated for every decoded value.
+    private final int maxLength;
+    // Pointer to the most recently decoded value; also the object returned by decode.
+    private final ImmutableBytesWritable previous;
+
+    /**
+     * Used when the maximum length of encoded byte array is not known. Will
+     * cause a new byte array to be allocated for each call to {@link #decode(DataInput)}.
+     */
+    public PrefixByteDecoder() {
+        maxLength = -1;
+        previous = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
+    }
+
+    /**
+     * Used when the maximum length of encoded byte array is known in advance.
+     * Will not allocate new byte array with each call to {@link #decode(DataInput)}.
+     * A value that is not positive is treated as unknown, which behaves like
+     * {@link #PrefixByteDecoder()}.
+     * @param maxLength maximum length needed for any call to {@link #decode(DataInput)}.
+     */
+    public PrefixByteDecoder(int maxLength) {
+        if (maxLength > 0) {
+            this.maxLength = maxLength;
+            this.previous = new ImmutableBytesWritable(new byte[maxLength], 0, 0);
+        } else {
+            this.maxLength = -1;
+            previous = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
+        }
+    }
+
+    /**
+     * Resets state of decoder if it will be used to decode bytes from a
+     * different DataInput. The current backing array is kept; only the
+     * length of the previous value is set back to zero.
+     */
+    public void reset() {
+        previous.set(previous.get(), 0, 0);
+    }
+
+    /**
+     * Decodes bytes encoded with {@link PrefixByteEncoder}.
+     * @param in Input from which bytes are read.
+     * @return Pointer containing bytes that were decoded. Note that the
+     * same pointer will be returned with each call, so it must be consumed
+     * prior to calling decode again.
+     * @throws java.io.EOFException if the input is exhausted
+     * @throws IOException if the input cannot be read, if the encoded lengths
+     * are negative, if the prefix is longer than the previously decoded value,
+     * or if the decoded value would not fit in the buffer sized by the
+     * maxLength passed at construction
+     */
+    public ImmutableBytesWritable decode(DataInput in) throws IOException {
+        int prefixLen = WritableUtils.readVInt(in);
+        int suffixLen = WritableUtils.readVInt(in);
+        int previousLen = previous.getLength();
+        // A prefix can only refer to bytes of the previous value. Without this
+        // check the reused buffer would silently contribute stale bytes.
+        if (prefixLen < 0 || suffixLen < 0 || prefixLen > previousLen) {
+            throw new IOException("Invalid prefix encoded entry: prefixLen=" + prefixLen
+                    + ", suffixLen=" + suffixLen + ", previousLen=" + previousLen);
+        }
+        // Sum as long so that two large ints cannot overflow into a negative length.
+        long totalLen = (long) prefixLen + suffixLen;
+        byte[] b;
+        if (maxLength == -1) { // Allocate new byte array each time
+            if (totalLen > Integer.MAX_VALUE) {
+                throw new IOException("Prefix encoded entry too large: " + totalLen + " bytes");
+            }
+            b = new byte[(int) totalLen];
+            System.arraycopy(previous.get(), previous.getOffset(), b, 0, prefixLen);
+        } else { // Reuse same buffer each time
+            b = previous.get();
+            if (totalLen > b.length) {
+                throw new IOException("Prefix encoded entry of " + totalLen
+                        + " bytes exceeds maximum length of " + b.length);
+            }
+        }
+        // The first prefixLen bytes of b already hold the shared prefix.
+        in.readFully(b, prefixLen, suffixLen);
+        previous.set(b, 0, (int) totalLen);
+        return previous;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/main/java/org/apache/phoenix/util/PrefixByteEncoder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PrefixByteEncoder.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PrefixByteEncoder.java
new file mode 100644
index 0000000..bf92be5
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PrefixByteEncoder.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ *
+ * Prefix encoder for byte arrays. For decoding, see {@link PrefixByteDecoder}.
+ * <p>
+ * The encoder keeps a reference to the last byte array passed to it rather
+ * than a copy, so that array must not be modified until the next value has
+ * been encoded or {@link #reset()} has been called.
+ *
+ */
+public class PrefixByteEncoder {
+    // Longest value passed to encode so far; not cleared by reset().
+    private int maxLength;
+    // Pointer to the previously encoded value (references the caller's array).
+    private final ImmutableBytesWritable previous = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
+
+    public PrefixByteEncoder() {
+    }
+
+    /**
+     * Resets the state of the encoder to its initial state (i.e. forgetting
+     * the previous byte array that may have been encoded).
+     */
+    public void reset() {
+        previous.set(ByteUtil.EMPTY_BYTE_ARRAY);
+    }
+
+    /**
+     * @return the maximum length byte array encountered while encoding
+     */
+    public int getMaxLength() {
+        return maxLength;
+    }
+
+    /**
+     * Prefix encodes the byte array pointed to into the output stream
+     * @param out output stream to encode into
+     * @param ptr pointer to byte array to encode.
+     * @throws IOException
+     */
+    public void encode(DataOutput out, ImmutableBytesWritable ptr) throws IOException {
+        encode(out, ptr.get(), ptr.getOffset(), ptr.getLength());
+    }
+
+    /**
+     * Prefix encodes the byte array into the output stream
+     * @param out output stream to encode into
+     * @param b byte array to encode
+     * @throws IOException
+     */
+    public void encode(DataOutput out, byte[] b) throws IOException {
+        encode(out, b, 0, b.length);
+    }
+
+    /**
+     * Prefix encodes the byte array from offset to length into output stream.
+     * Instead of writing the entire byte array, only the portion of the byte array
+     * that differs from the beginning of the previous byte array written is written.
+     * The output is the length of the shared prefix as a vint, followed by the
+     * remaining bytes written with {@link Bytes#writeByteArray(DataOutput, byte[], int, int)}.
+     *
+     * @param out output stream to encode into
+     * @param b byte array buffer
+     * @param offset offset into byte array to start encoding
+     * @param length length of byte array to encode
+     * @throws IOException
+     */
+    public void encode(DataOutput out, byte[] b, int offset, int length) throws IOException {
+        byte[] prevBytes = previous.get();
+        int prevOffset = previous.getOffset();
+        // Bound the comparison by the number of bytes being encoded, not by the
+        // size of the backing array: with a non-zero offset or a length shorter
+        // than b.length the scan would otherwise run past the encoded window.
+        int minLength = Math.min(previous.getLength(), length);
+        int i = 0;
+        while (i < minLength && prevBytes[prevOffset + i] == b[offset + i]) {
+            i++;
+        }
+        WritableUtils.writeVInt(out, i);
+        Bytes.writeByteArray(out, b, offset + i, length - i);
+        // Remember this value (by reference) as the basis for the next prefix.
+        previous.set(b, offset, length);
+        if (length > maxLength) {
+            maxLength = length;
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index c931851..1d6f438 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -52,12 +52,14 @@ import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
@@ -100,6 +102,7 @@ import com.google.common.collect.Sets;
public class UpgradeUtil {
private static final Logger logger = LoggerFactory.getLogger(UpgradeUtil.class);
private static final byte[] SEQ_PREFIX_BYTES = ByteUtil.concat(QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes("_SEQ_"));
+ public static final byte[] UPGRADE_TO_4_7_COLUMN_NAME = Bytes.toBytes("UPGRADE_TO_4_7");
public static String UPSERT_BASE_COLUMN_COUNT_IN_HEADER_ROW = "UPSERT "
+ "INTO SYSTEM.CATALOG "
@@ -1215,4 +1218,67 @@ public class UpgradeUtil {
MetaDataEndpointImpl.ROW_KEY_ORDER_OPTIMIZABLE_BYTES, PBoolean.INSTANCE.toBytes(true));
tableMetadata.add(put);
}
-}
+
+ public static boolean truncateStats(HTableInterface metaTable, HTableInterface statsTable)
+ throws IOException, InterruptedException {
+ List<Cell> columnCells = metaTable
+ .get(new Get(SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_SCHEMA_NAME,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE)))
+ .getColumnCells(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES);
+ if (!columnCells.isEmpty()
+ && columnCells.get(0).getTimestamp() < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0) {
+
+ byte[] statsTableKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_SCHEMA_NAME,
+ PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE);
+ KeyValue upgradeKV = KeyValueUtil.newKeyValue(statsTableKey, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+ UPGRADE_TO_4_7_COLUMN_NAME, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 1,
+ ByteUtil.EMPTY_BYTE_ARRAY);
+ Put upgradePut = new Put(statsTableKey);
+ upgradePut.add(upgradeKV);
+
+ // check for null in UPGRADE_TO_4_7_COLUMN_NAME in checkAndPut so that only single client
+ // drop the rows of SYSTEM.STATS
+ if (metaTable.checkAndPut(statsTableKey, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+ UPGRADE_TO_4_7_COLUMN_NAME, null, upgradePut)) {
+ List<Mutation> mutations = Lists.newArrayListWithExpectedSize(1000);
+ Scan scan = new Scan();
+ scan.setRaw(true);
+ scan.setMaxVersions();
+ ResultScanner statsScanner = statsTable.getScanner(scan);
+ Result r;
+ mutations.clear();
+ int count = 0;
+ while ((r = statsScanner.next()) != null) {
+ Delete delete = null;
+ for (KeyValue keyValue : r.raw()) {
+ if (KeyValue.Type.codeToType(keyValue.getType()) == KeyValue.Type.Put) {
+ if (delete == null) {
+ delete = new Delete(keyValue.getRow());
+ }
+ KeyValue deleteKeyValue = new KeyValue(keyValue.getRowArray(), keyValue.getRowOffset(),
+ keyValue.getRowLength(), keyValue.getFamilyArray(), keyValue.getFamilyOffset(),
+ keyValue.getFamilyLength(), keyValue.getQualifierArray(),
+ keyValue.getQualifierOffset(), keyValue.getQualifierLength(),
+ keyValue.getTimestamp(), KeyValue.Type.Delete, ByteUtil.EMPTY_BYTE_ARRAY, 0, 0);
+ delete.addDeleteMarker(deleteKeyValue);
+ }
+ }
+ if (delete != null) {
+ mutations.add(delete);
+ if (count > 10) {
+ statsTable.batch(mutations);
+ mutations.clear();
+ count = 0;
+ }
+ count++;
+ }
+ }
+ if (!mutations.isEmpty()) {
+ statsTable.batch(mutations);
+ }
+ return true;
+ }
+ }
+ return false;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
index a0aca4d..bb5f408 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
@@ -22,12 +22,10 @@ import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
-import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.SortedMap;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.end2end.Shadower;
@@ -40,6 +38,7 @@ import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.stats.GuidePostsInfo;
+import org.apache.phoenix.schema.stats.GuidePostsInfoBuilder;
import org.apache.phoenix.schema.stats.PTableStats;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -648,10 +647,11 @@ public class SkipScanBigFilterTest extends BaseConnectionlessQueryTest {
stmt.execute();
final PTable table = conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, "PERF.BIG_OLAP_DOC"));
- GuidePostsInfo info = new GuidePostsInfo(0,Collections.<byte[]> emptyList(), 0l);
+ GuidePostsInfoBuilder gpWriter = new GuidePostsInfoBuilder();
for (byte[] gp : guidePosts) {
- info.addGuidePost(gp, 1000);
+ gpWriter.addGuidePosts(gp, 1000);
}
+ GuidePostsInfo info = gpWriter.build();
final SortedMap<byte[], GuidePostsInfo> gpMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
gpMap.put(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, info);
PTable tableWithStats = PTableImpl.makePTable(table, new PTableStats() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/test/java/org/apache/phoenix/util/PrefixByteEncoderDecoderTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/PrefixByteEncoderDecoderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/PrefixByteEncoderDecoderTest.java
new file mode 100644
index 0000000..f8aa7db
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/PrefixByteEncoderDecoderTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.query.QueryConstants;
+import org.junit.Test;
+
+
+/**
+ * Tests for {@link PrefixByteCodec}: checks the exact bytes produced when
+ * encoding and that a list of guideposts survives an encode/decode round trip.
+ */
+public class PrefixByteEncoderDecoderTest {
+
+    static final List<byte[]> guideposts = Arrays.asList(
+            guidepost("aaaaaaaaaa", 1000L, "bbbbbbbbbb"),
+            guidepost("aaaaaaaaaa", 1000L, "bbbbbccccc"),
+            guidepost("aaaaaaaaaa", 2000L, "bbbbbbbbbb"),
+            guidepost("bbbbbbbbbb", 1000L, "bbbbbbbbbb"),
+            guidepost("bbbbbbbbbb", 2000L, "bbbbbbbbbb"),
+            guidepost("bbbbbbbbbb", 2000L, "c"),
+            guidepost("bbbbbbbbbbb", 1000L, "bbbbbbbbbb"),
+            guidepost("d", 1000L, "bbbbbbbbbb"),
+            guidepost("d", 1000L, "bbbbbbbbbbc"),
+            guidepost("e", 1000L, "bbbbbbbbbb")
+            );
+
+    // Builds one key: leading string, separator byte, long value, trailing string.
+    private static byte[] guidepost(String leading, long middle, String trailing) {
+        return ByteUtil.concat(Bytes.toBytes(leading), QueryConstants.SEPARATOR_BYTE_ARRAY,
+                Bytes.toBytes(middle), Bytes.toBytes(trailing));
+    }
+
+    @Test
+    public void testEncode() throws IOException {
+        List<byte[]> keys = Arrays.asList(Bytes.toBytes("aaaaa"), Bytes.toBytes("aaaabb"));
+        ImmutableBytesWritable encoded = new ImmutableBytesWritable();
+        assertEquals(6, PrefixByteCodec.encodeBytes(keys, encoded));
+
+        // Hand-build the expected encoding: (prefix length, suffix length, suffix bytes) per key.
+        TrustedByteArrayOutputStream expected =
+                new TrustedByteArrayOutputStream(PrefixByteCodec.calculateSize(keys));
+        DataOutput out = new DataOutputStream(expected);
+        // First key: nothing shared with a previous key, so no space savings.
+        WritableUtils.writeVInt(out, 0);
+        WritableUtils.writeVInt(out, 5);
+        out.write(Bytes.toBytes("aaaaa"));
+        // Second key: shares "aaaa", so only the differing tail is written.
+        WritableUtils.writeVInt(out, 4);
+        WritableUtils.writeVInt(out, 2);
+        out.write(Bytes.toBytes("bb"));
+
+        assertArrayEquals(expected.toByteArray(), encoded.copyBytes());
+    }
+
+    @Test
+    public void testEncodeDecodeWithSingleBuffer() throws IOException {
+        assertRoundTrip(true);
+    }
+
+    @Test
+    public void testEncodeDecodeWithNewBuffer() throws IOException {
+        assertRoundTrip(false);
+    }
+
+    // Encodes the guideposts, checks the encoding is smaller than the raw
+    // bytes, then decodes and compares against the original list.
+    private void assertRoundTrip(boolean useSingleBuffer) throws IOException {
+        ImmutableBytesWritable encoded = new ImmutableBytesWritable();
+        int maxLength = PrefixByteCodec.encodeBytes(guideposts, encoded);
+        int encodedSize = encoded.getLength();
+        int unencodedSize = PrefixByteCodec.calculateSize(guideposts);
+        assertTrue(encodedSize < unencodedSize);
+
+        int decoderMaxLength;
+        if (useSingleBuffer) {
+            decoderMaxLength = maxLength;
+        } else {
+            decoderMaxLength = -1;
+        }
+        List<byte[]> decoded = PrefixByteCodec.decodeBytes(encoded, decoderMaxLength);
+        assertListByteArraysEquals(guideposts, decoded);
+    }
+
+    private static void assertListByteArraysEquals(List<byte[]> listOfBytes1, List<byte[]> listOfBytes2) {
+        assertEquals(listOfBytes1.size(), listOfBytes2.size());
+        int index = 0;
+        while (index < listOfBytes1.size()) {
+            assertArrayEquals(listOfBytes1.get(index), listOfBytes2.get(index));
+            index++;
+        }
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-protocol/src/main/PGuidePosts.proto
----------------------------------------------------------------------
diff --git a/phoenix-protocol/src/main/PGuidePosts.proto b/phoenix-protocol/src/main/PGuidePosts.proto
index 047a658..14de2eb 100644
--- a/phoenix-protocol/src/main/PGuidePosts.proto
+++ b/phoenix-protocol/src/main/PGuidePosts.proto
@@ -27,4 +27,7 @@ message PGuidePosts {
repeated bytes guidePosts = 1;
optional int64 byteCount = 2;
optional int64 rowCount = 3;
+ optional int32 maxLength = 4;
+ optional int32 encodedGuidePostsCount = 5;
+ optional bytes encodedGuidePosts = 6;
}
\ No newline at end of file