You are viewing a plain text version of this content; the canonical (formatted) version is available at the mailing-list archive link from which this page was reached.
Posted to commits@pig.apache.org by dv...@apache.org on 2011/08/11 23:15:50 UTC
svn commit: r1156834 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
test/org/apache/pig/test/TestHBaseStorage.java
Author: dvryaboy
Date: Thu Aug 11 21:15:49 2011
New Revision: 1156834
URL: http://svn.apache.org/viewvc?rev=1156834&view=rev
Log:
PIG-2174: HBaseStorage column filters miss some fields
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1156834&r1=1156833&r2=1156834&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Aug 11 21:15:49 2011
@@ -105,6 +105,8 @@ PIG-2011: Speed up TestTypedMap.java (dv
BUG FIXES
+PIG-2174: HBaseStorage column filters miss some fields (billgraham via dvryaboy)
+
PIG-2183: Pig not working with Hadoop 0.20.203.0 (daijy)
PIG-2090: re-enable TestGrunt test cases (thejas)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1156834&r1=1156833&r2=1156834&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Thu Aug 11 21:15:49 2011
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.client.Sc
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
@@ -334,12 +335,17 @@ public class HBaseStorage extends LoadFu
// apply any column filters
FilterList allColumnFilters = null;
for (ColumnInfo colInfo : columnInfo_) {
- if (colInfo.isColumnMap() && colInfo.getColumnPrefix() != null) {
+ // all column family filters roll up to one parent OR filter
+ if (allColumnFilters == null) {
+ allColumnFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
+ }
- // all column family filters roll up to one parent OR filter
- if (allColumnFilters == null) {
- allColumnFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
- }
+ // and each filter contains a column family filter
+ FilterList thisColumnFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL);
+ thisColumnFilter.addFilter(new FamilyFilter(CompareOp.EQUAL,
+ new BinaryComparator(colInfo.getColumnFamily())));
+
+ if (colInfo.isColumnMap()) {
if (LOG.isInfoEnabled()) {
LOG.info("Adding family:prefix filters with values " +
@@ -347,15 +353,28 @@ public class HBaseStorage extends LoadFu
Bytes.toString(colInfo.getColumnPrefix()));
}
- // each column family filter consists of a FamilyFilter AND a PrefixFilter
- FilterList thisColumnFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL);
- thisColumnFilter.addFilter(new FamilyFilter(CompareOp.EQUAL,
- new BinaryComparator(colInfo.getColumnFamily())));
- thisColumnFilter.addFilter(new ColumnPrefixFilter(
+ // each column map filter consists of a FamilyFilter AND
+ // optionally a PrefixFilter
+ if (colInfo.getColumnPrefix() != null) {
+ thisColumnFilter.addFilter(new ColumnPrefixFilter(
colInfo.getColumnPrefix()));
+ }
+ }
+ else {
- allColumnFilters.addFilter(thisColumnFilter);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Adding family:descriptor filters with values " +
+ Bytes.toString(colInfo.getColumnFamily()) + COLON +
+ Bytes.toString(colInfo.getColumnName()));
+ }
+
+ // each column value filter consists of a FamilyFilter AND
+ // a QualifierFilter
+ thisColumnFilter.addFilter(new QualifierFilter(CompareOp.EQUAL,
+ new BinaryComparator(colInfo.getColumnName())));
}
+
+ allColumnFilters.addFilter(thisColumnFilter);
}
if (allColumnFilters != null) {
@@ -374,7 +393,7 @@ public class HBaseStorage extends LoadFu
private void addFilter(Filter filter) {
FilterList scanFilter = (FilterList) scan.getFilter();
if (scanFilter == null) {
- scanFilter = new FilterList();
+ scanFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL);
}
scanFilter.addFilter(filter);
scan.setFilter(scanFilter);
Modified: pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java?rev=1156834&r1=1156833&r2=1156834&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java Thu Aug 11 21:15:49 2011
@@ -166,7 +166,7 @@ public class TestHBaseStorage {
Assert.assertEquals(count + 0.0, Double.parseDouble(col_b), 1e-6);
Assert.assertEquals("Text_" + count, col_c);
- Assert.assertEquals(4, pig_cf_map.size());
+ Assert.assertEquals(5, pig_cf_map.size());
Assert.assertEquals(count,
Integer.parseInt(pig_cf_map.get("col_a").toString()));
Assert.assertEquals(count + 0.0,
@@ -263,6 +263,87 @@ public class TestHBaseStorage {
LOG.info("LoadFromHBase done");
}
+ /**
+ * * Test Load from hbase with map parameters and column prefix with a
+ * static column
+ *
+ */
+ @Test
+ public void testLoadWithFixedAndPrefixedCols() throws IOException {
+ prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+
+ //NOTE: I think there is some strangeness in how HBase handles column
+ // filters. I was only able to reproduce a bug related to missing column
+ // values in the response when I used 'sc' as a column name, instead of
+ // 'col_a' as I use below.
+ pig.registerQuery("a = load 'hbase://"
+ + TESTTABLE_1
+ + "' using "
+ + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ + "pig:sc pig:prefixed_col_*"
+ + "','-loadKey') as (rowKey:chararray, sc:chararray, pig_cf_map:map[]);");
+ Iterator<Tuple> it = pig.openIterator("a");
+ int count = 0;
+ LOG.info("LoadFromHBase Starting");
+ while (it.hasNext()) {
+ Tuple t = it.next();
+ LOG.info("LoadFromHBase " + t);
+ String rowKey = (String) t.get(0);
+ String col_a = t.get(1) != null ? t.get(1).toString() : null;
+ Map pig_cf_map = (Map) t.get(2);
+ Assert.assertEquals(3, t.size());
+
+ Assert.assertEquals("00".substring((count + "").length()) + count,
+ rowKey);
+ Assert.assertEquals("PrefixedText_" + count,
+ ((DataByteArray) pig_cf_map.get("prefixed_col_d")).toString());
+ Assert.assertEquals(1, pig_cf_map.size());
+ Assert.assertEquals(Integer.toString(count), col_a);
+
+ count++;
+ }
+ Assert.assertEquals(TEST_ROW_COUNT, count);
+ LOG.info("LoadFromHBase done");
+ }
+
+ /**
+ * * Test Load from hbase with map parameters and with a
+ * static column
+ *
+ */
+ @Test
+ public void testLoadWithFixedAndPrefixedCols2() throws IOException {
+ prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+
+ pig.registerQuery("a = load 'hbase://"
+ + TESTTABLE_1
+ + "' using "
+ + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ + "pig:col_a pig:prefixed_col_*"
+ + "','-loadKey') as (rowKey:chararray, col_a:chararray, pig_cf_map:map[]);");
+ Iterator<Tuple> it = pig.openIterator("a");
+ int count = 0;
+ LOG.info("LoadFromHBase Starting");
+ while (it.hasNext()) {
+ Tuple t = it.next();
+ LOG.info("LoadFromHBase " + t);
+ String rowKey = (String) t.get(0);
+ String col_a = t.get(1) != null ? t.get(1).toString() : null;
+ Map pig_cf_map = (Map) t.get(2);
+ Assert.assertEquals(3, t.size());
+
+ Assert.assertEquals("00".substring((count + "").length()) + count,
+ rowKey);
+ Assert.assertEquals("PrefixedText_" + count,
+ ((DataByteArray) pig_cf_map.get("prefixed_col_d")).toString());
+ Assert.assertEquals(1, pig_cf_map.size());
+ Assert.assertEquals(Integer.toString(count), col_a);
+
+ count++;
+ }
+ Assert.assertEquals(TEST_ROW_COUNT, count);
+ LOG.info("LoadFromHBase done");
+ }
/**
* load from hbase test
@@ -810,6 +891,9 @@ public class TestHBaseStorage {
// row key: string type
Put put = new Put(Bytes.toBytes("00".substring(v.length())
+ v));
+ // sc: int type
+ put.add(COLUMNFAMILY, Bytes.toBytes("sc"),
+ Bytes.toBytes(i));
// col_a: int type
put.add(COLUMNFAMILY, Bytes.toBytes("col_a"),
Bytes.toBytes(i));
@@ -827,6 +911,9 @@ public class TestHBaseStorage {
// row key: string type
Put put = new Put(
("00".substring(v.length()) + v).getBytes());
+ // sc: int type
+ put.add(COLUMNFAMILY, Bytes.toBytes("sc"),
+ (i + "").getBytes()); // int
// col_a: int type
put.add(COLUMNFAMILY, Bytes.toBytes("col_a"),
(i + "").getBytes()); // int