Posted to commits@pig.apache.org by dv...@apache.org on 2012/09/03 00:00:47 UTC

svn commit: r1380064 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java test/org/apache/pig/test/TestHBaseStorage.java

Author: dvryaboy
Date: Sun Sep  2 22:00:46 2012
New Revision: 1380064

URL: http://svn.apache.org/viewvc?rev=1380064&view=rev
Log:
PIG-2886: Add Scan TimeRange to HBaseStorage

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
    pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1380064&r1=1380063&r2=1380064&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Sep  2 22:00:46 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-2886: Add Scan TimeRange to HBaseStorage (ted.m via dvryaboy)
+
 PIG-2895: jodatime jar missing in pig-withouthadoop.jar  (thejas)
 
 PIG-2888: Improve performance of POPartialAgg (dvryaboy)

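For context on the feature being added: HBaseStorage accepts the new options in its second constructor argument, the option string. A minimal sketch of a driver that exercises them, assuming a local PigServer; the table name 'pigtest', the 'pig' column family, the timestamps, and the output path are illustrative and not part of this commit. Per HBase Scan semantics the range is min-inclusive, max-exclusive:

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class TimeRangeScanExample {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.LOCAL);
            // Only cells whose HBase timestamps fall in [minTimestamp, maxTimestamp)
            // are visible to the load.
            pig.registerQuery("a = load 'hbase://pigtest' using "
                    + "org.apache.pig.backend.hadoop.hbase.HBaseStorage("
                    + "'pig:col_a pig:col_b', "
                    + "'-loadKey -minTimestamp 1346536800000 -maxTimestamp 1346623200000') "
                    + "as (rowKey, col_a, col_b);");
            pig.store("a", "/tmp/timerange_out");
        }
    }

A single version can be pinned instead with '-timestamp <ts>', which the loader maps to Scan.setTimeStamp() rather than a range, as the HBaseStorage.java diff below shows.
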
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1380064&r1=1380063&r2=1380064&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Sun Sep  2 22:00:46 2012
@@ -162,6 +162,9 @@ public class HBaseStorage extends LoadFu
     private final long limit_;
     private final int caching_;
     private final boolean noWAL_;
+    private final long minTimestamp_;
+    private final long maxTimestamp_;
+    private final long timestamp_;
 
     protected transient byte[] gt_;
     protected transient byte[] gte_;
@@ -187,6 +190,10 @@ public class HBaseStorage extends LoadFu
         validOptions_.addOption("caster", true, "Caster to use for converting values. A class name, " +
                 "HBaseBinaryConverter, or Utf8StorageConverter. For storage, casters must implement LoadStoreCaster.");
         validOptions_.addOption("noWAL", false, "Sets the write ahead to false for faster loading. To be used with extreme caution since this could result in data loss (see http://hbase.apache.org/book.html#perf.hbase.client.putwal).");
+        validOptions_.addOption("minTimestamp", true, "Record must have timestamp greater or equal to this value");
+        validOptions_.addOption("maxTimestamp", true, "Record must have timestamp less then this value");
+        validOptions_.addOption("timestamp", true, "Record must have timestamp equal to this value");
+
     }
 
     /**
@@ -225,6 +232,9 @@ public class HBaseStorage extends LoadFu
      * <li>-ignoreWhitespace=(true|false) ignore spaces when parsing column names (default true)
      * <li>-caching=numRows  number of rows to cache (faster scans, more memory).
      * <li>-noWAL=(true|false) Sets the write ahead to false for faster loading.
      * To be used with extreme caution, since this could result in data loss
      * (see http://hbase.apache.org/book.html#perf.hbase.client.putwal).
+     * <li>-minTimestamp= Scan's minimum timestamp for the time range
+     * <li>-maxTimestamp= Scan's maximum timestamp for the time range
+     * <li>-timestamp= Scan's specified timestamp
      * </ul>
@@ -238,7 +248,7 @@ public class HBaseStorage extends LoadFu
             configuredOptions_ = parser_.parse(validOptions_, optsArr);
         } catch (ParseException e) {
             HelpFormatter formatter = new HelpFormatter();
-            formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-columnPrefix] [-caching] [-caster] [-noWAL] [-limit] [-delim] [-ignoreWhitespace]", validOptions_ );
+            formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-columnPrefix] [-caching] [-caster] [-noWAL] [-limit] [-delim] [-ignoreWhitespace] [-minTimestamp] [-maxTimestamp] [-timestamp]", validOptions_ );
             throw e;
         }
 
@@ -281,6 +291,25 @@ public class HBaseStorage extends LoadFu
         caching_ = Integer.valueOf(configuredOptions_.getOptionValue("caching", "100"));
         limit_ = Long.valueOf(configuredOptions_.getOptionValue("limit", "-1"));
         noWAL_ = configuredOptions_.hasOption("noWAL");
+        
+        if (configuredOptions_.hasOption("minTimestamp")){
+            minTimestamp_ = Long.parseLong(configuredOptions_.getOptionValue("minTimestamp"));
+        } else {
+            minTimestamp_ = Long.MIN_VALUE;
+        }
+        
+        if (configuredOptions_.hasOption("maxTimestamp")){
+            maxTimestamp_ = Long.parseLong(configuredOptions_.getOptionValue("maxTimestamp"));
+        } else {
+            maxTimestamp_ = Long.MAX_VALUE;
+        }
+
+        if (configuredOptions_.hasOption("timestamp")){
+            timestamp_ = Long.parseLong(configuredOptions_.getOptionValue("timestamp"));
+        } else {
+            timestamp_ = 0;
+        }
+        
         initScan();	    
     }
 
@@ -337,7 +366,7 @@ public class HBaseStorage extends LoadFu
         return columnInfo;
     }
 
-    private void initScan() {
+    private void initScan() throws IOException {
         scan = new Scan();
 
         // Map-reduce jobs should not run with cacheBlocks
@@ -360,7 +389,12 @@ public class HBaseStorage extends LoadFu
             lte_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("lte")));
             addRowFilter(CompareOp.LESS_OR_EQUAL, lte_);
         }
-
+        if (configuredOptions_.hasOption("minTimestamp") || configuredOptions_.hasOption("maxTimestamp")){
+            scan.setTimeRange(minTimestamp_, maxTimestamp_);
+        }
+        if (configuredOptions_.hasOption("timestamp")){
+            scan.setTimeStamp(timestamp_);
+        }
         // apply any column filters
         FilterList allColumnFilters = null;
         for (ColumnInfo colInfo : columnInfo_) {

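The initScan() changes above lean on standard HBase client behaviour, which is worth spelling out: Scan.setTimeRange(min, max) restricts the scan to the half-open interval [min, max) and declares IOException (hence the new throws clause on initScan()), while Scan.setTimeStamp(ts) is effectively a single-version shortcut for setTimeRange(ts, ts + 1). A small standalone sketch of just those calls; the class and method names are illustrative:

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Scan;

    public class ScanTimeRangeSketch {
        // Cells with min <= timestamp < max survive the scan.
        static Scan rangeScan(long min, long max) throws IOException {
            Scan scan = new Scan();
            scan.setTimeRange(min, max); // may throw IOException for an invalid range
            return scan;
        }

        // Only cells written at exactly this timestamp survive the scan.
        static Scan pointScan(long ts) throws IOException {
            Scan scan = new Scan();
            scan.setTimeStamp(ts);
            return scan;
        }
    }
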
Modified: pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java?rev=1380064&r1=1380063&r2=1380064&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java Sun Sep  2 22:00:46 2012
@@ -30,12 +30,14 @@ import org.apache.hadoop.hbase.MiniHBase
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.hbase.HBaseStorage;
 import org.apache.pig.data.DataByteArray;
@@ -183,6 +185,95 @@ public class TestHBaseStorage {
     }
 
     /**
+     * Test load from HBase with the minTimestamp, maxTimestamp, and timestamp options.
+     *
+     */
+    @Test
+    public void testLoadWithSpecifiedTimestampAndRanges() throws IOException {
+        long beforeTimeStamp = System.currentTimeMillis() - 10;
+        
+        HTable table = prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+
+        long afterTimeStamp = System.currentTimeMillis() + 10;
+        
+        Assert.assertEquals("MaxTimestamp is set before rows added", 0, queryWithTimestamp(null , beforeTimeStamp, null));
+        
+        Assert.assertEquals("MaxTimestamp is set after rows added", TEST_ROW_COUNT, queryWithTimestamp(null, afterTimeStamp, null));
+        
+        Assert.assertEquals("MinTimestamp is set after rows added", 0, queryWithTimestamp(afterTimeStamp, null, null));
+        
+        Assert.assertEquals("MinTimestamp is set before rows added", TEST_ROW_COUNT, queryWithTimestamp(beforeTimeStamp, null, null));
+        
+        Assert.assertEquals("Timestamp range is set around rows added", TEST_ROW_COUNT, queryWithTimestamp(beforeTimeStamp, afterTimeStamp, null));
+        
+        Assert.assertEquals("Timestamp range is set after rows added", 0, queryWithTimestamp(afterTimeStamp, afterTimeStamp + 10, null));
+        
+        Assert.assertEquals("Timestamp range is set before rows added", 0, queryWithTimestamp(beforeTimeStamp - 10, beforeTimeStamp, null));
+
+        Assert.assertEquals("Timestamp is set before rows added", 0, queryWithTimestamp(null, null, beforeTimeStamp));
+
+        Assert.assertEquals("Timestamp is set after rows added", 0, queryWithTimestamp(null, null, afterTimeStamp));
+
+        long specifiedTimestamp = table.get(new Get(Bytes.toBytes("00"))).getColumnLatest(COLUMNFAMILY, Bytes.toBytes("col_a")).getTimestamp();
+        
+        Assert.assertTrue("Timestamp is set equals to row 01", queryWithTimestamp(null, null, specifiedTimestamp) > 0);
+         
+        
+        LOG.info("LoadFromHBase done");
+    }
+
+    private int queryWithTimestamp(Long minTimestamp, Long maxTimestamp, Long timestamp) throws IOException,
+            ExecException {
+        
+        StringBuilder extraParams = new StringBuilder();
+        
+        if (minTimestamp != null){
+            extraParams.append(" -minTimestamp " + minTimestamp + " ");
+        }
+        
+        if (maxTimestamp != null){
+            extraParams.append(" -maxTimestamp " + maxTimestamp + " ");
+        }
+
+        if (timestamp != null){
+            extraParams.append(" -timestamp " + timestamp + " ");
+        }
+        
+        
+        pig.registerQuery("a = load 'hbase://"
+                + TESTTABLE_1
+                + "' using "
+                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+                + TESTCOLUMN_A
+                + " "
+                + TESTCOLUMN_B
+                + " "
+                + TESTCOLUMN_C
+                + " pig:"
+                + "','-loadKey " + extraParams.toString() + "') as (rowKey, col_a, col_b, col_c);");
+        Iterator<Tuple> it = pig.openIterator("a");
+        int count = 0;
+        LOG.info("LoadFromHBase Starting");
+        while (it.hasNext()) {
+            Tuple t = it.next();
+            LOG.info("LoadFromHBase " + t);
+            String rowKey = ((DataByteArray) t.get(0)).toString();
+            String col_a = ((DataByteArray) t.get(1)).toString();
+            String col_b = ((DataByteArray) t.get(2)).toString();
+            String col_c = ((DataByteArray) t.get(3)).toString();
+
+            Assert.assertEquals("00".substring((count + "").length()) + count,
+                    rowKey);
+            Assert.assertEquals(count, Integer.parseInt(col_a));
+            Assert.assertEquals(count + 0.0, Double.parseDouble(col_b), 1e-6);
+            Assert.assertEquals("Text_" + count, col_c);
+
+            count++;
+        }
+        return count;
+    }
+
+    /**
      * Test Load from hbase with map parameters and column prefix
      *
      */
@@ -905,7 +996,7 @@ public class TestHBaseStorage {
      * Prepare a table in hbase for testing.
      * 
      */
-    private void prepareTable(String tableName, boolean initData,
+    private HTable prepareTable(String tableName, boolean initData,
             DataFormat format) throws IOException {
         // define the table schema
         HTable table = null;
@@ -968,6 +1059,7 @@ public class TestHBaseStorage {
             }
             table.flushCommits();
         }
+        return table;
     }
 
     /**