You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by bi...@apache.org on 2012/11/20 08:45:53 UTC
svn commit: r1411570 - in /pig/branches/branch-0.11: CHANGES.txt
src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
test/org/apache/pig/backend/hadoop/hbase/
test/org/apache/pig/backend/hadoop/hbase/TestHBaseStorageFiltering.java
Author: billgraham
Date: Tue Nov 20 07:45:52 2012
New Revision: 1411570
URL: http://svn.apache.org/viewvc?rev=1411570&view=rev
Log:
PIG-2934: HBaseStorage filter optimizations (billgraham)
Added:
pig/branches/branch-0.11/test/org/apache/pig/backend/hadoop/hbase/
pig/branches/branch-0.11/test/org/apache/pig/backend/hadoop/hbase/TestHBaseStorageFiltering.java
Modified:
pig/branches/branch-0.11/CHANGES.txt
pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
Modified: pig/branches/branch-0.11/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/CHANGES.txt?rev=1411570&r1=1411569&r2=1411570&view=diff
==============================================================================
--- pig/branches/branch-0.11/CHANGES.txt (original)
+++ pig/branches/branch-0.11/CHANGES.txt Tue Nov 20 07:45:52 2012
@@ -28,6 +28,8 @@ PIG-1891 Enable StoreFunc to make intell
IMPROVEMENTS
+PIG-2934: HBaseStorage filter optimizations (billgraham)
+
PIG-2980: documentation for DateTime datatype (zjshen via thejas)
PIG-2982: add unit tests for DateTime type that test setting timezone (zjshen via thejas)
Modified: pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1411570&r1=1411569&r2=1411570&view=diff
==============================================================================
--- pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Tue Nov 20 07:45:52 2012
@@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.filter.Co
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.Filter;
@@ -114,6 +115,12 @@ import com.google.common.collect.Lists;
* the constructor in the above example would cause only columns that start with
* <i>bob_</i> to be loaded.
* <P>
+ * Note that when using a prefix like <code>friends:bob_*</code>, explicit HBase filters are set for
+ * all columns and prefixes specified. Querying HBase with many filters can cause performance
 * degradation. This is typically seen when mixing one or more prefixed descriptors with a large list
 * of columns. In that case better performance will be seen by either loading the entire family via
+ * <code>friends:*</code> or by specifying explicit column descriptor names.
+ * <P>
* Below is an example showing how to store data into HBase:
* <pre>{@code
* copy = STORE raw INTO 'hbase://SampleTableCopy'
@@ -235,6 +242,7 @@ public class HBaseStorage extends LoadFu
* <li>-minTimestamp= Scan's timestamp for min timeRange
* <li>-maxTimestamp= Scan's timestamp for max timeRange
* <li>-timestamp= Scan's specified timestamp
+ * <li>-caster=(HBaseBinaryConverter|Utf8StorageConverter) Utf8StorageConverter is the default
* To be used with extreme caution, since this could result in data loss
* (see http://hbase.apache.org/book.html#perf.hbase.client.putwal).
* </ul>
@@ -310,7 +318,7 @@ public class HBaseStorage extends LoadFu
timestamp_ = 0;
}
- initScan();
+ initScan();
}
/**
@@ -376,70 +384,146 @@ public class HBaseStorage extends LoadFu
if (configuredOptions_.hasOption("gt")) {
gt_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("gt")));
addRowFilter(CompareOp.GREATER, gt_);
+ scan.setStartRow(gt_);
}
if (configuredOptions_.hasOption("lt")) {
lt_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("lt")));
addRowFilter(CompareOp.LESS, lt_);
+ scan.setStopRow(lt_);
}
if (configuredOptions_.hasOption("gte")) {
gte_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("gte")));
- addRowFilter(CompareOp.GREATER_OR_EQUAL, gte_);
+ scan.setStartRow(gte_);
}
if (configuredOptions_.hasOption("lte")) {
lte_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("lte")));
- addRowFilter(CompareOp.LESS_OR_EQUAL, lte_);
+ byte[] lt = increment(lte_);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Incrementing lte value of %s from bytes %s to %s to set stop row",
+ Bytes.toString(lte_), toString(lte_), toString(lt)));
+ }
+
+ if (lt != null) {
+ scan.setStopRow(increment(lte_));
+ }
+
+ // The WhileMatchFilter will short-circuit the scan after we no longer match. The
+ // setStopRow call will limit the number of regions we need to scan
+ addFilter(new WhileMatchFilter(new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(lte_))));
}
if (configuredOptions_.hasOption("minTimestamp") || configuredOptions_.hasOption("maxTimestamp")){
scan.setTimeRange(minTimestamp_, maxTimestamp_);
}
if (configuredOptions_.hasOption("timestamp")){
- scan.setTimeStamp(timestamp_);
+ scan.setTimeStamp(timestamp_);
}
- // apply any column filters
- FilterList allColumnFilters = null;
- for (ColumnInfo colInfo : columnInfo_) {
- // all column family filters roll up to one parent OR filter
- if (allColumnFilters == null) {
- allColumnFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
- }
- // and each filter contains a column family filter
- FilterList thisColumnFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL);
- thisColumnFilter.addFilter(new FamilyFilter(CompareOp.EQUAL,
- new BinaryComparator(colInfo.getColumnFamily())));
+ // if the group of columnInfos for this family doesn't contain a prefix, we don't need
+ // to set any filters, we can just call addColumn or addFamily. See javadocs below.
+ boolean columnPrefixExists = false;
+ for (ColumnInfo columnInfo : columnInfo_) {
+ if (columnInfo.getColumnPrefix() != null) {
+ columnPrefixExists = true;
+ break;
+ }
+ }
- if (colInfo.isColumnMap()) {
+ if (!columnPrefixExists) {
+ addFiltersWithoutColumnPrefix(columnInfo_);
+ }
+ else {
+ addFiltersWithColumnPrefix(columnInfo_);
+ }
+ }
+ /**
+ * If there is no column with a prefix, we don't need filters, we can just call addColumn and
+ * addFamily on the scan
+ */
+ private void addFiltersWithoutColumnPrefix(List<ColumnInfo> columnInfos) {
+ for (ColumnInfo columnInfo : columnInfos) {
+ if (columnInfo.columnName != null) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Adding family:prefix filters with values " +
- Bytes.toString(colInfo.getColumnFamily()) + COLON +
- Bytes.toString(colInfo.getColumnPrefix()));
- }
-
- // each column map filter consists of a FamilyFilter AND
- // optionally a PrefixFilter
- if (colInfo.getColumnPrefix() != null) {
- thisColumnFilter.addFilter(new ColumnPrefixFilter(
- colInfo.getColumnPrefix()));
+ LOG.debug("Adding column to scan via addColumn with cf:name = " +
+ Bytes.toString(columnInfo.getColumnFamily()) + ":" +
+ Bytes.toString(columnInfo.getColumnName()));
}
+ scan.addColumn(columnInfo.getColumnFamily(), columnInfo.getColumnName());
}
else {
-
if (LOG.isDebugEnabled()) {
- LOG.debug("Adding family:descriptor filters with values " +
- Bytes.toString(colInfo.getColumnFamily()) + COLON +
- Bytes.toString(colInfo.getColumnName()));
+ LOG.debug("Adding column family to scan via addFamily with cf:name = " +
+ Bytes.toString(columnInfo.getColumnFamily()));
}
+ scan.addFamily(columnInfo.getColumnFamily());
+ }
+ }
+ }
- // each column value filter consists of a FamilyFilter AND
- // a QualifierFilter
- thisColumnFilter.addFilter(new QualifierFilter(CompareOp.EQUAL,
- new BinaryComparator(colInfo.getColumnName())));
+ /**
+ * If we have a qualifier with a prefix and a wildcard (i.e. cf:foo*), we need a filter on every
+ * possible column to be returned as shown below. This will become very inefficient for long
+ * lists of columns mixed with a prefixed wildcard.
+ *
+ * FilterList - must pass ALL of
+ * - FamilyFilter
+ * - AND a must pass ONE FilterList of
+ * - either Qualifier
+ * - or ColumnPrefixFilter
+ *
+ * If we have only column family filters (i.e. cf:*) or explicit column descriptors
+ * (i.e., cf:foo) or a mix of both then we don't need filters, since the scan will take
+ * care of that.
+ */
+ private void addFiltersWithColumnPrefix(List<ColumnInfo> columnInfos) {
+ // we need to apply a CF AND column list filter for each family
+ FilterList allColumnFilters = null;
+ Map<String, List<ColumnInfo>> groupedMap = groupByFamily(columnInfos);
+ for (String cfString : groupedMap.keySet()) {
+ List<ColumnInfo> columnInfoList = groupedMap.get(cfString);
+ byte[] cf = Bytes.toBytes(cfString);
+
+ // all filters roll up to one parent OR filter
+ if (allColumnFilters == null) {
+ allColumnFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
}
- allColumnFilters.addFilter(thisColumnFilter);
- }
+ // each group contains a column family filter AND (all) and an OR (one of) of
+ // the column filters
+ FilterList thisColumnGroupFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL);
+ thisColumnGroupFilter.addFilter(new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(cf)));
+ FilterList columnFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
+ for (ColumnInfo colInfo : columnInfoList) {
+ if (colInfo.isColumnMap()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding family:prefix filters with values " +
+ Bytes.toString(colInfo.getColumnFamily()) + COLON +
+ Bytes.toString(colInfo.getColumnPrefix()));
+ }
+
+ // add a PrefixFilter to the list of column filters
+ if (colInfo.getColumnPrefix() != null) {
+ columnFilters.addFilter(new ColumnPrefixFilter(
+ colInfo.getColumnPrefix()));
+ }
+ }
+ else {
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding family:descriptor filters with values " +
+ Bytes.toString(colInfo.getColumnFamily()) + COLON +
+ Bytes.toString(colInfo.getColumnName()));
+ }
+
+ // add a QualifierFilter to the list of column filters
+ columnFilters.addFilter(new QualifierFilter(CompareOp.EQUAL,
+ new BinaryComparator(colInfo.getColumnName())));
+ }
+ }
+ thisColumnGroupFilter.addFilter(columnFilters);
+ allColumnFilters.addFilter(thisColumnGroupFilter);
+ }
if (allColumnFilters != null) {
addFilter(allColumnFilters);
}
@@ -1036,4 +1120,74 @@ public class HBaseStorage extends LoadFu
public String toString() { return originalColumnName; }
}
+ /**
+ * Group the list of ColumnInfo objects by their column family and returns a map of CF to its
+ * list of ColumnInfo objects. Using String as key since it implements Comparable.
+ * @param columnInfos the columnInfo list to group
+ * @return a Map of lists, keyed by their column family.
+ */
+ static Map<String, List<ColumnInfo>> groupByFamily(List<ColumnInfo> columnInfos) {
+ Map<String, List<ColumnInfo>> groupedMap = new HashMap<String, List<ColumnInfo>>();
+ for (ColumnInfo columnInfo : columnInfos) {
+ String cf = Bytes.toString(columnInfo.getColumnFamily());
+ List<ColumnInfo> columnInfoList = groupedMap.get(cf);
+ if (columnInfoList == null) {
+ columnInfoList = new ArrayList<ColumnInfo>();
+ }
+ columnInfoList.add(columnInfo);
+ groupedMap.put(cf, columnInfoList);
+ }
+ return groupedMap;
+ }
+
+ static String toString(byte[] bytes) {
+ if (bytes == null) { return null; }
+
+ StringBuffer sb = new StringBuffer();
+ for (int i = 0; i < bytes.length; i++) {
+ if (i > 0) { sb.append("|"); }
+ sb.append(bytes[i]);
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Increments the byte array by one for use with setting stopRow. If all bytes in the array are
+ * set to the maximum byte value, then the original array will be returned with a 0 byte appended
+ * to it. This is because HBase compares bytes from left to right. If byte array B is equal to
+ * byte array A, but with an extra byte appended, A will be < B. For example
+ * <code>A = byte[] {-1}</code> increments to
+ * <code>B = byte[] {-1, 0}</code> and <code>A &lt; B</code>
+ * @param bytes array to increment bytes on
+ * @return a copy of the byte array incremented by 1
+ */
+ static byte[] increment(byte[] bytes) {
+ boolean allAtMax = true;
+ for(int i = 0; i < bytes.length; i++) {
+ if((bytes[bytes.length - i - 1] & 0x0ff) != 255) {
+ allAtMax = false;
+ break;
+ }
+ }
+
+ if (allAtMax) {
+ return Arrays.copyOf(bytes, bytes.length + 1);
+ }
+
+ byte[] incremented = bytes.clone();
+ for(int i = bytes.length - 1; i >= 0; i--) {
+ boolean carry = false;
+ int val = bytes[i] & 0x0ff;
+ int total = val + 1;
+ if(total > 255) {
+ carry = true;
+ total %= 256;
+ } else if (total < 0) {
+ carry = true;
+ }
+ incremented[i] = (byte)total;
+ if (!carry) return incremented;
+ }
+ return incremented;
+ }
}
Added: pig/branches/branch-0.11/test/org/apache/pig/backend/hadoop/hbase/TestHBaseStorageFiltering.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/test/org/apache/pig/backend/hadoop/hbase/TestHBaseStorageFiltering.java?rev=1411570&view=auto
==============================================================================
--- pig/branches/branch-0.11/test/org/apache/pig/backend/hadoop/hbase/TestHBaseStorageFiltering.java (added)
+++ pig/branches/branch-0.11/test/org/apache/pig/backend/hadoop/hbase/TestHBaseStorageFiltering.java Tue Nov 20 07:45:52 2012
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.hbase;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.pig.impl.util.UDFContext;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Class to test how filters are used by HBaseStorage, since misuse can be costly in terms of
+ * performance.
+ */
+public class TestHBaseStorageFiltering {
+
+ private Filter getHBaseStorageFilter(String firstArg)
+ throws NoSuchFieldException, ParseException, IOException, IllegalAccessException {
+ return initHBaseStorageFilter(firstArg, "");
+ }
+
+ // Helper to initialize HBaseStorage and return its Filter
+ private Filter initHBaseStorageFilter(String firstArg, String secondArg)
+ throws NoSuchFieldException, ParseException, IOException, IllegalAccessException {
+ UDFContext.getUDFContext().setClientSystemProps(new Properties());
+
+ Field scanField = HBaseStorage.class.getDeclaredField("scan");
+ scanField.setAccessible(true);
+
+ HBaseStorage storage = new HBaseStorage(firstArg, secondArg);
+ Scan scan = (Scan)scanField.get(storage);
+ return scan.getFilter();
+ }
+
+ // Helper to initialize HBaseStorage and return its List of ColumnInfo objects
+ @SuppressWarnings("unchecked")
+ private List<HBaseStorage.ColumnInfo> getHBaseColumnInfo(String firstArg, String secondArg)
+ throws NoSuchFieldException, ParseException, IOException, IllegalAccessException {
+ UDFContext.getUDFContext().setClientSystemProps(new Properties());
+
+ Field columnInfoField = HBaseStorage.class.getDeclaredField("columnInfo_");
+ columnInfoField.setAccessible(true);
+
+ HBaseStorage storage = new HBaseStorage(firstArg, secondArg);
+ List<HBaseStorage.ColumnInfo> columnInfoList =
+ (List<HBaseStorage.ColumnInfo>)columnInfoField.get(storage);
+ return columnInfoList;
+ }
+
+ @Test
+ public void testDescriptors() throws Exception {
+ Filter filter = getHBaseStorageFilter("cf1:a cf1:b");
+ assertNull(filter);
+ }
+
+ @Test
+ public void testFamily() throws Exception {
+ Filter filter = getHBaseStorageFilter("cf1:");
+ assertNull(filter);
+ }
+
+ @Test
+ public void testFamily2() throws Exception {
+ Filter filter = getHBaseStorageFilter("cf1:*");
+ assertNull(filter);
+ }
+
+ @Test
+ public void testPrefix() throws Exception {
+ Filter filter = getHBaseStorageFilter("cf1:foo*");
+ List<Filter> childFilters = assertFilterList(filter, FilterList.Operator.MUST_PASS_ALL, 1);
+ childFilters = assertFilterList(childFilters.get(0), FilterList.Operator.MUST_PASS_ONE, 1);
+ childFilters = assertFilterList(childFilters.get(0), FilterList.Operator.MUST_PASS_ALL, 2);
+ assertFamilyFilter(childFilters.get(0), CompareFilter.CompareOp.EQUAL, "cf1");
+ childFilters = assertFilterList(childFilters.get(1), FilterList.Operator.MUST_PASS_ONE, 1);
+ assertPrefixFilter(childFilters.get(0), "foo");
+ }
+
+ @Test
+ public void testDescriptorsAndFamily1() throws Exception {
+ Filter filter = getHBaseStorageFilter("cf1:a cf1:b cf2:");
+ assertNull(filter);
+ }
+
+ @Test
+ public void testDescriptorsAndFamily2() throws Exception {
+ Filter filter = getHBaseStorageFilter("cf1:a cf1:b cf2:*");
+ assertNull(filter);
+ }
+
+ @Test
+ public void testDescriptorsAndPrefix() throws Exception {
+ Filter filter = getHBaseStorageFilter("cf1:a cf1:b cf2:foo*");
+ List<Filter> childFilters = assertFilterList(filter, FilterList.Operator.MUST_PASS_ALL, 1);
+ List<Filter> groupFilters = assertFilterList(childFilters.get(0), FilterList.Operator.MUST_PASS_ONE, 2);
+
+ List<Filter> firstFilters = assertFilterList(groupFilters.get(0), FilterList.Operator.MUST_PASS_ALL, 2);
+ FamilyFilter firstFamilyFilter = assertFamilyFilter(firstFilters.get(0), CompareFilter.CompareOp.EQUAL);
+
+ List<Filter> secondFilters = assertFilterList(groupFilters.get(1), FilterList.Operator.MUST_PASS_ALL, 2);
+ FamilyFilter secondFamilyFilter = assertFamilyFilter(secondFilters.get(0), CompareFilter.CompareOp.EQUAL);
+
+ // one of the above will be the cf1 filters, one will be the cf2. Order is unknown
+ Filter cf1ColumnList;
+ Filter cf2ColumnList;
+ if (Bytes.toString(firstFamilyFilter.getComparator().getValue()).equals("cf1")) {
+ assertEquals("cf2", Bytes.toString(secondFamilyFilter.getComparator().getValue()));
+ cf1ColumnList = firstFilters.get(1);
+ cf2ColumnList = secondFilters.get(1);
+ }
+ else {
+ assertEquals("cf1", Bytes.toString(secondFamilyFilter.getComparator().getValue()));
+ assertEquals("cf2", Bytes.toString(firstFamilyFilter.getComparator().getValue()));
+ cf1ColumnList = secondFilters.get(1);
+ cf2ColumnList = firstFilters.get(1);
+ }
+
+ List<Filter> c1ColumnFilters = assertFilterList(cf1ColumnList, FilterList.Operator.MUST_PASS_ONE, 2);
+ assertQualifierFilter(c1ColumnFilters.get(0), CompareFilter.CompareOp.EQUAL, "a");
+ assertQualifierFilter(c1ColumnFilters.get(1), CompareFilter.CompareOp.EQUAL, "b");
+
+ List<Filter> c2ColumnFilters = assertFilterList(cf2ColumnList, FilterList.Operator.MUST_PASS_ONE, 1);
+ assertPrefixFilter(c2ColumnFilters.get(0), "foo");
+ }
+
+ @Test
+ public void testColumnGroups() throws Exception {
+ List<HBaseStorage.ColumnInfo> columnInfoList = getHBaseColumnInfo("cf1:a cf1:b cf2:foo*", "");
+ Map<String, List<HBaseStorage.ColumnInfo>> groupMap = HBaseStorage.groupByFamily(columnInfoList);
+ assertEquals(2, groupMap.size());
+
+ List<HBaseStorage.ColumnInfo> cf1List = groupMap.get("cf1");
+ assertNotNull(cf1List);
+ assertEquals(2, cf1List.size());
+ assertColumnInfo(cf1List.get(0), "cf1", "a", null);
+ assertColumnInfo(cf1List.get(1), "cf1", "b", null);
+
+ List<HBaseStorage.ColumnInfo> cf2List = groupMap.get("cf2");
+ assertNotNull(cf2List);
+ assertEquals(1, cf2List.size());
+ assertColumnInfo(cf2List.get(0), "cf2", null, "foo");
+ }
+
+ @Test
+ public void testIncrementStrings() {
+ doIncrementTest("100", "101");
+ doIncrementTest("0001", "0002");
+ doIncrementTest("aaaccccc", "aaaccccd");
+ }
+
+ @Test
+ public void testIncrementBytes() {
+ doIncrementTest(Bytes.toBytes(0x03), Bytes.toBytes(0x04));
+ doIncrementTest(new byte[] {0, 1, 0}, new byte[] {0, 1, 1});
+ doIncrementTest(new byte[] {127}, new byte[] {-128});
+ doIncrementTest(new byte[] {-1}, new byte[] {-1, 0});
+ doIncrementTest(new byte[] {-1, -1}, new byte[] {-1, -1, 0});
+ doIncrementTest(new byte[] {0, -1, -1}, new byte[] {1, 0, 0});
+ doIncrementTest(Bytes.toBytes(0xFFFFFFFF), new byte[] {-1, -1, -1, -1, 0});
+ doIncrementTest(Bytes.toBytes(Long.MAX_VALUE), new byte[] {-128, 0, 0, 0, 0, 0, 0, 0});
+ }
+
+ private void doIncrementTest(String initial, String expected) {
+ byte[] initialBytes = Bytes.toBytes(initial);
+ byte[] expectedBytes = Bytes.toBytes(expected);
+ byte[] incrementedBytes = HBaseStorage.increment(initialBytes);
+ assertTrue(String.format("Expected bytes %s (%s) did not equal found bytes %s (%s)",
+ HBaseStorage.toString(expectedBytes), expected, HBaseStorage.toString(incrementedBytes), Bytes.toString(incrementedBytes)),
+ Bytes.compareTo(expectedBytes, incrementedBytes) == 0);
+ assertEquals(expected, Bytes.toString(incrementedBytes));
+ assertTrue("Initial value of " + initial + " should be < " + Bytes.toString(incrementedBytes),
+ Bytes.compareTo(initialBytes, incrementedBytes) < 0);
+ }
+
+ private void doIncrementTest(byte[] initial, byte[] expected) {
+ byte[] incrementedBytes = HBaseStorage.increment(initial);
+ if (expected == null) {
+ assertNull("Expected null bytes but found " + HBaseStorage.toString(incrementedBytes), incrementedBytes);
+ } else {
+ assertTrue(String.format("Expected bytes %s did not equal found bytes %s",
+ HBaseStorage.toString(expected), HBaseStorage.toString(incrementedBytes)),
+ Bytes.compareTo(expected, incrementedBytes) == 0);
+ }
+ assertTrue("Initial value of should be < incremented value",
+ Bytes.compareTo(initial, incrementedBytes) < 0);
+ }
+
+ private void assertColumnInfo(HBaseStorage.ColumnInfo columnInfo,
+ String columnFamily, String columnName, String prefix) {
+ assertEquals(columnFamily, Bytes.toString(columnInfo.getColumnFamily()));
+ assertEquals(columnName, Bytes.toString(columnInfo.getColumnName()));
+ assertEquals(prefix, Bytes.toString(columnInfo.getColumnPrefix()));
+ }
+
+ private List<Filter> assertFilterList(Filter filter, FilterList.Operator operator, int size) {
+ assertTrue("Filter is not a FilterList: " + filter.getClass().getSimpleName(),
+ filter instanceof FilterList);
+ FilterList filterList = (FilterList)filter;
+ assertEquals("Unexpected operator", operator, filterList.getOperator());
+ assertEquals("Unexpected filter list size", size, filterList.getFilters().size());
+ return filterList.getFilters();
+ }
+
+ private FamilyFilter assertFamilyFilter(Filter filter, CompareFilter.CompareOp compareOp, String value) {
+ FamilyFilter familyFilter = assertFamilyFilter(filter, compareOp);
+ assertEquals("Unexpected value", value, Bytes.toString(familyFilter.getComparator().getValue()));
+ return familyFilter;
+ }
+
+ private FamilyFilter assertFamilyFilter(Filter filter, CompareFilter.CompareOp compareOp) {
+ assertTrue("Filter is not a FamilyFilter: " + filter.getClass().getSimpleName(),
+ filter instanceof FamilyFilter);
+ FamilyFilter familyFilter = (FamilyFilter)filter;
+ assertEquals("Unexpected compareOp", compareOp, familyFilter.getOperator());
+ return familyFilter;
+ }
+
+ private void assertPrefixFilter(Filter filter, String prefix) {
+ assertTrue("Filter is not a ColumnPrefixFilter: " + filter.getClass().getSimpleName(),
+ filter instanceof ColumnPrefixFilter);
+ ColumnPrefixFilter familyFilter = (ColumnPrefixFilter)filter;
+ assertEquals("Unexpected prefix", prefix, Bytes.toString(familyFilter.getPrefix()));
+ }
+
+ private void assertQualifierFilter(Filter filter, CompareFilter.CompareOp compareOp, String value) {
+ assertTrue("Filter is not a QualifierFilter: " + filter.getClass().getSimpleName(),
+ filter instanceof QualifierFilter);
+ QualifierFilter qualifierFilter = (QualifierFilter)filter;
+ assertEquals("Unexpected compareOp", compareOp, qualifierFilter.getOperator());
+ assertEquals("Unexpected value", value, Bytes.toString(qualifierFilter.getComparator().getValue()));
+ }
+
+ private void printFilters(List<Filter> filters) {
+ for (Filter child : filters) {
+ printFilters(child);
+ }
+ }
+
+ private void printFilters(Filter filter) {
+ if (filter != null) {
+ System.out.println(filter.toString());
+ if (filter instanceof FilterList) {
+ printFilters(((FilterList)filter).getFilters());
+ }
+ }
+ }
+}