Posted to commits@pig.apache.org by dv...@apache.org on 2011/08/11 23:15:50 UTC

svn commit: r1156834 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java test/org/apache/pig/test/TestHBaseStorage.java

Author: dvryaboy
Date: Thu Aug 11 21:15:49 2011
New Revision: 1156834

URL: http://svn.apache.org/viewvc?rev=1156834&view=rev
Log:
PIG-2174: HBaseStorage column filters miss some fields
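
Before this patch, HBaseStorage built a column filter only for map columns
declared with a prefix (e.g. pig:prefixed_col_*). Once any such filter was
set, the OR'ed filter list excluded every fixed column (e.g. pig:col_a)
named in the same spec, so those fields came back null. Now every column
contributes an AND'ed FilterList: a FamilyFilter plus a ColumnPrefixFilter
for prefixed map columns, or a FamilyFilter plus a QualifierFilter for
fixed columns, and the per-column lists are OR'ed together. A minimal
sketch of the filter tree the patched code builds for the spec
'pig:col_a pig:prefixed_col_*' (column names are illustrative, mirroring
the tests below; assumes the imports shown in the HBaseStorage.java diff
and an existing Scan named scan):

    // OR across columns: a KeyValue passes if it matches any requested column.
    FilterList allColumnFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);

    // Fixed column pig:col_a: family AND qualifier must both match exactly.
    FilterList fixedCol = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    fixedCol.addFilter(new FamilyFilter(CompareOp.EQUAL,
            new BinaryComparator(Bytes.toBytes("pig"))));
    fixedCol.addFilter(new QualifierFilter(CompareOp.EQUAL,
            new BinaryComparator(Bytes.toBytes("col_a"))));
    allColumnFilters.addFilter(fixedCol);

    // Map column pig:prefixed_col_*: family AND qualifier prefix must match.
    FilterList mapCol = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    mapCol.addFilter(new FamilyFilter(CompareOp.EQUAL,
            new BinaryComparator(Bytes.toBytes("pig"))));
    mapCol.addFilter(new ColumnPrefixFilter(Bytes.toBytes("prefixed_col_")));
    allColumnFilters.addFilter(mapCol);

    scan.setFilter(allColumnFilters);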

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
    pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1156834&r1=1156833&r2=1156834&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Aug 11 21:15:49 2011
@@ -105,6 +105,8 @@ PIG-2011: Speed up TestTypedMap.java (dv
 
 BUG FIXES
 
+PIG-2174: HBaseStorage column filters miss some fields (billgraham via dvryaboy)
+
 PIG-2183: Pig not working with Hadoop 0.20.203.0 (daijy)
 
 PIG-2090: re-enable TestGrunt test cases (thejas)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1156834&r1=1156833&r2=1156834&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Thu Aug 11 21:15:49 2011
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.client.Sc
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
 import org.apache.hadoop.hbase.filter.RowFilter;
 import org.apache.hadoop.hbase.filter.FamilyFilter;
 import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
@@ -334,12 +335,17 @@ public class HBaseStorage extends LoadFu
         // apply any column filters
         FilterList allColumnFilters = null;
         for (ColumnInfo colInfo : columnInfo_) {
-            if (colInfo.isColumnMap() && colInfo.getColumnPrefix() != null) {
+            // all column family filters roll up to one parent OR filter
+            if (allColumnFilters == null) {
+                allColumnFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
+            }
 
-                // all column family filters roll up to one parent OR filter
-                if (allColumnFilters == null) {
-                    allColumnFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
-                }
+            // and each filter contains a column family filter
+            FilterList thisColumnFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL);
+            thisColumnFilter.addFilter(new FamilyFilter(CompareOp.EQUAL,
+                    new BinaryComparator(colInfo.getColumnFamily())));
+
+            if (colInfo.isColumnMap()) {
 
                 if (LOG.isInfoEnabled()) {
                     LOG.info("Adding family:prefix filters with values " +
@@ -347,15 +353,28 @@ public class HBaseStorage extends LoadFu
                         Bytes.toString(colInfo.getColumnPrefix()));
                 }
 
-                // each column family filter consists of a FamilyFilter AND a PrefixFilter
-                FilterList thisColumnFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL);
-                thisColumnFilter.addFilter(new FamilyFilter(CompareOp.EQUAL,
-                        new BinaryComparator(colInfo.getColumnFamily())));
-                thisColumnFilter.addFilter(new ColumnPrefixFilter(
+                // each column map filter consists of a FamilyFilter AND
+                // optionally a PrefixFilter
+                if (colInfo.getColumnPrefix() != null) {
+                    thisColumnFilter.addFilter(new ColumnPrefixFilter(
                         colInfo.getColumnPrefix()));
+                }
+            }
+            else {
 
-                allColumnFilters.addFilter(thisColumnFilter);
+                if (LOG.isInfoEnabled()) {
+                    LOG.info("Adding family:descriptor filters with values " +
+                        Bytes.toString(colInfo.getColumnFamily()) + COLON +
+                        Bytes.toString(colInfo.getColumnName()));
+                }
+
+                // each column value filter consists of a FamilyFilter AND
+                // a QualifierFilter
+                thisColumnFilter.addFilter(new QualifierFilter(CompareOp.EQUAL,
+                        new BinaryComparator(colInfo.getColumnName())));
             }
+
+            allColumnFilters.addFilter(thisColumnFilter);
         }
 
         if (allColumnFilters != null) {
@@ -374,7 +393,7 @@ public class HBaseStorage extends LoadFu
     private void addFilter(Filter filter) {
         FilterList scanFilter = (FilterList) scan.getFilter();
         if (scanFilter == null) {
-            scanFilter = new FilterList();
+            scanFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL);
         }
         scanFilter.addFilter(filter);
         scan.setFilter(scanFilter);
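
The addFilter change just above makes the top-level combination explicit:
row filters (from options like -gt/-lt) and the column-filter OR list are
AND'ed in one outer FilterList. As far as I can tell, new FilterList()
already defaults to MUST_PASS_ALL in this HBase version, so the change
documents intent rather than altering behavior. For contrast, a minimal
sketch of the pre-patch failure mode (illustrative names again): only the
prefixed map column got a filter, so a fixed column requested alongside it
was dropped server-side.

    // Pre-patch: the OR list holds only the family/prefix pair. KeyValues
    // for the fixed column pig:col_a fail the ColumnPrefixFilter and never
    // reach the client, even though col_a was requested.
    FilterList orList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
    FilterList prefixPair = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    prefixPair.addFilter(new FamilyFilter(CompareOp.EQUAL,
            new BinaryComparator(Bytes.toBytes("pig"))));
    prefixPair.addFilter(new ColumnPrefixFilter(Bytes.toBytes("prefixed_col_")));
    orList.addFilter(prefixPair);
    scan.setFilter(orList); // pig:col_a values come back null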

Modified: pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java?rev=1156834&r1=1156833&r2=1156834&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java Thu Aug 11 21:15:49 2011
@@ -166,7 +166,7 @@ public class TestHBaseStorage {
             Assert.assertEquals(count + 0.0, Double.parseDouble(col_b), 1e-6);
             Assert.assertEquals("Text_" + count, col_c);
 
-            Assert.assertEquals(4, pig_cf_map.size());
+            Assert.assertEquals(5, pig_cf_map.size());
             Assert.assertEquals(count,
                     Integer.parseInt(pig_cf_map.get("col_a").toString()));
             Assert.assertEquals(count + 0.0,
@@ -263,6 +263,87 @@ public class TestHBaseStorage {
         LOG.info("LoadFromHBase done");
     }
 
+    /**
+     * Test load from HBase with map parameters and a column prefix plus a
+     * static column.
+     *
+     */
+    @Test
+    public void testLoadWithFixedAndPrefixedCols() throws IOException {
+        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+
+        // NOTE: I think there is some strangeness in how HBase handles
+        // column filters. I was only able to reproduce the missing-column-
+        // values bug with 'sc' as the static column name, as this test uses,
+        // rather than 'col_a', which the next test uses.
+        pig.registerQuery("a = load 'hbase://"
+                + TESTTABLE_1
+                + "' using "
+                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+                + "pig:sc pig:prefixed_col_*"
+                + "','-loadKey') as (rowKey:chararray, sc:chararray, pig_cf_map:map[]);");
+        Iterator<Tuple> it = pig.openIterator("a");
+        int count = 0;
+        LOG.info("LoadFromHBase Starting");
+        while (it.hasNext()) {
+            Tuple t = it.next();
+            LOG.info("LoadFromHBase " + t);
+            String rowKey = (String) t.get(0);
+            String col_a = t.get(1) != null ? t.get(1).toString() : null;
+            Map pig_cf_map = (Map) t.get(2);
+            Assert.assertEquals(3, t.size());
+
+            Assert.assertEquals("00".substring((count + "").length()) + count,
+                    rowKey);
+            Assert.assertEquals("PrefixedText_" + count,
+                    ((DataByteArray) pig_cf_map.get("prefixed_col_d")).toString());
+            Assert.assertEquals(1, pig_cf_map.size());
+            Assert.assertEquals(Integer.toString(count), col_a);
+
+            count++;
+        }
+        Assert.assertEquals(TEST_ROW_COUNT, count);
+        LOG.info("LoadFromHBase done");
+    }
+
+    /**
+     * Test load from HBase with map parameters and a static column; same
+     * as above, but with 'col_a' as the static column name.
+     *
+     */
+    @Test
+    public void testLoadWithFixedAndPrefixedCols2() throws IOException {
+        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+
+        pig.registerQuery("a = load 'hbase://"
+                + TESTTABLE_1
+                + "' using "
+                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+                + "pig:col_a pig:prefixed_col_*"
+                + "','-loadKey') as (rowKey:chararray, col_a:chararray, pig_cf_map:map[]);");
+        Iterator<Tuple> it = pig.openIterator("a");
+        int count = 0;
+        LOG.info("LoadFromHBase Starting");
+        while (it.hasNext()) {
+            Tuple t = it.next();
+            LOG.info("LoadFromHBase " + t);
+            String rowKey = (String) t.get(0);
+            String col_a = t.get(1) != null ? t.get(1).toString() : null;
+            Map pig_cf_map = (Map) t.get(2);
+            Assert.assertEquals(3, t.size());
+
+            Assert.assertEquals("00".substring((count + "").length()) + count,
+                    rowKey);
+            Assert.assertEquals("PrefixedText_" + count,
+                    ((DataByteArray) pig_cf_map.get("prefixed_col_d")).toString());
+            Assert.assertEquals(1, pig_cf_map.size());
+            Assert.assertEquals(Integer.toString(count), col_a);
+
+            count++;
+        }
+        Assert.assertEquals(TEST_ROW_COUNT, count);
+        LOG.info("LoadFromHBase done");
+    }
 
     /**
      * load from hbase test
@@ -810,6 +891,9 @@ public class TestHBaseStorage {
                     // row key: string type
                     Put put = new Put(Bytes.toBytes("00".substring(v.length())
                             + v));
+                    // sc: int type
+                    put.add(COLUMNFAMILY, Bytes.toBytes("sc"),
+                            Bytes.toBytes(i));
                     // col_a: int type
                     put.add(COLUMNFAMILY, Bytes.toBytes("col_a"),
                             Bytes.toBytes(i));
@@ -827,6 +911,9 @@ public class TestHBaseStorage {
                     // row key: string type
                     Put put = new Put(
                             ("00".substring(v.length()) + v).getBytes());
+                    // sc: int type
+                    put.add(COLUMNFAMILY, Bytes.toBytes("sc"),
+                            (i + "").getBytes()); // int
                     // col_a: int type
                     put.add(COLUMNFAMILY, Bytes.toBytes("col_a"),
                             (i + "").getBytes()); // int