Posted to commits@pig.apache.org by ro...@apache.org on 2015/09/22 01:04:35 UTC

svn commit: r1704439 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java test/org/apache/pig/test/TestHBaseStorageParams.java

Author: rohini
Date: Mon Sep 21 23:04:34 2015
New Revision: 1704439

URL: http://svn.apache.org/viewvc?rev=1704439&view=rev
Log:
PIG-4663: HBaseStorage should allow the MaxResultsPerColumnFamily limit to avoid memory or scan timeout issues (pmazak via rohini)
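
For context, the new option maps directly onto the HBase client API Scan#setMaxResultsPerColumnFamily, which caps how many cells a scan returns per row for each column family. Below is a minimal standalone sketch of that underlying call, not taken from this commit; the column family name "cf" and the limit of 1000 are illustrative:

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MaxResultsScanSketch {
        public static void main(String[] args) {
            // Cap wide rows so the scanner neither buffers an unbounded number
            // of cells per row nor runs long enough to hit scan timeouts.
            Scan scan = new Scan();
            scan.addFamily(Bytes.toBytes("cf"));      // illustrative column family
            scan.setMaxResultsPerColumnFamily(1000);  // at most 1000 cells per row per family
            System.out.println(scan.getMaxResultsPerColumnFamily()); // 1000; the default is -1
        }
    }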

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
    pig/trunk/test/org/apache/pig/test/TestHBaseStorageParams.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1704439&r1=1704438&r2=1704439&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Sep 21 23:04:34 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-4663: HBaseStorage should allow the MaxResultsPerColumnFamily limit to avoid memory or scan timeout issues (pmazak via rohini)
+
 PIG-4673: Built In UDF - REPLACE_MULTI : For a given string, search and replace all occurrences
  of search keys with replacement values (murali.k.h.rao@gmail.com via daijy)
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1704439&r1=1704438&r2=1704439&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Mon Sep 21 23:04:34 2015
@@ -204,6 +204,7 @@ public class HBaseStorage extends LoadFu
         validOptions_.addOption("cacheBlocks", true, "Set whether blocks should be cached for the scan");
         validOptions_.addOption("caching", true, "Number of rows scanners should cache");
         validOptions_.addOption("limit", true, "Per-region limit");
+        validOptions_.addOption("maxResultsPerColumnFamily", true, "Limit the maximum number of values returned per row per column family");
         validOptions_.addOption("delim", true, "Column delimiter");
         validOptions_.addOption("ignoreWhitespace", true, "Ignore spaces when parsing columns");
         validOptions_.addOption("caster", true, "Caster to use for converting values. A class name, " +
@@ -250,6 +251,7 @@ public class HBaseStorage extends LoadFu
      * <li>-lte=maxKeyVal
      * <li>-regex=match regex on KeyVal
      * <li>-limit=numRowsPerRegion max number of rows to retrieve per region
+     * <li>-maxResultsPerColumnFamily= Limit the maximum number of values returned per row per column family
      * <li>-delim=char delimiter to use when parsing column names (default is space or comma)
      * <li>-ignoreWhitespace=(true|false) ignore spaces when parsing column names (default true)
      * <li>-cacheBlocks=(true|false) Set whether blocks should be cached for the scan (default false).
@@ -274,7 +276,7 @@ public class HBaseStorage extends LoadFu
             configuredOptions_ = parser_.parse(validOptions_, optsArr);
         } catch (ParseException e) {
             HelpFormatter formatter = new HelpFormatter();
-            formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-regex] [-cacheBlocks] [-caching] [-caster] [-noWAL] [-limit] [-delim] [-ignoreWhitespace] [-minTimestamp] [-maxTimestamp] [-timestamp] [-includeTimestamp] [-includeTombstone]", validOptions_ );
+            formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-regex] [-cacheBlocks] [-caching] [-caster] [-noWAL] [-limit] [-maxResultsPerColumnFamily] [-delim] [-ignoreWhitespace] [-minTimestamp] [-maxTimestamp] [-timestamp] [-includeTimestamp] [-includeTombstone]", validOptions_ );
             throw e;
         }
 
@@ -468,6 +470,10 @@ public class HBaseStorage extends LoadFu
         if (configuredOptions_.hasOption("timestamp")){
             scan.setTimeStamp(timestamp_);
         }
+        if (configuredOptions_.hasOption("maxResultsPerColumnFamily")){
+            int maxResultsPerColumnFamily = Integer.parseInt(configuredOptions_.getOptionValue("maxResultsPerColumnFamily"));
+            scan.setMaxResultsPerColumnFamily(maxResultsPerColumnFamily);
+        }
 
         // if the group of columnInfos for this family doesn't contain a prefix, we don't need
         // to set any filters, we can just call addColumn or addFamily. See javadocs below.

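From the loader's side, the option rides in the second constructor argument of HBaseStorage, the same options string a Pig script supplies after the column list in its USING clause. A usage sketch along the lines of the unit test below; the column list, the limit of 100, and the class name are illustrative, and running it standalone assumes the Pig and HBase client jars on the classpath plus whatever UDFContext setup the test class performs:

    import org.apache.pig.backend.hadoop.hbase.HBaseStorage;

    public class HBaseStorageOptionsSketch {
        public static void main(String[] args) throws Exception {
            // Same options string a script would pass; 100 is an illustrative limit.
            HBaseStorage storage = new HBaseStorage(
                "cf:col_a cf:col_b",
                "-loadKey -caching 500 -maxResultsPerColumnFamily 100");
            // The constructor parses the options and applies the limit to its
            // internal Scan, which the test below verifies via reflection.
            System.out.println(storage.getColumnInfoList().size()); // 2 column descriptors parsed
        }
    }
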
Modified: pig/trunk/test/org/apache/pig/test/TestHBaseStorageParams.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestHBaseStorageParams.java?rev=1704439&r1=1704438&r2=1704439&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestHBaseStorageParams.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestHBaseStorageParams.java Mon Sep 21 23:04:34 2015
@@ -19,6 +19,7 @@ package org.apache.pig.test;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.pig.backend.hadoop.hbase.HBaseStorage;
 import org.apache.pig.impl.util.UDFContext;
 import org.junit.Assert;
@@ -26,6 +27,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.Properties;
 
 public class TestHBaseStorageParams {
@@ -77,6 +79,23 @@ public class TestHBaseStorageParams {
       doColumnParseTest(storage, "foo:a", "foo:b ", " foo:c,d");
     }
 
+    /**
+     * Assert that -maxResultsPerColumnFamily actually gets set on Scan
+     */
+    @Test
+    public void testSetsMaxResultsPerColumnFamily() throws Exception {
+        Field scanField = HBaseStorage.class.getDeclaredField("scan");
+        scanField.setAccessible(true);
+
+        HBaseStorage storageNoMax = new HBaseStorage("", "");
+        Scan scan = (Scan)scanField.get(storageNoMax);
+        Assert.assertEquals(-1, scan.getMaxResultsPerColumnFamily());
+
+        HBaseStorage storageWithMax = new HBaseStorage("", "-maxResultsPerColumnFamily 123");
+        scan = (Scan)scanField.get(storageWithMax);
+        Assert.assertEquals(123, scan.getMaxResultsPerColumnFamily());
+    }
+
     private void doColumnParseTest(HBaseStorage storage, String... names) {
       Assert.assertEquals("Wrong column count",
         names.length, storage.getColumnInfoList().size());
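
A complementary check, hypothetical and not part of this commit: because the option value is parsed as an int during scan setup in the constructor, a non-numeric limit surfaces as a NumberFormatException, which a follow-up test in this class could pin down roughly like so:

    /**
     * Hypothetical sketch: a non-numeric -maxResultsPerColumnFamily value
     * should fail fast while the options are parsed in the constructor.
     */
    @Test
    public void testMaxResultsPerColumnFamilyRejectsNonNumeric() throws Exception {
        try {
            new HBaseStorage("", "-maxResultsPerColumnFamily abc");
            Assert.fail("Expected NumberFormatException for a non-numeric limit");
        } catch (NumberFormatException e) {
            // expected: "abc" cannot be parsed as an integer limit
        }
    }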