You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/10 07:17:03 UTC

svn commit: r1540450 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java test/org/apache/pig/test/TestHBaseStorage.java

Author: cheolsoo
Date: Sun Nov 10 06:17:03 2013
New Revision: 1540450

URL: http://svn.apache.org/r1540450
Log:
PIG-3388: No support for Regex for row filter in
org.apache.pig.backend.hadoop.hbase.HBaseStorage (lbendig via cheolsoo)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
    pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1540450&r1=1540449&r2=1540450&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Nov 10 06:17:03 2013
@@ -28,6 +28,8 @@ PIG-3419: Pluggable Execution Engine (ac
 
 IMPROVEMENTS
 
+PIG-3388: No support for Regex for row filter in org.apache.pig.backend.hadoop.hbase.HBaseStorage (lbendig via cheolsoo)
+
 PIG-3522: Remove shock from pig (daijy)
 
 PIG-3295: Casting from bytearray failing after Union even when each field is from a single Loader (knoguchi)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1540450&r1=1540449&r2=1540450&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Sun Nov 10 06:17:03 2013
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.filter.Co
 import org.apache.hadoop.hbase.filter.FamilyFilter;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
 import org.apache.hadoop.hbase.filter.RowFilter;
 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
 import org.apache.hadoop.hbase.filter.QualifierFilter;
@@ -173,6 +174,7 @@ public class HBaseStorage extends LoadFu
     protected transient byte[] lt_;
     protected transient byte[] lte_;
 
+    private String regex_;
     private LoadCaster caster_;
 
     private ResourceSchema schema_;
@@ -185,6 +187,7 @@ public class HBaseStorage extends LoadFu
         validOptions_.addOption("lt", true, "Records must be less than this value (binary, double-slash-escaped)");
         validOptions_.addOption("gte", true, "Records must be greater than or equal to this value");
         validOptions_.addOption("lte", true, "Records must be less than or equal to this value");
+        validOptions_.addOption("regex", true, "Record must match this regular expression");
         validOptions_.addOption("caching", true, "Number of rows scanners should cache");
         validOptions_.addOption("limit", true, "Per-region limit");
         validOptions_.addOption("delim", true, "Column delimiter");
@@ -229,6 +232,7 @@ public class HBaseStorage extends LoadFu
      * <li>-lt=maxKeyVal
      * <li>-gte=minKeyVal
      * <li>-lte=maxKeyVal
+     * <li>-regex=regular expression that row keys must match
      * <li>-limit=numRowsPerRegion max number of rows to retrieve per region
      * <li>-delim=char delimiter to use when parsing column names (default is space or comma)
      * <li>-ignoreWhitespace=(true|false) ignore spaces when parsing column names (default true)
@@ -251,7 +255,7 @@ public class HBaseStorage extends LoadFu
             configuredOptions_ = parser_.parse(validOptions_, optsArr);
         } catch (ParseException e) {
             HelpFormatter formatter = new HelpFormatter();
-            formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-columnPrefix] [-caching] [-caster] [-noWAL] [-limit] [-delim] [-ignoreWhitespace] [-minTimestamp] [-maxTimestamp] [-timestamp]", validOptions_ );
+            formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-regex] [-columnPrefix] [-caching] [-caster] [-noWAL] [-limit] [-delim] [-ignoreWhitespace] [-minTimestamp] [-maxTimestamp] [-timestamp]", validOptions_ );
             throw e;
         }
 
@@ -407,6 +411,10 @@ public class HBaseStorage extends LoadFu
             // setStopRow call will limit the number of regions we need to scan
             addFilter(new WhileMatchFilter(new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(lte_))));
         }
+        if (configuredOptions_.hasOption("regex")) {
+            regex_ = Utils.slashisize(configuredOptions_.getOptionValue("regex"));
+            addFilter(new RowFilter(CompareOp.EQUAL, new RegexStringComparator(regex_)));
+        }
         if (configuredOptions_.hasOption("minTimestamp") || configuredOptions_.hasOption("maxTimestamp")){
             scan.setTimeRange(minTimestamp_, maxTimestamp_);
         }
@@ -437,7 +445,7 @@ public class HBaseStorage extends LoadFu
      * addFamily on the scan
      */
     private void addFiltersWithoutColumnPrefix(List<ColumnInfo> columnInfos) {
-        // Need to check for mixed types in a family, so we don't call addColumn 
+        // Need to check for mixed types in a family, so we don't call addColumn
         // after addFamily on the same family
         Map<String, List<ColumnInfo>> groupedMap = groupByFamily(columnInfos);
         for (Entry<String, List<ColumnInfo>> entrySet : groupedMap.entrySet()) {
@@ -455,7 +463,7 @@ public class HBaseStorage extends LoadFu
                                 + Bytes.toString(columnInfo.getColumnFamily()) + ":"
                                 + Bytes.toString(columnInfo.getColumnName()));
                     }
-                    scan.addColumn(columnInfo.getColumnFamily(), columnInfo.getColumnName());                    
+                    scan.addColumn(columnInfo.getColumnFamily(), columnInfo.getColumnName());
                 }
             } else {
                 String family = entrySet.getKey();
@@ -463,7 +471,7 @@ public class HBaseStorage extends LoadFu
                     LOG.debug("Adding column family to scan via addFamily with cf:name = "
                             + family);
                 }
-                scan.addFamily(Bytes.toBytes(family));                
+                scan.addFamily(Bytes.toBytes(family));
             }
         }
     }

Modified: pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java?rev=1540450&r1=1540449&r2=1540450&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java Sun Nov 10 06:17:03 2013
@@ -71,7 +71,6 @@ public class TestHBaseStorage {
     private static final String TESTCOLUMN_A = "pig:col_a";
     private static final String TESTCOLUMN_B = "pig:col_b";
     private static final String TESTCOLUMN_C = "pig:col_c";
-    private static final String TESTCOLUMN_D = "pig:prefixed_col_d";
 
     private static final int TEST_ROW_COUNT = 100;
 
@@ -694,6 +693,90 @@ public class TestHBaseStorage {
         LOG.info("LoadFromHBaseWithParameters_3 Starting");
     }
 
+    /**
+     * Test load from HBase with the '-regex [2-3][4-5]' row-key filter.
+     *
+     */
+    @Test
+    public void testLoadWithParameters_4() throws IOException {
+        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+
+        pig.registerQuery("a = load 'hbase://"
+                + TESTTABLE_1
+                + "' using "
+                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+                + TESTCOLUMN_A
+                + " "
+                + TESTCOLUMN_B
+                + " "
+                + TESTCOLUMN_C
+                + "','-loadKey -regex [2-3][4-5]') as (rowKey,col_a, col_b, col_c);");
+        Iterator<Tuple> it = pig.openIterator("a");
+
+        int[] expectedValues = {24, 25, 34, 35};
+        int count = 0;
+        int countExpected = 4;
+        LOG.info("LoadFromHBaseWithParameters_4 Starting");
+        while (it.hasNext()) {
+            Tuple t = it.next();
+            LOG.info("LoadFromHBase " + t);
+            String rowKey = ((DataByteArray) t.get(0)).toString();
+            String col_a = ((DataByteArray) t.get(1)).toString();
+            String col_b = ((DataByteArray) t.get(2)).toString();
+            String col_c = ((DataByteArray) t.get(3)).toString();
+
+            Assert.assertEquals(expectedValues[count] + "", rowKey);
+            Assert.assertEquals(expectedValues[count], Integer.parseInt(col_a));
+            Assert.assertEquals((double) expectedValues[count], Double.parseDouble(col_b), 1e-6);
+            Assert.assertEquals("Text_" + expectedValues[count], col_c);
+
+            count++;
+        }
+        Assert.assertEquals(countExpected, count);
+        LOG.info("LoadFromHBaseWithParameters_4 done");
+    }
+
+    /**
+     * Test load from HBase with '-gt 10 -lt 30' bounds (10&lt;key&lt;30) combined with the '-regex \\d[5]' row-key filter.
+     */
+    @Test
+    public void testLoadWithParameters_5() throws IOException {
+        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+
+        pig.registerQuery("a = load 'hbase://"
+                + TESTTABLE_1
+                + "' using "
+                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+                + TESTCOLUMN_A
+                + " "
+                + TESTCOLUMN_B
+                + " "
+                + TESTCOLUMN_C
+                + "','-loadKey -gt 10 -lt 30 -regex \\\\d[5]') as (rowKey,col_a, col_b, col_c);");
+        Iterator<Tuple> it = pig.openIterator("a");
+
+        int[] expectedValues = {15, 25};
+        int count = 0;
+        int countExpected = 2;
+        LOG.info("LoadFromHBaseWithParameters_5 Starting");
+        while (it.hasNext()) {
+            Tuple t = it.next();
+            LOG.info("LoadFromHBase " + t);
+            String rowKey = ((DataByteArray) t.get(0)).toString();
+            String col_a = ((DataByteArray) t.get(1)).toString();
+            String col_b = ((DataByteArray) t.get(2)).toString();
+            String col_c = ((DataByteArray) t.get(3)).toString();
+
+            Assert.assertEquals(expectedValues[count] + "", rowKey);
+            Assert.assertEquals(expectedValues[count], Integer.parseInt(col_a));
+            Assert.assertEquals((double) expectedValues[count], Double.parseDouble(col_b), 1e-6);
+            Assert.assertEquals("Text_" + expectedValues[count], col_c);
+
+            count++;
+        }
+        Assert.assertEquals(countExpected, count);
+        LOG.info("LoadFromHBaseWithParameters_5 done");
+    }
 
     /**
      * Test Load from hbase with projection.
@@ -1112,7 +1195,6 @@ public class TestHBaseStorage {
      */
     private static byte[] getColValue(Result result, String colName) {
         byte[][] colArray = Bytes.toByteArrays(colName.split(":"));
-        byte[] val = result.getValue(colArray[0], colArray[1]);
         return result.getValue(colArray[0], colArray[1]);
 
     }