Posted to commits@pig.apache.org by dv...@apache.org on 2011/04/16 23:31:41 UTC
svn commit: r1094057 - in /pig/branches/branch-0.8: CHANGES.txt
src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
test/org/apache/pig/test/TestHBaseStorage.java
Author: dvryaboy
Date: Sat Apr 16 21:31:40 2011
New Revision: 1094057
URL: http://svn.apache.org/viewvc?rev=1094057&view=rev
Log:
PIG-1870: HBaseStorage does not project correctly
Modified:
pig/branches/branch-0.8/CHANGES.txt
pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
pig/branches/branch-0.8/test/org/apache/pig/test/TestHBaseStorage.java
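For context on the fix: pushing a projection (a FOREACH ... GENERATE over a subset of columns) into HBaseStorage could load the wrong HBase columns, because the row-key offset was mishandled and the projection never reached backend tasks. A minimal sketch of the kind of script affected, with hypothetical table and column names that are not from this commit (it mirrors the new testLoadWithProjection tests added below):

    // Hypothetical repro sketch; names are illustrative only.
    pig.registerQuery("a = load 'hbase://test_table' using "
            + "org.apache.pig.backend.hadoop.hbase.HBaseStorage("
            + "'pig:col_a pig:col_b pig:col_c', '-loadKey') "
            + "as (rowKey, col_a, col_b, col_c);");
    // Pig pushes this projection down via HBaseStorage.pushProjection();
    // before this fix, the trimmed column list could be mis-indexed.
    pig.registerQuery("b = FOREACH a GENERATE col_a, col_c;");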
Modified: pig/branches/branch-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/CHANGES.txt?rev=1094057&r1=1094056&r2=1094057&view=diff
==============================================================================
--- pig/branches/branch-0.8/CHANGES.txt (original)
+++ pig/branches/branch-0.8/CHANGES.txt Sat Apr 16 21:31:40 2011
@@ -34,6 +34,8 @@ PIG-1886: Add zookeeper jar to list of j
BUG FIXES
+PIG-1870: HBaseStorage doesn't project correctly (dvryaboy)
+
PIG-1979: New logical plan failing with ERROR 2229: Couldn't find matching uid -1 (daijy)
PIG-1977: "Stream closed" error while reading Pig temp files (results of intermediate jobs) (rding)
Modified: pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1094057&r1=1094056&r2=1094057&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Sat Apr 16 21:31:40 2011
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -75,6 +76,8 @@ import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import com.google.common.collect.Lists;
@@ -99,6 +102,7 @@ public class HBaseStorage extends LoadFu
private RecordReader reader;
private RecordWriter writer;
private Scan scan;
+ private String contextSignature = null;
private final CommandLine configuredOptions_;
private final static Options validOptions_ = new Options();
@@ -116,6 +120,10 @@ public class HBaseStorage extends LoadFu
private ResourceSchema schema_;
+ private RequiredFieldList requiredFieldList;
+
+ private boolean initialized = false;
+
private static void populateValidOptions() {
validOptions_.addOption("loadKey", false, "Load Key");
validOptions_.addOption("gt", true, "Records must be greater than this value " +
@@ -240,6 +248,17 @@ public class HBaseStorage extends LoadFu
@Override
public Tuple getNext() throws IOException {
try {
+ if (!initialized) {
+ Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(),
+ new String[] {contextSignature});
+
+ String projectedFields = p.getProperty(contextSignature+"_projectedFields");
+ if (projectedFields != null) {
+ requiredFieldList = (RequiredFieldList) ObjectSerializer.deserialize(projectedFields);
+ pushProjection(requiredFieldList);
+ }
+ initialized = true;
+ }
if (reader.nextKeyValue()) {
ImmutableBytesWritable rowKey = (ImmutableBytesWritable) reader
.getCurrentKey();
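A note on the getNext() hunk above: pushProjection() is only invoked on the frontend, while getNext() executes in backend map tasks in a separate JVM, so the projection decision has to travel through UDFContext, which Pig serializes into the job configuration. A condensed sketch of the backend half of that handshake, assuming contextSignature has already been set through setUDFContextSignature():

    // Simplified from the hunk above; not the full method body.
    Properties p = UDFContext.getUDFContext().getUDFProperties(
            HBaseStorage.class, new String[] { contextSignature });
    String projectedFields = p.getProperty(contextSignature + "_projectedFields");
    if (projectedFields != null) {
        // ObjectSerializer round-trips any Serializable object as a String,
        // so the RequiredFieldList survives the trip to the task.
        pushProjection((RequiredFieldList) ObjectSerializer.deserialize(projectedFields));
    }

The initialized flag keeps this lookup from running on every tuple; it is checked once per task and then short-circuited.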
@@ -289,8 +308,16 @@ public class HBaseStorage extends LoadFu
}
@Override
+ public void setUDFContextSignature(String signature) {
+ this.contextSignature = signature;
+ }
+
+ @Override
public void setLocation(String location, Job job) throws IOException {
job.getConfiguration().setBoolean("pig.noSplitCombination", true);
+ m_conf = job.getConfiguration();
+ HBaseConfiguration.addHbaseResources(m_conf);
+
// Make sure the HBase, ZooKeeper, and Guava jars get shipped.
TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
org.apache.hadoop.hbase.client.HTable.class,
@@ -304,12 +331,22 @@ public class HBaseStorage extends LoadFu
if (m_table == null) {
m_table = new HTable(m_conf, tablename);
}
- HBaseConfiguration.addHbaseResources(m_conf);
m_table.setScannerCaching(caching_);
m_conf.set(TableInputFormat.INPUT_TABLE, tablename);
+
+ // Set up scan if it is not already set up.
+ if (m_conf.get(TableInputFormat.SCAN) != null) {
+ return;
+ }
+
for (byte[][] col : columnList_) {
scan.addColumn(col[0], col[1]);
}
+ if (requiredFieldList != null) {
+ Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(),
+ new String[] {contextSignature});
+ p.setProperty(contextSignature + "_projectedFields", ObjectSerializer.serialize(requiredFieldList));
+ }
m_conf.set(TableInputFormat.SCAN, convertScanToString(scan));
}
@@ -372,6 +409,15 @@ public class HBaseStorage extends LoadFu
@SuppressWarnings("unchecked")
@Override
public void putNext(Tuple t) throws IOException {
+ if (!initialized) {
+ Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(),
+ new String[] {contextSignature});
+ String serializedSchema = p.getProperty(contextSignature + "_schema");
+ if (serializedSchema != null) {
+ schema_ = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
+ }
+ initialized = true;
+ }
ResourceFieldSchema[] fieldSchemas = (schema_ == null) ? null : schema_.getFields();
Put put = new Put(objToBytes(t.get(0),
(fieldSchemas == null) ? DataType.findType(t.get(0)) : fieldSchemas[0].getType()));
@@ -391,6 +437,7 @@ public class HBaseStorage extends LoadFu
@SuppressWarnings("unchecked")
private byte[] objToBytes(Object o, byte type) throws IOException {
LoadStoreCaster caster = (LoadStoreCaster) caster_;
+ if (o == null) return null;
switch (type) {
case DataType.BYTEARRAY: return ((DataByteArray) o).get();
case DataType.BAG: return caster.toBytes((DataBag) o);
@@ -418,7 +465,9 @@ public class HBaseStorage extends LoadFu
}
@Override
- public void setStoreFuncUDFContextSignature(String signature) { }
+ public void setStoreFuncUDFContextSignature(String signature) {
+ this.contextSignature = signature;
+ }
@Override
public void setStoreLocation(String location, Job job) throws IOException {
@@ -427,7 +476,11 @@ public class HBaseStorage extends LoadFu
}else{
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, location);
}
- m_conf = HBaseConfiguration.create(job.getConfiguration());
+ Properties props = UDFContext.getUDFContext().getUDFProperties(getClass(), new String[]{contextSignature});
+ if (!props.containsKey(contextSignature + "_schema")) {
+ props.setProperty(contextSignature + "_schema", ObjectSerializer.serialize(schema_));
+ }
+ m_conf = HBaseConfiguration.addHbaseResources(job.getConfiguration());
}
@Override
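The store path gets symmetric treatment. setStoreLocation() runs on the frontend, where schema_ is available, and parks the serialized schema in UDFContext under the store signature; putNext() (see the hunk further up) recovers it in the backend before writing, so values are converted using the declared field types rather than whatever DataType.findType() infers at runtime. A condensed sketch of the backend side:

    // Backend recovery in putNext(), simplified (runs once, guarded by 'initialized').
    Properties p = UDFContext.getUDFContext().getUDFProperties(
            HBaseStorage.class, new String[] { contextSignature });
    String serializedSchema = p.getProperty(contextSignature + "_schema");
    if (serializedSchema != null) {
        // Without this, fieldSchemas is null in the tasks and every value
        // falls back to DataType.findType() on the actual object.
        schema_ = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
    }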
@@ -446,29 +499,33 @@ public class HBaseStorage extends LoadFu
@Override
public RequiredFieldResponse pushProjection(
RequiredFieldList requiredFieldList) throws FrontendException {
+
+ // colOffset is the offset in our columnList that we need to apply to indexes we get from requiredFields
+ // (row key is not a real column)
+ int colOffset = loadRowKey_ ? 1 : 0;
+ // projOffset is the offset into the requiredFieldList we need to apply when figuring out which columns to prune.
+ // (if the row key is kept, we must skip its element in this list when trimming colList)
+ int projOffset = colOffset;
+
+ this.requiredFieldList = requiredFieldList;
List<RequiredField> requiredFields = requiredFieldList.getFields();
+ if (requiredFields.size() > (columnList_.size() + colOffset)) {
+ throw new FrontendException("The list of columns to project from HBase is larger than HBaseStorage is configured to load.");
+ }
+
List<byte[][]> newColumns = Lists.newArrayListWithExpectedSize(requiredFields.size());
// HBase Row Key is the first column in the schema when it's loaded,
// and is not included in the columnList (since it's not a proper column).
- int offset = loadRowKey_ ? 1 : 0;
-
- if (loadRowKey_) {
- if (requiredFields.size() < 1 || requiredFields.get(0).getIndex() != 0) {
+ if (loadRowKey_ &&
+ (requiredFields.size() < 1 || requiredFields.get(0).getIndex() != 0)) {
loadRowKey_ = false;
- } else {
- // We just processed the fact that the row key needs to be loaded.
- requiredFields.remove(0);
+ projOffset = 0;
}
- }
- for (RequiredField field : requiredFields) {
- int fieldIndex = field.getIndex();
- newColumns.add(columnList_.get(fieldIndex - offset));
- }
- LOG.debug("pushProjection After Projection: loadRowKey is " + loadRowKey_) ;
- for (byte[][] col : newColumns) {
- LOG.debug("pushProjection -- col: " + Bytes.toStringBinary(col[0]) + ":" + Bytes.toStringBinary(col[1]));
+ for (int i = projOffset; i < requiredFields.size(); i++) {
+ int fieldIndex = requiredFields.get(i).getIndex();
+ newColumns.add(columnList_.get(fieldIndex - colOffset));
}
columnList_ = newColumns;
return new RequiredFieldResponse(true);
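To make the new offset arithmetic concrete, here is an illustrative walkthrough outside the class (plain ints stand in for RequiredField, and a 'keep' list stands in for newColumns; none of these names are in the commit). With three configured columns and -loadKey, the load schema is (rowKey, col_a, col_b, col_c), indexes 0 through 3:

    // Illustrative sketch of the offset logic, assuming columns cf:a, cf:b, cf:c.
    int[] columnList = { 0 /* cf:a */, 1 /* cf:b */, 2 /* cf:c */ };
    boolean loadRowKey = true;           // schema: (rowKey, col_a, col_b, col_c)
    int colOffset = loadRowKey ? 1 : 0;  // row key occupies index 0 but is not a column
    int projOffset = colOffset;
    int[] required = { 0, 2 };           // the FOREACH kept rowKey and col_b
    if (loadRowKey && (required.length < 1 || required[0] != 0)) {
        loadRowKey = false;              // row key was pruned from the projection
        projOffset = 0;                  // so there is no row-key entry to skip
    }
    List<Integer> keep = new ArrayList<Integer>();
    for (int i = projOffset; i < required.length; i++) {
        keep.add(columnList[required[i] - colOffset]); // here: keeps only cf:b
    }

With required = {0, 2} the row key stays loaded and only cf:b survives; with required = {1, 3} the first branch fires, projOffset drops to 0, and cf:a and cf:c are kept. Note that unlike the old code, nothing calls requiredFields.remove(0) any more, so the caller's RequiredFieldList is left intact; the same skip is achieved with projOffset.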
Modified: pig/branches/branch-0.8/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/test/org/apache/pig/test/TestHBaseStorage.java?rev=1094057&r1=1094056&r2=1094057&view=diff
==============================================================================
--- pig/branches/branch-0.8/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/branches/branch-0.8/test/org/apache/pig/test/TestHBaseStorage.java Sat Apr 16 21:31:40 2011
@@ -18,12 +18,14 @@ package org.apache.pig.test;
import java.io.IOException;
import java.util.Iterator;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@@ -41,6 +43,8 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import com.google.common.collect.Lists;
+
public class TestHBaseStorage {
private static final Log LOG = LogFactory.getLog(TestHBaseStorage.class);
@@ -66,7 +70,6 @@ public class TestHBaseStorage {
@BeforeClass
public static void setUp() throws Exception {
-
// This is needed by Pig
MiniCluster cluster = MiniCluster.buildCluster();
@@ -78,7 +81,7 @@ public class TestHBaseStorage {
util.startMiniHBaseCluster(1, 1);
- pig = new PigServer(ExecType.MAPREDUCE,
+ pig = new PigServer(ExecType.LOCAL,
ConfigurationUtil.toProperties(conf));
}
@@ -97,14 +100,26 @@ public class TestHBaseStorage {
@After
public void tearDown() throws Exception {
try {
- util.deleteTable(Bytes.toBytesBinary(TESTTABLE_1));
+ deleteAllRows(TESTTABLE_1);
} catch (IOException e) {}
try {
- util.deleteTable(Bytes.toBytesBinary(TESTTABLE_2));
+ deleteAllRows(TESTTABLE_2);
} catch (IOException e) {}
pig.shutdown();
}
+ // DVR: I've found that it is faster to delete all rows in small tables
+ // than to drop them.
+ private void deleteAllRows(String tableName) throws Exception {
+ HTable table = new HTable(conf, tableName);
+ ResultScanner scanner = table.getScanner(new Scan());
+ List<Delete> deletes = Lists.newArrayList();
+ for (Result row : scanner) {
+ deletes.add(new Delete(row.getRow()));
+ }
+ table.delete(deletes);
+ }
+
/**
* load from hbase test
*
@@ -122,7 +137,6 @@ public class TestHBaseStorage {
LOG.info("LoadFromHBase Starting");
while (it.hasNext()) {
Tuple t = it.next();
- LOG.info("LoadFromHBase " + t);
String col_a = ((DataByteArray) t.get(0)).toString();
String col_b = ((DataByteArray) t.get(1)).toString();
String col_c = ((DataByteArray) t.get(2)).toString();
@@ -151,10 +165,9 @@ public class TestHBaseStorage {
+ "') as (col_a, col_b, col_c);");
Iterator<Tuple> it = pig.openIterator("a");
int count = 0;
- LOG.info("LoadFromHBase Starting");
+ LOG.info("BackwardsCompatibility Starting");
while (it.hasNext()) {
Tuple t = it.next();
- LOG.info("LoadFromHBase " + t);
String col_a = ((DataByteArray) t.get(0)).toString();
String col_b = ((DataByteArray) t.get(1)).toString();
String col_c = ((DataByteArray) t.get(2)).toString();
@@ -165,7 +178,7 @@ public class TestHBaseStorage {
count++;
}
Assert.assertEquals(TEST_ROW_COUNT, count);
- LOG.info("LoadFromHBase done");
+ LOG.info("BackwardsCompatibility done");
}
/**
@@ -182,10 +195,9 @@ public class TestHBaseStorage {
+ "','-loadKey') as (rowKey,col_a, col_b, col_c);");
Iterator<Tuple> it = pig.openIterator("a");
int count = 0;
- LOG.info("LoadFromHBase Starting");
+ LOG.info("LoadFromHBaseWithRowKey Starting");
while (it.hasNext()) {
Tuple t = it.next();
- LOG.info("LoadFromHBase " + t);
String rowKey = ((DataByteArray) t.get(0)).toString();
String col_a = ((DataByteArray) t.get(1)).toString();
String col_b = ((DataByteArray) t.get(2)).toString();
@@ -200,7 +212,7 @@ public class TestHBaseStorage {
count++;
}
Assert.assertEquals(TEST_ROW_COUNT, count);
- LOG.info("LoadFromHBase done");
+ LOG.info("LoadFromHBaseWithRowKey done");
}
/**
@@ -224,10 +236,9 @@ public class TestHBaseStorage {
Iterator<Tuple> it = pig.openIterator("a");
int count = 0;
int next = 1;
- LOG.info("LoadFromHBase Starting");
+ LOG.info("LoadFromHBaseWithParameters_1 Starting");
while (it.hasNext()) {
Tuple t = it.next();
- LOG.info("LoadFromHBase " + t);
String rowKey = ((DataByteArray) t.get(0)).toString();
String col_a = ((DataByteArray) t.get(1)).toString();
String col_b = ((DataByteArray) t.get(2)).toString();
@@ -243,7 +254,7 @@ public class TestHBaseStorage {
next++;
}
Assert.assertEquals(TEST_ROW_COUNT - 2, count);
- LOG.info("LoadFromHBase done");
+ LOG.info("LoadFromHBaseWithParameters_1 done");
}
/**
@@ -266,10 +277,9 @@ public class TestHBaseStorage {
Iterator<Tuple> it = pig.openIterator("a");
int count = 0;
int next = 1;
- LOG.info("LoadFromHBase Starting");
+ LOG.info("LoadFromHBaseWithParameters_2 Starting");
while (it.hasNext()) {
Tuple t = it.next();
- LOG.info("LoadFromHBase " + t);
String rowKey = ((DataByteArray) t.get(0)).toString();
String col_a = ((DataByteArray) t.get(1)).toString();
String col_b = ((DataByteArray) t.get(2)).toString();
@@ -285,7 +295,7 @@ public class TestHBaseStorage {
next++;
}
Assert.assertEquals(TEST_ROW_COUNT - 2, count);
- LOG.info("LoadFromHBase done");
+ LOG.info("LoadFromHBaseWithParameters_2 done");
}
/**
@@ -300,10 +310,9 @@ public class TestHBaseStorage {
+ "','-loadKey -limit 10') as (rowKey,col_a, col_b, col_c);");
Iterator<Tuple> it = pig.openIterator("a");
int count = 0;
- LOG.info("LoadFromHBase Starting");
+ LOG.info("LoadFromHBaseWithParameters_3 Starting");
while (it.hasNext()) {
Tuple t = it.next();
- LOG.info("LoadFromHBase " + t);
String rowKey = ((DataByteArray) t.get(0)).toString();
String col_a = ((DataByteArray) t.get(1)).toString();
String col_b = ((DataByteArray) t.get(2)).toString();
@@ -319,32 +328,69 @@ public class TestHBaseStorage {
}
// 'limit' applies to each region, and here we have only one region
Assert.assertEquals(10, count);
- LOG.info("LoadFromHBase done");
+ LOG.info("LoadFromHBaseWithParameters_3 done");
}
/**
+ * Test Load from hbase with projection.
+ */
+ @Test
+ public void testLoadWithProjection_1() throws IOException {
+ prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
+ scanTable1(pig, DataFormat.HBaseBinary);
+ pig.registerQuery("b = FOREACH a GENERATE col_a, col_c;");
+ Iterator<Tuple> it = pig.openIterator("b");
+ int index = 0;
+ LOG.info("testLoadWithProjection_1 Starting");
+ while (it.hasNext()) {
+ Tuple t = it.next();
+ int col_a = (Integer) t.get(0);
+ String col_c = (String) t.get(1);
+ Assert.assertEquals(index, col_a);
+ Assert.assertEquals("Text_" + index, col_c);
+ Assert.assertEquals(2, t.size());
+ index++;
+ }
+ Assert.assertEquals(100, index);
+ LOG.info("testLoadWithProjection_1 done");
+ }
+
+ /**
+ * Test Load from hbase with projection and the default caster.
+ */
+ @Test
+ public void testLoadWithProjection_2() throws IOException {
+ prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+ scanTable1(pig, DataFormat.UTF8PlainText);
+ pig.registerQuery("b = FOREACH a GENERATE col_a, col_c;");
+ Iterator<Tuple> it = pig.openIterator("b");
+ int index = 0;
+ LOG.info("testLoadWithProjection_2 Starting");
+ while (it.hasNext()) {
+ Tuple t = it.next();
+ int col_a = (Integer) t.get(0);
+ String col_c = (String) t.get(1);
+ Assert.assertEquals(index, col_a);
+ Assert.assertEquals("Text_" + index, col_c);
+ Assert.assertEquals(2, t.size());
+ index++;
+ }
+ Assert.assertEquals(100, index);
+ LOG.info("testLoadWithProjection_2 done");
+ }
+
+ /**
* Test Load from hbase using HBaseBinaryConverter
*/
@Test
public void testHBaseBinaryConverter() throws IOException {
prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
-
- pig.registerQuery("a = load 'hbase://"
- + TESTTABLE_1
- + "' using "
- + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
- + TESTCOLUMN_A
- + " "
- + TESTCOLUMN_B
- + " "
- + TESTCOLUMN_C
- + "','-loadKey -caster HBaseBinaryConverter') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);");
+ scanTable1(pig, DataFormat.HBaseBinary);
Iterator<Tuple> it = pig.openIterator("a");
int index = 0;
- LOG.info("LoadFromHBase Starting");
+ LOG.info("testHBaseBinaryConverter Starting");
while (it.hasNext()) {
Tuple t = it.next();
- LOG.info("LoadFromHBase " + t);
String rowKey = (String) t.get(0);
int col_a = (Integer) t.get(1);
double col_b = (Double) t.get(2);
@@ -357,7 +403,7 @@ public class TestHBaseStorage {
Assert.assertEquals("Text_" + index, col_c);
index++;
}
- LOG.info("LoadFromHBase done");
+ LOG.info("testHBaseBinaryConverter done");
}
/**
@@ -370,23 +416,13 @@ public class TestHBaseStorage {
public void testStoreToHBase_1() throws IOException {
prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary);
-
- pig.registerQuery("a = load 'hbase://"
- + TESTTABLE_1
- + "' using "
- + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
- + TESTCOLUMN_A
- + " "
- + TESTCOLUMN_B
- + " "
- + TESTCOLUMN_C
- + "','-loadKey -caster HBaseBinaryConverter') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);");
+ scanTable1(pig, DataFormat.HBaseBinary);
pig.store("a", TESTTABLE_2,
"org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
+ TESTCOLUMN_C + "','-caster HBaseBinaryConverter')");
- HTable table = new HTable(conf, TESTTABLE_2);
+ HTable table = new HTable(TESTTABLE_2);
ResultScanner scanner = table.getScanner(new Scan());
Iterator<Result> iter = scanner.iterator();
int i = 0;
@@ -408,6 +444,42 @@ public class TestHBaseStorage {
/**
* load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it into
+ * 'TESTTABLE_2' using HBaseBinaryFormat projecting out column c
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testStoreToHBase_1_with_projection() throws IOException {
+ System.getProperties().setProperty("pig.usenewlogicalplan", "false");
+ prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
+ prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary);
+ scanTable1(pig, DataFormat.HBaseBinary);
+ pig.registerQuery("b = FOREACH a GENERATE rowKey, col_a, col_b;");
+ pig.store("b", TESTTABLE_2,
+ "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ + TESTCOLUMN_A + " " + TESTCOLUMN_B +
+ "','-caster HBaseBinaryConverter')");
+
+ HTable table = new HTable(TESTTABLE_2);
+ ResultScanner scanner = table.getScanner(new Scan());
+ Iterator<Result> iter = scanner.iterator();
+ int i = 0;
+ for (i = 0; iter.hasNext(); ++i) {
+ Result result = iter.next();
+ String v = String.valueOf(i);
+ String rowKey = Bytes.toString(result.getRow());
+ int col_a = Bytes.toInt(getColValue(result, TESTCOLUMN_A));
+ double col_b = Bytes.toDouble(getColValue(result, TESTCOLUMN_B));
+
+ Assert.assertEquals("00".substring(v.length()) + v, rowKey);
+ Assert.assertEquals(i, col_a);
+ Assert.assertEquals(i + 0.0, col_b, 1e-6);
+ }
+ Assert.assertEquals(100, i);
+ }
+
+ /**
+ * load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it into
* 'TESTTABLE_2' using UTF-8 Plain Text format
*
* @throws IOException
@@ -416,23 +488,13 @@ public class TestHBaseStorage {
public void testStoreToHBase_2() throws IOException {
prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary);
-
- pig.registerQuery("a = load 'hbase://"
- + TESTTABLE_1
- + "' using "
- + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
- + TESTCOLUMN_A
- + " "
- + TESTCOLUMN_B
- + " "
- + TESTCOLUMN_C
- + "','-loadKey -caster HBaseBinaryConverter') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);");
+ scanTable1(pig, DataFormat.HBaseBinary);
pig.store("a", TESTTABLE_2,
"org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
+ TESTCOLUMN_C + "')");
- HTable table = new HTable(conf, TESTTABLE_2);
+ HTable table = new HTable(TESTTABLE_2);
ResultScanner scanner = table.getScanner(new Scan());
Iterator<Result> iter = scanner.iterator();
int i = 0;
@@ -451,21 +513,114 @@ public class TestHBaseStorage {
}
Assert.assertEquals(100, i);
}
+ /**
+ * load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it into
+ * 'TESTTABLE_2' using UTF-8 Plain Text format projecting out column c
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testStoreToHBase_2_with_projection() throws IOException {
+ prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
+ prepareTable(TESTTABLE_2, false, DataFormat.UTF8PlainText);
+ scanTable1(pig, DataFormat.HBaseBinary);
+ pig.registerQuery("b = FOREACH a GENERATE rowKey, col_a, col_b;");
+ pig.store("b", TESTTABLE_2,
+ "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ + TESTCOLUMN_A + " " + TESTCOLUMN_B + "')");
+
+ HTable table = new HTable(TESTTABLE_2);
+ ResultScanner scanner = table.getScanner(new Scan());
+ Iterator<Result> iter = scanner.iterator();
+ int i = 0;
+ for (i = 0; iter.hasNext(); ++i) {
+ Result result = iter.next();
+ String v = i + "";
+ String rowKey = new String(result.getRow());
+ int col_a = Integer.parseInt(new String(getColValue(result, TESTCOLUMN_A)));
+ double col_b = Double.parseDouble(new String(getColValue(result, TESTCOLUMN_B)));
+
+ Assert.assertEquals("00".substring(v.length()) + v, rowKey);
+ Assert.assertEquals(i, col_a);
+ Assert.assertEquals(i + 0.0, col_b, 1e-6);
+ }
+ Assert.assertEquals(100, i);
+ }
/**
+ * load from hbase 'TESTTABLE_1' using UTF-8 Plain Text format, and store it
+ * into 'TESTTABLE_2' using UTF-8 Plain Text format projecting out column c
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testStoreToHBase_3_with_projection_no_caster() throws IOException {
+ prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+ prepareTable(TESTTABLE_2, false, DataFormat.UTF8PlainText);
+ scanTable1(pig, DataFormat.UTF8PlainText);
+ pig.registerQuery("b = FOREACH a GENERATE rowKey, col_a, col_b;");
+ pig.store("b", TESTTABLE_2,
+ "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ + TESTCOLUMN_A + " " + TESTCOLUMN_B + "')");
+
+ HTable table = new HTable(TESTTABLE_2);
+ ResultScanner scanner = table.getScanner(new Scan());
+ Iterator<Result> iter = scanner.iterator();
+ int i = 0;
+ for (i = 0; iter.hasNext(); ++i) {
+ Result result = iter.next();
+ String v = i + "";
+ String rowKey = new String(result.getRow());
+
+ String col_a = new String(getColValue(result, TESTCOLUMN_A));
+ String col_b = new String(getColValue(result, TESTCOLUMN_B));
+
+ Assert.assertEquals("00".substring(v.length()) + v, rowKey);
+ Assert.assertEquals(i + "", col_a);
+ Assert.assertEquals(i + 0.0 + "", col_b);
+ }
+ Assert.assertEquals(100, i);
+ }
+
+
+ private void scanTable1(PigServer pig, DataFormat dataFormat) throws IOException {
+ scanTable1(pig, dataFormat, "");
+ }
+
+ private void scanTable1(PigServer pig, DataFormat dataFormat, String extraParams) throws IOException {
+ pig.registerQuery("a = load 'hbase://"
+ + TESTTABLE_1
+ + "' using "
+ + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ + TESTCOLUMN_A
+ + " "
+ + TESTCOLUMN_B
+ + " "
+ + TESTCOLUMN_C
+ + "','-loadKey "
+ + (dataFormat == DataFormat.HBaseBinary ? " -caster HBaseBinaryConverter" : "")
+ + " " + extraParams + " "
+ + "') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);");
+ }
+ /**
* Prepare a table in hbase for testing.
*
*/
private void prepareTable(String tableName, boolean initData,
DataFormat format) throws IOException {
// define the table schema
+ HTable table = null;
try {
- util.deleteTable(Bytes.toBytesBinary(tableName));
- } catch (IOException e) {
+ deleteAllRows(tableName);
+ } catch (Exception e) {
// It's ok, table might not exist.
}
- HTable table = util.createTable(Bytes.toBytesBinary(tableName),
+ try {
+ table = util.createTable(Bytes.toBytesBinary(tableName),
COLUMNFAMILY);
+ } catch (Exception e) {
+ table = new HTable(Bytes.toBytesBinary(tableName));
+ }
if (initData) {
for (int i = 0; i < TEST_ROW_COUNT; i++) {
String v = i + "";
@@ -511,6 +666,7 @@ public class TestHBaseStorage {
*/
private static byte[] getColValue(Result result, String colName) {
byte[][] colArray = Bytes.toByteArrays(colName.split(":"));
+ byte[] val = result.getValue(colArray[0], colArray[1]);
- return result.getValue(colArray[0], colArray[1]);
+ return val;
}