You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2012/09/03 00:00:47 UTC
svn commit: r1380064 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
test/org/apache/pig/test/TestHBaseStorage.java
Author: dvryaboy
Date: Sun Sep 2 22:00:46 2012
New Revision: 1380064
URL: http://svn.apache.org/viewvc?rev=1380064&view=rev
Log:
PIG-2886: Add Scan TimeRange to HBaseStorage
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1380064&r1=1380063&r2=1380064&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Sep 2 22:00:46 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-2886: Add Scan TimeRange to HBaseStorage (ted.m via dvryaboy)
+
PIG-2895: jodatime jar missing in pig-withouthadoop.jar (thejas)
PIG-2888: Improve performance of POPartialAgg (dvryaboy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1380064&r1=1380063&r2=1380064&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Sun Sep 2 22:00:46 2012
@@ -162,6 +162,9 @@ public class HBaseStorage extends LoadFu
private final long limit_;
private final int caching_;
private final boolean noWAL_;
+ private final long minTimestamp_;
+ private final long maxTimestamp_;
+ private final long timestamp_;
protected transient byte[] gt_;
protected transient byte[] gte_;
@@ -187,6 +190,10 @@ public class HBaseStorage extends LoadFu
validOptions_.addOption("caster", true, "Caster to use for converting values. A class name, " +
"HBaseBinaryConverter, or Utf8StorageConverter. For storage, casters must implement LoadStoreCaster.");
validOptions_.addOption("noWAL", false, "Sets the write ahead to false for faster loading. To be used with extreme caution since this could result in data loss (see http://hbase.apache.org/book.html#perf.hbase.client.putwal).");
+ validOptions_.addOption("minTimestamp", true, "Record must have timestamp greater or equal to this value");
+ validOptions_.addOption("maxTimestamp", true, "Record must have timestamp less then this value");
+ validOptions_.addOption("timestamp", true, "Record must have timestamp equal to this value");
+
}
/**
@@ -225,6 +232,9 @@ public class HBaseStorage extends LoadFu
* <li>-ignoreWhitespace=(true|false) ignore spaces when parsing column names (default true)
* <li>-caching=numRows number of rows to cache (faster scans, more memory).
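* <li>-noWAL=(true|false) Sets the write ahead to false for faster loading.
* To be used with extreme caution, since this could result in data loss
* (see http://hbase.apache.org/book.html#perf.hbase.client.putwal).
+ * <li>-minTimestamp=timestamp Only load cells whose timestamp is greater than or equal to this value (inclusive lower bound of the scan's time range; if only -maxTimestamp is given, the lower bound is left open).
+ * <li>-maxTimestamp=timestamp Only load cells whose timestamp is less than this value (exclusive upper bound of the scan's time range; if only -minTimestamp is given, the upper bound is left open).
+ * <li>-timestamp=timestamp Only load cells whose timestamp is exactly this value. It is applied after -minTimestamp/-maxTimestamp, so it takes precedence over them.
* </ul>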
@@ -238,7 +248,7 @@ public class HBaseStorage extends LoadFu
configuredOptions_ = parser_.parse(validOptions_, optsArr);
} catch (ParseException e) {
HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-columnPrefix] [-caching] [-caster] [-noWAL] [-limit] [-delim] [-ignoreWhitespace]", validOptions_ );
+ formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-columnPrefix] [-caching] [-caster] [-noWAL] [-limit] [-delim] [-ignoreWhitespace] [-minTimestamp] [-maxTimestamp] [-timestamp]", validOptions_ );
throw e;
}
@@ -281,6 +291,25 @@ public class HBaseStorage extends LoadFu
caching_ = Integer.valueOf(configuredOptions_.getOptionValue("caching", "100"));
limit_ = Long.valueOf(configuredOptions_.getOptionValue("limit", "-1"));
noWAL_ = configuredOptions_.hasOption("noWAL");
+
+ if (configuredOptions_.hasOption("minTimestamp")){
+ minTimestamp_ = Long.parseLong(configuredOptions_.getOptionValue("minTimestamp"));
+ } else {
+ minTimestamp_ = Long.MIN_VALUE;
+ }
+
+ if (configuredOptions_.hasOption("maxTimestamp")){
+ maxTimestamp_ = Long.parseLong(configuredOptions_.getOptionValue("maxTimestamp"));
+ } else {
+ maxTimestamp_ = Long.MAX_VALUE;
+ }
+
+ if (configuredOptions_.hasOption("timestamp")){
+ timestamp_ = Long.parseLong(configuredOptions_.getOptionValue("timestamp"));
+ } else {
+ timestamp_ = 0;
+ }
+
initScan();
}
@@ -337,7 +366,7 @@ public class HBaseStorage extends LoadFu
return columnInfo;
}
- private void initScan() {
+ private void initScan() throws IOException{
scan = new Scan();
// Map-reduce jobs should not run with cacheBlocks
@@ -360,7 +389,12 @@ public class HBaseStorage extends LoadFu
lte_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("lte")));
addRowFilter(CompareOp.LESS_OR_EQUAL, lte_);
}
-
+ if (configuredOptions_.hasOption("minTimestamp") || configuredOptions_.hasOption("maxTimestamp")){
+ scan.setTimeRange(minTimestamp_, maxTimestamp_);
+ }
+ if (configuredOptions_.hasOption("timestamp")){
+ scan.setTimeStamp(timestamp_);
+ }
// apply any column filters
FilterList allColumnFilters = null;
for (ColumnInfo colInfo : columnInfo_) {
Modified: pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java?rev=1380064&r1=1380063&r2=1380064&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java Sun Sep 2 22:00:46 2012
@@ -30,12 +30,14 @@ import org.apache.hadoop.hbase.MiniHBase
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.hbase.HBaseStorage;
import org.apache.pig.data.DataByteArray;
@@ -183,6 +185,95 @@ public class TestHBaseStorage {
}
/**
+ * Test load from HBase with the -minTimestamp, -maxTimestamp and -timestamp
+ * options.
+ *
+ * The rows written by prepareTable are bracketed by two wall-clock values taken
+ * 10 ms before and 10 ms after the write. Each range or point query is then
+ * checked for whether it includes all of those rows or none of them. Finally,
+ * the real timestamp of a cell in row "00" is used as an exact -timestamp.
+ * That query must return at least one row.
+ */
+ @Test
+ public void testLoadWithSpecifiedTimestampAndRanges() throws IOException {
+     long beforeTimeStamp = System.currentTimeMillis() - 10;
+
+     HTable table = prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+     try {
+         long afterTimeStamp = System.currentTimeMillis() + 10;
+
+         // Only an upper bound (-maxTimestamp, exclusive).
+         Assert.assertEquals("MaxTimestamp is set before rows added", 0, queryWithTimestamp(null, beforeTimeStamp, null));
+         Assert.assertEquals("MaxTimestamp is set after rows added", TEST_ROW_COUNT, queryWithTimestamp(null, afterTimeStamp, null));
+
+         // Only a lower bound (-minTimestamp, inclusive).
+         Assert.assertEquals("MinTimestamp is set after rows added", 0, queryWithTimestamp(afterTimeStamp, null, null));
+         Assert.assertEquals("MinTimestamp is set before rows added", TEST_ROW_COUNT, queryWithTimestamp(beforeTimeStamp, null, null));
+
+         // Both bounds: a window around the write, and windows entirely after/before it.
+         Assert.assertEquals("Timestamp range is set around rows added", TEST_ROW_COUNT, queryWithTimestamp(beforeTimeStamp, afterTimeStamp, null));
+         Assert.assertEquals("Timestamp range is set after rows added", 0, queryWithTimestamp(afterTimeStamp, afterTimeStamp + 10, null));
+         Assert.assertEquals("Timestamp range is set before rows added", 0, queryWithTimestamp(beforeTimeStamp - 10, beforeTimeStamp, null));
+
+         // Exact timestamps that no written cell can carry.
+         Assert.assertEquals("Timestamp is set before rows added", 0, queryWithTimestamp(null, null, beforeTimeStamp));
+         Assert.assertEquals("Timestamp is set after rows added", 0, queryWithTimestamp(null, null, afterTimeStamp));
+
+         // Exact timestamp read back from the col_a cell of row "00": that row must be loaded.
+         long specifiedTimestamp = table.get(new Get(Bytes.toBytes("00"))).getColumnLatest(COLUMNFAMILY, Bytes.toBytes("col_a")).getTimestamp();
+         Assert.assertTrue("Timestamp is set equals to row 00", queryWithTimestamp(null, null, specifiedTimestamp) > 0);
+     } finally {
+         // Release the client-side table handle returned by prepareTable.
+         table.close();
+     }
+
+     LOG.info("LoadFromHBase done");
+ }
+
+ /**
+  * Loads TESTTABLE_1 through HBaseStorage with -loadKey plus whichever
+  * time-range options are non-null, and returns the number of tuples loaded.
+  *
+  * Every tuple must match consecutive rows starting at key "00". For row i
+  * that means a two-digit row key, col_a equal to i, col_b equal to i as a
+  * double, and col_c equal to "Text_" + i.
+  *
+  * @param minTimestamp value for -minTimestamp, or null to omit the option
+  * @param maxTimestamp value for -maxTimestamp, or null to omit the option
+  * @param timestamp    value for -timestamp, or null to omit the option
+  * @return number of tuples returned by the load
+  */
+ private int queryWithTimestamp(Long minTimestamp, Long maxTimestamp, Long timestamp) throws IOException,
+         ExecException {
+     // Assemble only the options whose values were supplied.
+     StringBuilder options = new StringBuilder();
+     if (minTimestamp != null) {
+         options.append(" -minTimestamp ").append(minTimestamp).append(" ");
+     }
+     if (maxTimestamp != null) {
+         options.append(" -maxTimestamp ").append(maxTimestamp).append(" ");
+     }
+     if (timestamp != null) {
+         options.append(" -timestamp ").append(timestamp).append(" ");
+     }
+
+     String columns = TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C + " pig:";
+     String query = "a = load 'hbase://" + TESTTABLE_1 + "' using "
+             + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('" + columns
+             + "','-loadKey " + options + "') as (rowKey, col_a, col_b, col_c);";
+     pig.registerQuery(query);
+
+     Iterator<Tuple> tuples = pig.openIterator("a");
+     int rowCount = 0;
+     LOG.info("LoadFromHBase Starting");
+     while (tuples.hasNext()) {
+         Tuple tuple = tuples.next();
+         LOG.info("LoadFromHBase " + tuple);
+
+         // Extract every field before asserting, in column order.
+         String key = ((DataByteArray) tuple.get(0)).toString();
+         String colA = ((DataByteArray) tuple.get(1)).toString();
+         String colB = ((DataByteArray) tuple.get(2)).toString();
+         String colC = ((DataByteArray) tuple.get(3)).toString();
+
+         // Zero-pad the expected row key to two digits.
+         String expectedKey = "00".substring(String.valueOf(rowCount).length()) + rowCount;
+         Assert.assertEquals(expectedKey, key);
+         Assert.assertEquals(rowCount, Integer.parseInt(colA));
+         Assert.assertEquals(rowCount + 0.0, Double.parseDouble(colB), 1e-6);
+         Assert.assertEquals("Text_" + rowCount, colC);
+
+         rowCount++;
+     }
+     return rowCount;
+ }
+
+ /**
* * Test Load from hbase with map parameters and column prefix
*
*/
@@ -905,7 +996,7 @@ public class TestHBaseStorage {
* Prepare a table in hbase for testing.
*
*/
- private void prepareTable(String tableName, boolean initData,
+ private HTable prepareTable(String tableName, boolean initData,
DataFormat format) throws IOException {
// define the table schema
HTable table = null;
@@ -968,6 +1059,7 @@ public class TestHBaseStorage {
}
table.flushCommits();
}
+ return table;
}
/**