You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by bi...@apache.org on 2012/11/20 08:45:53 UTC

svn commit: r1411570 - in /pig/branches/branch-0.11: CHANGES.txt src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java test/org/apache/pig/backend/hadoop/hbase/ test/org/apache/pig/backend/hadoop/hbase/TestHBaseStorageFiltering.java

Author: billgraham
Date: Tue Nov 20 07:45:52 2012
New Revision: 1411570

URL: http://svn.apache.org/viewvc?rev=1411570&view=rev
Log:
PIG-2934: HBaseStorage filter optimizations (billgraham)

Added:
    pig/branches/branch-0.11/test/org/apache/pig/backend/hadoop/hbase/
    pig/branches/branch-0.11/test/org/apache/pig/backend/hadoop/hbase/TestHBaseStorageFiltering.java
Modified:
    pig/branches/branch-0.11/CHANGES.txt
    pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java

Modified: pig/branches/branch-0.11/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/CHANGES.txt?rev=1411570&r1=1411569&r2=1411570&view=diff
==============================================================================
--- pig/branches/branch-0.11/CHANGES.txt (original)
+++ pig/branches/branch-0.11/CHANGES.txt Tue Nov 20 07:45:52 2012
@@ -28,6 +28,8 @@ PIG-1891 Enable StoreFunc to make intell
 
 IMPROVEMENTS
 
+PIG-2934: HBaseStorage filter optimizations (billgraham)
+
 PIG-2980: documentation for DateTime datatype (zjshen via thejas)
 
 PIG-2982: add unit tests for DateTime type that test setting timezone (zjshen via thejas)

Modified: pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1411570&r1=1411569&r2=1411570&view=diff
==============================================================================
--- pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Tue Nov 20 07:45:52 2012
@@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.filter.Co
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.QualifierFilter;
 import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.WhileMatchFilter;
 import org.apache.hadoop.hbase.filter.FamilyFilter;
 import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
 import org.apache.hadoop.hbase.filter.Filter;
@@ -114,6 +115,12 @@ import com.google.common.collect.Lists;
  * the constructor in the above example would cause only columns that start with
  * <i>bob_</i> to be loaded.
  * <P>
+ * Note that when using a prefix like <code>friends:bob_*</code>, explicit HBase filters are set for
+ * all columns and prefixes specified. Querying HBase with many filters can cause performance
+ * degredation. This is typically seen when mixing one or more prefixed descriptors with a large list
+ * of columns. In that case better perfomance will be seen by either loading the entire family via
+ * <code>friends:*</code> or by specifying explicit column descriptor names.
+ * <P>
  * Below is an example showing how to store data into HBase:
  * <pre>{@code
  * copy = STORE raw INTO 'hbase://SampleTableCopy'
@@ -235,6 +242,7 @@ public class HBaseStorage extends LoadFu
      * <li>-minTimestamp= Scan's timestamp for min timeRange
      * <li>-maxTimestamp= Scan's timestamp for max timeRange
      * <li>-timestamp= Scan's specified timestamp
+     * <li>-caster=(HBaseBinaryConverter|Utf8StorageConverter) Utf8StorageConverter is the default
      * To be used with extreme caution, since this could result in data loss
      * (see http://hbase.apache.org/book.html#perf.hbase.client.putwal).
      * </ul>
@@ -310,7 +318,7 @@ public class HBaseStorage extends LoadFu
             timestamp_ = 0;
         }
         
-        initScan();	    
+        initScan();
     }
 
     /**
@@ -376,70 +384,146 @@ public class HBaseStorage extends LoadFu
         if (configuredOptions_.hasOption("gt")) {
             gt_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("gt")));
             addRowFilter(CompareOp.GREATER, gt_);
+            scan.setStartRow(gt_);
         }
         if (configuredOptions_.hasOption("lt")) {
             lt_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("lt")));
             addRowFilter(CompareOp.LESS, lt_);
+            scan.setStopRow(lt_);
         }
         if (configuredOptions_.hasOption("gte")) {
             gte_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("gte")));
-            addRowFilter(CompareOp.GREATER_OR_EQUAL, gte_);
+            scan.setStartRow(gte_);
         }
         if (configuredOptions_.hasOption("lte")) {
             lte_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("lte")));
-            addRowFilter(CompareOp.LESS_OR_EQUAL, lte_);
+            byte[] lt = increment(lte_);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(String.format("Incrementing lte value of %s from bytes %s to %s to set stop row",
+                          Bytes.toString(lte_), toString(lte_), toString(lt)));
+            }
+
+            if (lt != null) {
+                scan.setStopRow(increment(lte_));
+            }
+
+            // The WhileMatchFilter will short-circuit the scan after we no longer match. The
+            // setStopRow call will limit the number of regions we need to scan
+            addFilter(new WhileMatchFilter(new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(lte_))));
         }
         if (configuredOptions_.hasOption("minTimestamp") || configuredOptions_.hasOption("maxTimestamp")){
             scan.setTimeRange(minTimestamp_, maxTimestamp_);
         }
         if (configuredOptions_.hasOption("timestamp")){
-        	scan.setTimeStamp(timestamp_);
+            scan.setTimeStamp(timestamp_);
         }
-        // apply any column filters
-        FilterList allColumnFilters = null;
-        for (ColumnInfo colInfo : columnInfo_) {
-            // all column family filters roll up to one parent OR filter
-            if (allColumnFilters == null) {
-                allColumnFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
-            }
 
-            // and each filter contains a column family filter
-            FilterList thisColumnFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL);
-            thisColumnFilter.addFilter(new FamilyFilter(CompareOp.EQUAL,
-                    new BinaryComparator(colInfo.getColumnFamily())));
+        // if the group of columnInfos for this family doesn't contain a prefix, we don't need
+        // to set any filters, we can just call addColumn or addFamily. See javadocs below.
+        boolean columnPrefixExists = false;
+        for (ColumnInfo columnInfo : columnInfo_) {
+            if (columnInfo.getColumnPrefix() != null) {
+                columnPrefixExists = true;
+                break;
+            }
+        }
 
-            if (colInfo.isColumnMap()) {
+        if (!columnPrefixExists) {
+            addFiltersWithoutColumnPrefix(columnInfo_);
+        }
+        else {
+            addFiltersWithColumnPrefix(columnInfo_);
+        }
+    }
 
+    /**
+     * If there is no column with a prefix, we don't need filters, we can just call addColumn and
+     * addFamily on the scan
+     */
+    private void addFiltersWithoutColumnPrefix(List<ColumnInfo> columnInfos) {
+        for (ColumnInfo columnInfo : columnInfos) {
+            if (columnInfo.columnName != null) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Adding family:prefix filters with values " +
-                        Bytes.toString(colInfo.getColumnFamily()) + COLON +
-                        Bytes.toString(colInfo.getColumnPrefix()));
-                }
-
-                // each column map filter consists of a FamilyFilter AND
-                // optionally a PrefixFilter
-                if (colInfo.getColumnPrefix() != null) {
-                    thisColumnFilter.addFilter(new ColumnPrefixFilter(
-                        colInfo.getColumnPrefix()));
+                    LOG.debug("Adding column to scan via addColumn with cf:name = " +
+                            Bytes.toString(columnInfo.getColumnFamily()) + ":" +
+                            Bytes.toString(columnInfo.getColumnName()));
                 }
+                scan.addColumn(columnInfo.getColumnFamily(), columnInfo.getColumnName());
             }
             else {
-
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Adding family:descriptor filters with values " +
-                        Bytes.toString(colInfo.getColumnFamily()) + COLON +
-                        Bytes.toString(colInfo.getColumnName()));
+                    LOG.debug("Adding column family to scan via addFamily with cf:name = " +
+                            Bytes.toString(columnInfo.getColumnFamily()));
                 }
+                scan.addFamily(columnInfo.getColumnFamily());
+            }
+        }
+    }
 
-                // each column value filter consists of a FamilyFilter AND
-                // a QualifierFilter
-                thisColumnFilter.addFilter(new QualifierFilter(CompareOp.EQUAL,
-                        new BinaryComparator(colInfo.getColumnName())));
+    /**
+     *  If we have a qualifier with a prefix and a wildcard (i.e. cf:foo*), we need a filter on every
+     *  possible column to be returned as shown below. This will become very inneficient for long
+     *  lists of columns mixed with a prefixed wildcard.
+     *
+     *  FilterList - must pass ALL of
+     *   - FamilyFilter
+     *   - AND a must pass ONE FilterList of
+     *    - either Qualifier
+     *    - or ColumnPrefixFilter
+     *
+     *  If we have only column family filters (i.e. cf:*) or explicit column descriptors
+     *  (i.e., cf:foo) or a mix of both then we don't need filters, since the scan will take
+     *  care of that.
+     */
+    private void addFiltersWithColumnPrefix(List<ColumnInfo> columnInfos) {
+        // we need to apply a CF AND column list filter for each family
+        FilterList allColumnFilters = null;
+        Map<String, List<ColumnInfo>> groupedMap = groupByFamily(columnInfos);
+        for (String cfString : groupedMap.keySet()) {
+            List<ColumnInfo> columnInfoList = groupedMap.get(cfString);
+            byte[] cf = Bytes.toBytes(cfString);
+
+            // all filters roll up to one parent OR filter
+            if (allColumnFilters == null) {
+                allColumnFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
             }
 
-            allColumnFilters.addFilter(thisColumnFilter);
-        }
+            // each group contains a column family filter AND (all) and an OR (one of) of
+            // the column filters
+            FilterList thisColumnGroupFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL);
+            thisColumnGroupFilter.addFilter(new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(cf)));
+            FilterList columnFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
+            for (ColumnInfo colInfo : columnInfoList) {
+                if (colInfo.isColumnMap()) {
 
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Adding family:prefix filters with values " +
+                                Bytes.toString(colInfo.getColumnFamily()) + COLON +
+                                Bytes.toString(colInfo.getColumnPrefix()));
+                    }
+
+                    // add a PrefixFilter to the list of column filters
+                    if (colInfo.getColumnPrefix() != null) {
+                        columnFilters.addFilter(new ColumnPrefixFilter(
+                            colInfo.getColumnPrefix()));
+                    }
+                }
+                else {
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Adding family:descriptor filters with values " +
+                                Bytes.toString(colInfo.getColumnFamily()) + COLON +
+                                Bytes.toString(colInfo.getColumnName()));
+                    }
+
+                    // add a QualifierFilter to the list of column filters
+                    columnFilters.addFilter(new QualifierFilter(CompareOp.EQUAL,
+                            new BinaryComparator(colInfo.getColumnName())));
+                }
+            }
+            thisColumnGroupFilter.addFilter(columnFilters);
+            allColumnFilters.addFilter(thisColumnGroupFilter);
+        }
         if (allColumnFilters != null) {
             addFilter(allColumnFilters);
         }
@@ -1036,4 +1120,74 @@ public class HBaseStorage extends LoadFu
         public String toString() { return originalColumnName; }
     }
 
+    /**
+     * Group the list of ColumnInfo objects by their column family and returns a map of CF to its
+     * list of ColumnInfo objects. Using String as key since it implements Comparable.
+     * @param columnInfos the columnInfo list to group
+     * @return a Map of lists, keyed by their column family.
+     */
+    static Map<String, List<ColumnInfo>> groupByFamily(List<ColumnInfo> columnInfos) {
+        Map<String, List<ColumnInfo>> groupedMap = new HashMap<String, List<ColumnInfo>>();
+        for (ColumnInfo columnInfo : columnInfos) {
+            String cf = Bytes.toString(columnInfo.getColumnFamily());
+            List<ColumnInfo> columnInfoList = groupedMap.get(cf);
+            if (columnInfoList == null) {
+                columnInfoList = new ArrayList<ColumnInfo>();
+            }
+            columnInfoList.add(columnInfo);
+            groupedMap.put(cf, columnInfoList);
+        }
+        return groupedMap;
+    }
+
+    static String toString(byte[] bytes) {
+        if (bytes == null) { return null; }
+
+        StringBuffer sb = new StringBuffer();
+        for (int i = 0; i < bytes.length; i++) {
+            if (i > 0) { sb.append("|"); }
+            sb.append(bytes[i]);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Increments the byte array by one for use with setting stopRow. If all bytes in the array are
+     * set to the maximum byte value, then the original array will be returned with a 0 byte appended
+     * to it. This is because HBase compares bytes from left to right. If byte array B is equal to
+     * byte array A, but with an extra byte appended, A will be < B. For example
+     * {@code}A = byte[] {-1}{@code} increments to
+     * {@code}B = byte[] {-1, 0}{@code} and {@code}A < B{@code}
+     * @param bytes array to increment bytes on
+     * @return a copy of the byte array incremented by 1
+     */
+    static byte[] increment(byte[] bytes) {
+        boolean allAtMax = true;
+        for(int i = 0; i < bytes.length; i++) {
+            if((bytes[bytes.length - i - 1] & 0x0ff) != 255) {
+                allAtMax = false;
+                break;
+            }
+        }
+
+        if (allAtMax) {
+            return Arrays.copyOf(bytes, bytes.length + 1);
+        }
+
+        byte[] incremented = bytes.clone();
+        for(int i = bytes.length - 1; i >= 0; i--) {
+            boolean carry = false;
+            int val = bytes[i] & 0x0ff;
+            int total = val + 1;
+            if(total > 255) {
+                carry = true;
+                total %= 256;
+            } else if (total < 0) {
+                carry = true;
+            }
+            incremented[i] = (byte)total;
+            if (!carry) return incremented;
+        }
+        return incremented;
+    }
 }

Added: pig/branches/branch-0.11/test/org/apache/pig/backend/hadoop/hbase/TestHBaseStorageFiltering.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/test/org/apache/pig/backend/hadoop/hbase/TestHBaseStorageFiltering.java?rev=1411570&view=auto
==============================================================================
--- pig/branches/branch-0.11/test/org/apache/pig/backend/hadoop/hbase/TestHBaseStorageFiltering.java (added)
+++ pig/branches/branch-0.11/test/org/apache/pig/backend/hadoop/hbase/TestHBaseStorageFiltering.java Tue Nov 20 07:45:52 2012
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.hbase;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.pig.impl.util.UDFContext;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Class to test how filters are used by HBaseStorage, since mis-use can be costly in terms of
+ * perfomance.
+ */
+public class TestHBaseStorageFiltering {
+
+    private Filter getHBaseStorageFilter(String firstArg)
+            throws NoSuchFieldException, ParseException, IOException, IllegalAccessException {
+        return initHBaseStorageFilter(firstArg, "");
+    }
+
+    // Helper to initialize HBaseStorage and return its Filter
+    private Filter initHBaseStorageFilter(String firstArg, String secondArg)
+            throws NoSuchFieldException, ParseException, IOException, IllegalAccessException {
+        UDFContext.getUDFContext().setClientSystemProps(new Properties());
+
+        Field scanField = HBaseStorage.class.getDeclaredField("scan");
+        scanField.setAccessible(true);
+
+        HBaseStorage storage = new HBaseStorage(firstArg, secondArg);
+        Scan scan = (Scan)scanField.get(storage);
+        return scan.getFilter();
+    }
+
+    // Helper to initialize HBaseStorage and return its List of ColumnInfo objects
+    @SuppressWarnings("unchecked")
+    private List<HBaseStorage.ColumnInfo> getHBaseColumnInfo(String firstArg, String secondArg)
+            throws NoSuchFieldException, ParseException, IOException, IllegalAccessException {
+        UDFContext.getUDFContext().setClientSystemProps(new Properties());
+
+        Field columnInfoField = HBaseStorage.class.getDeclaredField("columnInfo_");
+        columnInfoField.setAccessible(true);
+
+        HBaseStorage storage = new HBaseStorage(firstArg, secondArg);
+        List<HBaseStorage.ColumnInfo> columnInfoList =
+                (List<HBaseStorage.ColumnInfo>)columnInfoField.get(storage);
+        return columnInfoList;
+    }
+
+    @Test
+    public void testDescriptors() throws Exception {
+        Filter filter = getHBaseStorageFilter("cf1:a cf1:b");
+        assertNull(filter);
+    }
+
+    @Test
+    public void testFamily() throws Exception {
+        Filter filter = getHBaseStorageFilter("cf1:");
+        assertNull(filter);
+    }
+
+    @Test
+    public void testFamily2() throws Exception {
+        Filter filter = getHBaseStorageFilter("cf1:*");
+        assertNull(filter);
+    }
+
+    @Test
+    public void testPrefix() throws Exception {
+        Filter filter = getHBaseStorageFilter("cf1:foo*");
+        List<Filter> childFilters = assertFilterList(filter, FilterList.Operator.MUST_PASS_ALL, 1);
+        childFilters = assertFilterList(childFilters.get(0), FilterList.Operator.MUST_PASS_ONE, 1);
+        childFilters = assertFilterList(childFilters.get(0), FilterList.Operator.MUST_PASS_ALL, 2);
+        assertFamilyFilter(childFilters.get(0), CompareFilter.CompareOp.EQUAL, "cf1");
+        childFilters = assertFilterList(childFilters.get(1), FilterList.Operator.MUST_PASS_ONE, 1);
+        assertPrefixFilter(childFilters.get(0), "foo");
+    }
+
+    @Test
+    public void testDescriptorsAndFamily1() throws Exception {
+        Filter filter = getHBaseStorageFilter("cf1:a cf1:b cf2:");
+        assertNull(filter);
+    }
+
+    @Test
+    public void testDescriptorsAndFamily2() throws Exception {
+        Filter filter = getHBaseStorageFilter("cf1:a cf1:b cf2:*");
+        assertNull(filter);
+    }
+
+    @Test
+    public void testDescriptorsAndPrefix() throws Exception {
+        Filter filter = getHBaseStorageFilter("cf1:a cf1:b cf2:foo*");
+        List<Filter> childFilters = assertFilterList(filter, FilterList.Operator.MUST_PASS_ALL, 1);
+        List<Filter> groupFilters = assertFilterList(childFilters.get(0), FilterList.Operator.MUST_PASS_ONE, 2);
+
+        List<Filter> firstFilters = assertFilterList(groupFilters.get(0), FilterList.Operator.MUST_PASS_ALL, 2);
+        FamilyFilter firstFamilyFilter = assertFamilyFilter(firstFilters.get(0), CompareFilter.CompareOp.EQUAL);
+
+        List<Filter> secondFilters = assertFilterList(groupFilters.get(1), FilterList.Operator.MUST_PASS_ALL, 2);
+        FamilyFilter secondFamilyFilter = assertFamilyFilter(secondFilters.get(0), CompareFilter.CompareOp.EQUAL);
+
+        // one of the above will be the cf1 filters, one will be the cf2. Order is unknown
+        Filter cf1ColumnList;
+        Filter cf2ColumnList;
+        if (Bytes.toString(firstFamilyFilter.getComparator().getValue()).equals("cf1")) {
+            assertEquals("cf2", Bytes.toString(secondFamilyFilter.getComparator().getValue()));
+            cf1ColumnList = firstFilters.get(1);
+            cf2ColumnList = secondFilters.get(1);
+        }
+        else {
+            assertEquals("cf1", Bytes.toString(secondFamilyFilter.getComparator().getValue()));
+            assertEquals("cf2", Bytes.toString(firstFamilyFilter.getComparator().getValue()));
+            cf1ColumnList = secondFilters.get(1);
+            cf2ColumnList = firstFilters.get(1);
+        }
+
+        List<Filter> c1ColumnFilters = assertFilterList(cf1ColumnList, FilterList.Operator.MUST_PASS_ONE, 2);
+        assertQualifierFilter(c1ColumnFilters.get(0), CompareFilter.CompareOp.EQUAL, "a");
+        assertQualifierFilter(c1ColumnFilters.get(1), CompareFilter.CompareOp.EQUAL, "b");
+
+        List<Filter> c2ColumnFilters = assertFilterList(cf2ColumnList, FilterList.Operator.MUST_PASS_ONE, 1);
+        assertPrefixFilter(c2ColumnFilters.get(0), "foo");
+    }
+
+    @Test
+    public void testColumnGroups() throws Exception {
+        List<HBaseStorage.ColumnInfo> columnInfoList = getHBaseColumnInfo("cf1:a cf1:b cf2:foo*", "");
+        Map<String, List<HBaseStorage.ColumnInfo>> groupMap = HBaseStorage.groupByFamily(columnInfoList);
+        assertEquals(2, groupMap.size());
+
+        List<HBaseStorage.ColumnInfo> cf1List = groupMap.get("cf1");
+        assertNotNull(cf1List);
+        assertEquals(2, cf1List.size());
+        assertColumnInfo(cf1List.get(0), "cf1", "a", null);
+        assertColumnInfo(cf1List.get(1), "cf1", "b", null);
+
+        List<HBaseStorage.ColumnInfo> cf2List = groupMap.get("cf2");
+        assertNotNull(cf2List);
+        assertEquals(1, cf2List.size());
+        assertColumnInfo(cf2List.get(0), "cf2", null, "foo");
+    }
+
+    @Test
+    public void testIncrementStrings() {
+        doIncrementTest("100", "101");
+        doIncrementTest("0001", "0002");
+        doIncrementTest("aaaccccc", "aaaccccd");
+    }
+
+    @Test
+    public void testIncrementBytes() {
+        doIncrementTest(Bytes.toBytes(0x03), Bytes.toBytes(0x04));
+        doIncrementTest(new byte[] {0, 1, 0}, new byte[] {0, 1, 1});
+        doIncrementTest(new byte[] {127}, new byte[] {-128});
+        doIncrementTest(new byte[] {-1}, new byte[] {-1, 0});
+        doIncrementTest(new byte[] {-1, -1}, new byte[] {-1, -1, 0});
+        doIncrementTest(new byte[] {0, -1, -1}, new byte[] {1, 0, 0});
+        doIncrementTest(Bytes.toBytes(0xFFFFFFFF), new byte[] {-1, -1, -1, -1, 0});
+        doIncrementTest(Bytes.toBytes(Long.MAX_VALUE), new byte[] {-128, 0, 0, 0, 0, 0, 0, 0});
+    }
+
+    private void doIncrementTest(String initial, String expected) {
+        byte[] initialBytes = Bytes.toBytes(initial);
+        byte[] expectedBytes = Bytes.toBytes(expected);
+        byte[] incrementedBytes = HBaseStorage.increment(initialBytes);
+        assertTrue(String.format("Expected bytes %s (%s) did not equal found bytes %s (%s)",
+                HBaseStorage.toString(expectedBytes), expected, HBaseStorage.toString(incrementedBytes), Bytes.toString(incrementedBytes)),
+                Bytes.compareTo(expectedBytes, incrementedBytes) == 0);
+        assertEquals(expected, Bytes.toString(incrementedBytes));
+        assertTrue("Initial value of " + initial + " should be < " + Bytes.toString(incrementedBytes),
+                Bytes.compareTo(initialBytes, incrementedBytes) < 0);
+    }
+
+    private void doIncrementTest(byte[] initial, byte[] expected) {
+        byte[] incrementedBytes = HBaseStorage.increment(initial);
+        if (expected == null) {
+            assertNull("Expected null bytes but found " + HBaseStorage.toString(incrementedBytes), incrementedBytes);
+        } else {
+            assertTrue(String.format("Expected bytes %s did not equal found bytes %s",
+                    HBaseStorage.toString(expected), HBaseStorage.toString(incrementedBytes)),
+                    Bytes.compareTo(expected, incrementedBytes) == 0);
+        }
+        assertTrue("Initial value of should be < incremented value",
+                Bytes.compareTo(initial, incrementedBytes) < 0);
+    }
+
+    private void assertColumnInfo(HBaseStorage.ColumnInfo columnInfo,
+                                  String columnFamily, String columnName, String prefix) {
+        assertEquals(columnFamily, Bytes.toString(columnInfo.getColumnFamily()));
+        assertEquals(columnName, Bytes.toString(columnInfo.getColumnName()));
+        assertEquals(prefix, Bytes.toString(columnInfo.getColumnPrefix()));
+    }
+
+    private List<Filter> assertFilterList(Filter filter, FilterList.Operator operator, int size) {
+        assertTrue("Filter is not a FilterList: " + filter.getClass().getSimpleName(),
+                filter instanceof FilterList);
+        FilterList filterList = (FilterList)filter;
+        assertEquals("Unexpected operator", operator, filterList.getOperator());
+        assertEquals("Unexpected filter list size", size, filterList.getFilters().size());
+        return filterList.getFilters();
+    }
+
+    private FamilyFilter assertFamilyFilter(Filter filter, CompareFilter.CompareOp compareOp, String value) {
+        FamilyFilter familyFilter = assertFamilyFilter(filter, compareOp);
+        assertEquals("Unexpected value", value, Bytes.toString(familyFilter.getComparator().getValue()));
+        return familyFilter;
+    }
+
+    private FamilyFilter assertFamilyFilter(Filter filter, CompareFilter.CompareOp compareOp) {
+        assertTrue("Filter is not a FamilyFilter: " + filter.getClass().getSimpleName(),
+                filter instanceof FamilyFilter);
+        FamilyFilter familyFilter = (FamilyFilter)filter;
+        assertEquals("Unexpected compareOp", compareOp, familyFilter.getOperator());
+        return familyFilter;
+    }
+
+    private void assertPrefixFilter(Filter filter, String prefix) {
+        assertTrue("Filter is not a ColumnPrefixFilter: " + filter.getClass().getSimpleName(),
+                filter instanceof ColumnPrefixFilter);
+        ColumnPrefixFilter familyFilter = (ColumnPrefixFilter)filter;
+        assertEquals("Unexpected prefix", prefix, Bytes.toString(familyFilter.getPrefix()));
+    }
+
+    private void assertQualifierFilter(Filter filter, CompareFilter.CompareOp compareOp, String value) {
+        assertTrue("Filter is not a QualifierFilter: " + filter.getClass().getSimpleName(),
+                filter instanceof QualifierFilter);
+        QualifierFilter qualifierFilter = (QualifierFilter)filter;
+        assertEquals("Unexpected compareOp", compareOp, qualifierFilter.getOperator());
+        assertEquals("Unexpected value", value, Bytes.toString(qualifierFilter.getComparator().getValue()));
+    }
+
+    private void printFilters(List<Filter> filters) {
+        for (Filter child : filters) {
+            printFilters(child);
+        }
+    }
+
+    private void printFilters(Filter filter) {
+        if (filter != null) {
+            System.out.println(filter.toString());
+            if (filter instanceof FilterList) {
+                printFilters(((FilterList)filter).getFilters());
+            }
+        }
+    }
+}