You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/10 07:17:03 UTC
svn commit: r1540450 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
test/org/apache/pig/test/TestHBaseStorage.java
Author: cheolsoo
Date: Sun Nov 10 06:17:03 2013
New Revision: 1540450
URL: http://svn.apache.org/r1540450
Log:
PIG-3388: No support for Regex for row filter in
org.apache.pig.backend.hadoop.hbase.HBaseStorage (lbendig via cheolsoo)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1540450&r1=1540449&r2=1540450&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Nov 10 06:17:03 2013
@@ -28,6 +28,8 @@ PIG-3419: Pluggable Execution Engine (ac
IMPROVEMENTS
+PIG-3388: No support for Regex for row filter in org.apache.pig.backend.hadoop.hbase.HBaseStorage (lbendig via cheolsoo)
+
PIG-3522: Remove shock from pig (daijy)
PIG-3295: Casting from bytearray failing after Union even when each field is from a single Loader (knoguchi)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1540450&r1=1540449&r2=1540450&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Sun Nov 10 06:17:03 2013
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.filter.Co
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.filter.QualifierFilter;
@@ -173,6 +174,7 @@ public class HBaseStorage extends LoadFu
protected transient byte[] lt_;
protected transient byte[] lte_;
+ private String regex_;
private LoadCaster caster_;
private ResourceSchema schema_;
@@ -185,6 +187,7 @@ public class HBaseStorage extends LoadFu
validOptions_.addOption("lt", true, "Records must be less than this value (binary, double-slash-escaped)");
validOptions_.addOption("gte", true, "Records must be greater than or equal to this value");
validOptions_.addOption("lte", true, "Records must be less than or equal to this value");
+ validOptions_.addOption("regex", true, "Records must match this regular expression");
validOptions_.addOption("caching", true, "Number of rows scanners should cache");
validOptions_.addOption("limit", true, "Per-region limit");
validOptions_.addOption("delim", true, "Column delimiter");
@@ -229,6 +232,7 @@ public class HBaseStorage extends LoadFu
* <li>-lt=maxKeyVal
* <li>-gte=minKeyVal
* <li>-lte=maxKeyVal
+ * <li>-regex=regex rows whose key matches this regular expression are returned
* <li>-limit=numRowsPerRegion max number of rows to retrieve per region
* <li>-delim=char delimiter to use when parsing column names (default is space or comma)
* <li>-ignoreWhitespace=(true|false) ignore spaces when parsing column names (default true)
@@ -251,7 +255,7 @@ public class HBaseStorage extends LoadFu
configuredOptions_ = parser_.parse(validOptions_, optsArr);
} catch (ParseException e) {
HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-columnPrefix] [-caching] [-caster] [-noWAL] [-limit] [-delim] [-ignoreWhitespace] [-minTimestamp] [-maxTimestamp] [-timestamp]", validOptions_ );
+ formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-regex] [-columnPrefix] [-caching] [-caster] [-noWAL] [-limit] [-delim] [-ignoreWhitespace] [-minTimestamp] [-maxTimestamp] [-timestamp]", validOptions_ );
throw e;
}
@@ -407,6 +411,10 @@ public class HBaseStorage extends LoadFu
// setStopRow call will limit the number of regions we need to scan
addFilter(new WhileMatchFilter(new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(lte_))));
}
+ if (configuredOptions_.hasOption("regex")) {
+ regex_ = Utils.slashisize(configuredOptions_.getOptionValue("regex"));
+ addFilter(new RowFilter(CompareOp.EQUAL, new RegexStringComparator(regex_)));
+ }
if (configuredOptions_.hasOption("minTimestamp") || configuredOptions_.hasOption("maxTimestamp")){
scan.setTimeRange(minTimestamp_, maxTimestamp_);
}
@@ -437,7 +445,7 @@ public class HBaseStorage extends LoadFu
* addFamily on the scan
*/
private void addFiltersWithoutColumnPrefix(List<ColumnInfo> columnInfos) {
- // Need to check for mixed types in a family, so we don't call addColumn
+ // Need to check for mixed types in a family, so we don't call addColumn
// after addFamily on the same family
Map<String, List<ColumnInfo>> groupedMap = groupByFamily(columnInfos);
for (Entry<String, List<ColumnInfo>> entrySet : groupedMap.entrySet()) {
@@ -455,7 +463,7 @@ public class HBaseStorage extends LoadFu
+ Bytes.toString(columnInfo.getColumnFamily()) + ":"
+ Bytes.toString(columnInfo.getColumnName()));
}
- scan.addColumn(columnInfo.getColumnFamily(), columnInfo.getColumnName());
+ scan.addColumn(columnInfo.getColumnFamily(), columnInfo.getColumnName());
}
} else {
String family = entrySet.getKey();
@@ -463,7 +471,7 @@ public class HBaseStorage extends LoadFu
LOG.debug("Adding column family to scan via addFamily with cf:name = "
+ family);
}
- scan.addFamily(Bytes.toBytes(family));
+ scan.addFamily(Bytes.toBytes(family));
}
}
}
Modified: pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java?rev=1540450&r1=1540449&r2=1540450&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java Sun Nov 10 06:17:03 2013
@@ -71,7 +71,6 @@ public class TestHBaseStorage {
private static final String TESTCOLUMN_A = "pig:col_a";
private static final String TESTCOLUMN_B = "pig:col_b";
private static final String TESTCOLUMN_C = "pig:col_c";
- private static final String TESTCOLUMN_D = "pig:prefixed_col_d";
private static final int TEST_ROW_COUNT = 100;
@@ -694,6 +693,90 @@ public class TestHBaseStorage {
LOG.info("LoadFromHBaseWithParameters_3 Starting");
}
+ /**
+ * Test load from HBase with the -regex parameter ([2-3][4-5])
+ *
+ */
+ @Test
+ public void testLoadWithParameters_4() throws IOException {
+ prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+
+ pig.registerQuery("a = load 'hbase://"
+ + TESTTABLE_1
+ + "' using "
+ + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ + TESTCOLUMN_A
+ + " "
+ + TESTCOLUMN_B
+ + " "
+ + TESTCOLUMN_C
+ + "','-loadKey -regex [2-3][4-5]') as (rowKey,col_a, col_b, col_c);");
+ Iterator<Tuple> it = pig.openIterator("a");
+
+ int[] expectedValues = {24, 25, 34, 35};
+ int count = 0;
+ int countExpected = 4;
+ LOG.info("LoadFromHBaseWithParameters_4 Starting");
+ while (it.hasNext()) {
+ Tuple t = it.next();
+ LOG.info("LoadFromHBase " + t);
+ String rowKey = ((DataByteArray) t.get(0)).toString();
+ String col_a = ((DataByteArray) t.get(1)).toString();
+ String col_b = ((DataByteArray) t.get(2)).toString();
+ String col_c = ((DataByteArray) t.get(3)).toString();
+
+ Assert.assertEquals(expectedValues[count] + "", rowKey);
+ Assert.assertEquals(expectedValues[count], Integer.parseInt(col_a));
+ Assert.assertEquals((double) expectedValues[count], Double.parseDouble(col_b), 1e-6);
+ Assert.assertEquals("Text_" + expectedValues[count], col_c);
+
+ count++;
+ }
+ Assert.assertEquals(countExpected, count);
+ LOG.info("LoadFromHBaseWithParameters_4 done");
+ }
+
+ /**
+ * Test load from HBase with the -gt and -lt parameters (10 &lt; key &lt; 30) and -regex \\d[5]
+ */
+ @Test
+ public void testLoadWithParameters_5() throws IOException {
+ prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+
+ pig.registerQuery("a = load 'hbase://"
+ + TESTTABLE_1
+ + "' using "
+ + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ + TESTCOLUMN_A
+ + " "
+ + TESTCOLUMN_B
+ + " "
+ + TESTCOLUMN_C
+ + "','-loadKey -gt 10 -lt 30 -regex \\\\d[5]') as (rowKey,col_a, col_b, col_c);");
+ Iterator<Tuple> it = pig.openIterator("a");
+
+ int[] expectedValues = {15, 25};
+ int count = 0;
+ int countExpected = 2;
+ LOG.info("LoadFromHBaseWithParameters_5 Starting");
+ while (it.hasNext()) {
+ Tuple t = it.next();
+ LOG.info("LoadFromHBase " + t);
+ String rowKey = ((DataByteArray) t.get(0)).toString();
+ String col_a = ((DataByteArray) t.get(1)).toString();
+ String col_b = ((DataByteArray) t.get(2)).toString();
+ String col_c = ((DataByteArray) t.get(3)).toString();
+
+ Assert.assertEquals(expectedValues[count] + "", rowKey);
+ Assert.assertEquals(expectedValues[count], Integer.parseInt(col_a));
+ Assert.assertEquals((double) expectedValues[count], Double.parseDouble(col_b), 1e-6);
+ Assert.assertEquals("Text_" + expectedValues[count], col_c);
+
+ count++;
+ }
+ Assert.assertEquals(countExpected, count);
+ LOG.info("LoadFromHBaseWithParameters_5 done");
+ }
/**
* Test Load from hbase with projection.
@@ -1112,7 +1195,6 @@ public class TestHBaseStorage {
*/
private static byte[] getColValue(Result result, String colName) {
byte[][] colArray = Bytes.toByteArrays(colName.split(":"));
- byte[] val = result.getValue(colArray[0], colArray[1]);
return result.getValue(colArray[0], colArray[1]);
}