You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2011/04/17 08:58:34 UTC
svn commit: r1094109 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
test/org/apache/pig/test/TestHBaseStorage.java
Author: dvryaboy
Date: Sun Apr 17 06:58:34 2011
New Revision: 1094109
URL: http://svn.apache.org/viewvc?rev=1094109&view=rev
Log:
PIG-1782: Add ability to load data by column family in HBaseStorage
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1094109&r1=1094108&r2=1094109&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Apr 17 06:58:34 2011
@@ -32,6 +32,8 @@ PIG-1876: Typed map for Pig (daijy)
IMPROVEMENTS
+PIG-1782: Add ability to load data by column family in HBaseStorage (billgraham via dvryaboy)
+
PIG-1772: Pig 090 Documentation (chandec via olgan)
PIG-1954: Design deployment interface for e2e test harness (gates)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1094109&r1=1094108&r2=1094109&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Sun Apr 17 06:58:34 2011
@@ -17,13 +17,15 @@
package org.apache.pig.backend.hadoop.hbase;
import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
+import java.util.HashMap;
import java.util.Properties;
import org.apache.commons.cli.CommandLine;
@@ -34,8 +36,8 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
@@ -45,6 +47,9 @@ import org.apache.hadoop.hbase.filter.Bi
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
+import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
@@ -75,18 +80,44 @@ import org.apache.pig.data.DataByteArray
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.Utils;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.impl.util.Utils;
import com.google.common.collect.Lists;
/**
- * A HBase implementation of LoadFunc and StoreFunc
+ * A HBase implementation of LoadFunc and StoreFunc.
+ * <P>
+ * Below is an example showing how to load data from HBase:
+ * <pre>{@code
+ * raw = LOAD 'hbase://SampleTable'
+ * USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
+ * 'info:first_name info:last_name friends:* info:*', '-loadKey true -limit 5')
+ * AS (id:bytearray, first_name:chararray, last_name:chararray, friends_map:map[], info_map:map[]);
+ * }</pre>
+ * This example loads data redundantly from the info column family just to
+ * illustrate usage. Note that the row key is inserted first in the result schema.
+ * To load only column names that start with a given prefix, specify the column
+ * name with a trailing '*'. For example passing <code>friends:bob_*</code> to
+ * the constructor in the above example would cause only columns that start with
+ * <i>bob_</i> to be loaded.
+ * <P>
+ * Below is an example showing how to store data into HBase:
+ * <pre>{@code
+ * copy = STORE raw INTO 'hbase://SampleTableCopy'
+ * USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
+ * 'info:first_name info:last_name friends:* info:*')
+ * AS (info:first_name info:last_name buddies:* info:*);
+ * }</pre>
+ * Note that STORE will expect the first value in the tuple to be the row key.
+ * Scalar values need to map to an explicit column descriptor and maps need to
+ * map to a column family name. In the above examples, the <code>friends</code>
+ * column family data from <code>SampleTable</code> will be written to a
+ * <code>buddies</code> column family in the <code>SampleTableCopy</code> table.
*
- * TODO(dmitriy) test that all this stuff works
- * TODO(dmitriy) documentation
*/
public class HBaseStorage extends LoadFunc implements StoreFuncInterface, LoadPushDown, OrderedLoadFunc {
@@ -95,8 +126,10 @@ public class HBaseStorage extends LoadFu
private final static String STRING_CASTER = "UTF8StorageConverter";
private final static String BYTE_CASTER = "HBaseBinaryConverter";
private final static String CASTER_PROPERTY = "pig.hbase.caster";
+ private final static String ASTERISK = "*";
+ private final static String COLON = ":";
- private List<byte[][]> columnList_ = Lists.newArrayList();
+ private List<ColumnInfo> columnInfo_ = Lists.newArrayList();
private HTable m_table;
private Configuration m_conf;
private RecordReader reader;
@@ -119,9 +152,7 @@ public class HBaseStorage extends LoadFu
private LoadCaster caster_;
private ResourceSchema schema_;
-
private RequiredFieldList requiredFieldList;
-
private boolean initialized = false;
private static void populateValidOptions() {
@@ -142,7 +173,16 @@ public class HBaseStorage extends LoadFu
* provided columns.
*
* @param columnList
- * columnlist that is a presented string delimited by space.
+ * columnList, a space-delimited string of column names. To
+ * retrieve all columns in a column family <code>Foo</code>, specify
+ * a column as either <code>Foo:</code> or <code>Foo:*</code>. To fetch
+ * only columns in the CF that start with <I>bar</I>, specify
+ * <code>Foo:bar*</code>. The resulting tuple will always be the size
+ * of the number of tokens in <code>columnList</code>. Items in the
+ * tuple will be scalar values when a full column descriptor is
+ * specified, or a map of column descriptors to values when a column
+ * family is specified.
+ *
+ * @throws ParseException when unable to parse arguments
* @throws IOException
*/
@@ -173,13 +213,13 @@ public class HBaseStorage extends LoadFu
configuredOptions_ = parser_.parse(validOptions_, optsArr);
} catch (ParseException e) {
HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-caching] [-caster]", validOptions_ );
+ formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-columnPrefix] [-caching] [-caster] [-limit]", validOptions_ );
throw e;
}
loadRowKey_ = configuredOptions_.hasOption("loadKey");
for (String colName : colNames) {
- columnList_.add(Bytes.toByteArrays(colName.split(":")));
+ columnInfo_.add(new ColumnInfo(colName));
}
m_conf = HBaseConfiguration.create();
@@ -191,21 +231,13 @@ public class HBaseStorage extends LoadFu
caster_ = new HBaseBinaryConverter();
} else {
try {
- @SuppressWarnings("unchecked")
- Class<LoadCaster> casterClass = (Class<LoadCaster>) Class.forName(casterOption);
- caster_ = casterClass.newInstance();
+ caster_ = (LoadCaster) PigContext.instantiateFuncFromSpec(casterOption);
} catch (ClassCastException e) {
LOG.error("Congifured caster does not implement LoadCaster interface.");
throw new IOException(e);
- } catch (ClassNotFoundException e) {
+ } catch (RuntimeException e) {
LOG.error("Configured caster class not found.", e);
throw new IOException(e);
- } catch (InstantiationException e) {
- LOG.error("Unable to instantiate configured caster " + casterOption, e);
- throw new IOException(e);
- } catch (IllegalAccessException e) {
- LOG.error("Illegal Access Exception for configured caster " + casterOption, e);
- throw new IOException(e);
}
}
@@ -219,29 +251,67 @@ public class HBaseStorage extends LoadFu
// Set filters, if any.
if (configuredOptions_.hasOption("gt")) {
gt_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("gt")));
- addFilter(CompareOp.GREATER, gt_);
+ addRowFilter(CompareOp.GREATER, gt_);
}
if (configuredOptions_.hasOption("lt")) {
lt_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("lt")));
- addFilter(CompareOp.LESS, lt_);
+ addRowFilter(CompareOp.LESS, lt_);
}
if (configuredOptions_.hasOption("gte")) {
gte_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("gte")));
- addFilter(CompareOp.GREATER_OR_EQUAL, gte_);
+ addRowFilter(CompareOp.GREATER_OR_EQUAL, gte_);
}
if (configuredOptions_.hasOption("lte")) {
lte_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("lte")));
- addFilter(CompareOp.LESS_OR_EQUAL, lte_);
+ addRowFilter(CompareOp.LESS_OR_EQUAL, lte_);
+ }
+
+ // apply any column filters
+ FilterList allColumnFilters = null;
+ for (ColumnInfo colInfo : columnInfo_) {
+ if (colInfo.isColumnMap() && colInfo.getColumnPrefix() != null) {
+
+ // all column family filters roll up to one parent OR filter
+ if (allColumnFilters == null) {
+ allColumnFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
+ }
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Adding family:prefix filters with values " +
+ Bytes.toString(colInfo.getColumnFamily()) + COLON +
+ Bytes.toString(colInfo.getColumnPrefix()));
+ }
+
+ // each column family filter consists of a FamilyFilter AND a PrefixFilter
+ FilterList thisColumnFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL);
+ thisColumnFilter.addFilter(new FamilyFilter(CompareOp.EQUAL,
+ new BinaryComparator(colInfo.getColumnFamily())));
+ thisColumnFilter.addFilter(new ColumnPrefixFilter(
+ colInfo.getColumnPrefix()));
+
+ allColumnFilters.addFilter(thisColumnFilter);
+ }
+ }
+
+ if (allColumnFilters != null) {
+ addFilter(allColumnFilters);
+ }
+ }
+
+ private void addRowFilter(CompareOp op, byte[] val) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Adding filter " + op.toString() +
+ " with value " + Bytes.toStringBinary(val));
}
+ addFilter(new RowFilter(op, new BinaryComparator(val)));
}
- private void addFilter(CompareOp op, byte[] val) {
- LOG.info("Adding filter " + op.toString() + " with value " + Bytes.toStringBinary(val));
+ private void addFilter(Filter filter) {
FilterList scanFilter = (FilterList) scan.getFilter();
if (scanFilter == null) {
scanFilter = new FilterList();
}
- scanFilter.addFilter(new RowFilter(op, new BinaryComparator(val)));
+ scanFilter.addFilter(filter);
scan.setFilter(scanFilter);
}
@@ -263,7 +333,15 @@ public class HBaseStorage extends LoadFu
ImmutableBytesWritable rowKey = (ImmutableBytesWritable) reader
.getCurrentKey();
Result result = (Result) reader.getCurrentValue();
- int tupleSize=columnList_.size();
+
+ int tupleSize = columnInfo_.size();
+
+ // use a map of families -> qualifiers with the most recent
+ // version of the cell. Fetching multiple versions could be a
+ // useful feature.
+ NavigableMap<byte[], NavigableMap<byte[], byte[]>> resultsMap =
+ result.getNoVersionMap();
+
if (loadRowKey_){
tupleSize++;
}
@@ -274,13 +352,52 @@ public class HBaseStorage extends LoadFu
tuple.set(0, new DataByteArray(rowKey.get()));
startIndex++;
}
- for (int i=0;i<columnList_.size();++i){
- byte[] cell=result.getValue(columnList_.get(i)[0],columnList_.get(i)[1]);
- if (cell!=null)
- tuple.set(i+startIndex, new DataByteArray(cell));
- else
- tuple.set(i+startIndex, null);
+ for (int i = 0;i < columnInfo_.size(); ++i){
+ int currentIndex = startIndex + i;
+
+ ColumnInfo columnInfo = columnInfo_.get(i);
+ if (columnInfo.isColumnMap()) {
+ // It's a column family so we need to iterate and set all
+ // values found
+ NavigableMap<byte[], byte[]> cfResults =
+ resultsMap.get(columnInfo.getColumnFamily());
+ Map<String, DataByteArray> cfMap =
+ new HashMap<String, DataByteArray>();
+
+ if (cfResults != null) {
+ for (byte[] quantifier : cfResults.keySet()) {
+ // We need to check against the prefix filter to
+ // see if this value should be included. We can't
+ // just rely on the server-side filter, since a
+ // user could specify multiple CF filters for the
+ // same CF.
+ if (columnInfo.getColumnPrefix() == null ||
+ columnInfo.hasPrefixMatch(quantifier)) {
+
+ byte[] cell = cfResults.get(quantifier);
+ DataByteArray value =
+ cell == null ? null : new DataByteArray(cell);
+ cfMap.put(Bytes.toString(quantifier), value);
+ }
+ }
+ }
+ tuple.set(currentIndex, cfMap);
+ } else {
+ // It's a column so set the value
+ byte[] cell=result.getValue(columnInfo.getColumnFamily(),
+ columnInfo.getColumnName());
+ DataByteArray value =
+ cell == null ? null : new DataByteArray(cell);
+ tuple.set(currentIndex, value);
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ for (int i = 0; i < tuple.size(); i++) {
+ LOG.debug("tuple value:" + tuple.get(i));
+ }
}
+
return tuple;
}
} catch (InterruptedException e) {
@@ -339,8 +456,16 @@ public class HBaseStorage extends LoadFu
return;
}
- for (byte[][] col : columnList_) {
- scan.addColumn(col[0], col[1]);
+ for (ColumnInfo columnInfo : columnInfo_) {
+ // do we have a column family, or a column?
+ if (columnInfo.isColumnMap()) {
+ scan.addFamily(columnInfo.getColumnFamily());
+ }
+ else {
+ scan.addColumn(columnInfo.getColumnFamily(),
+ columnInfo.getColumnName());
+ }
+
}
if (requiredFieldList != null) {
Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(),
@@ -423,10 +548,38 @@ public class HBaseStorage extends LoadFu
(fieldSchemas == null) ? DataType.findType(t.get(0)) : fieldSchemas[0].getType()));
long ts=System.currentTimeMillis();
+ if (LOG.isDebugEnabled()) {
+ for (ColumnInfo columnInfo : columnInfo_) {
+ LOG.debug("putNext -- col: " + columnInfo);
+ }
+ }
+
for (int i=1;i<t.size();++i){
- put.add(columnList_.get(i-1)[0], columnList_.get(i-1)[1], ts, objToBytes(t.get(i),
- (fieldSchemas == null) ? DataType.findType(t.get(i)) : fieldSchemas[i].getType()));
+ ColumnInfo columnInfo = columnInfo_.get(i-1);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("putNext - tuple: " + i + ", value=" + t.get(i) +
+ ", cf:column=" + columnInfo);
+ }
+
+ if (!columnInfo.isColumnMap()) {
+ put.add(columnInfo.getColumnFamily(), columnInfo.getColumnName(),
+ ts, objToBytes(t.get(i), (fieldSchemas == null) ?
+ DataType.findType(t.get(i)) : fieldSchemas[i].getType()));
+ } else {
+ Map<String, Object> cfMap = (Map<String, Object>) t.get(i);
+ for (String colName : cfMap.keySet()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("putNext - colName=" + colName +
+ ", class: " + colName.getClass());
+ }
+ // TODO deal with the fact that maps can have types now. Currently we detect types at
+ // runtime in the case of storing to a cf, which is suboptimal.
+ put.add(columnInfo.getColumnFamily(), Bytes.toBytes(colName.toString()), ts,
+ objToBytes(cfMap.get(colName), DataType.findType(cfMap.get(colName))));
+ }
+ }
}
+
try {
writer.write(null, put);
} catch (InterruptedException e) {
@@ -499,6 +652,8 @@ public class HBaseStorage extends LoadFu
@Override
public RequiredFieldResponse pushProjection(
RequiredFieldList requiredFieldList) throws FrontendException {
+ List<RequiredField> requiredFields = requiredFieldList.getFields();
+ List<ColumnInfo> newColumns = Lists.newArrayListWithExpectedSize(requiredFields.size());
// colOffset is the offset in our columnList that we need to apply to indexes we get from requiredFields
// (row key is not a real column)
@@ -506,17 +661,12 @@ public class HBaseStorage extends LoadFu
// projOffset is the offset to the requiredFieldList we need to apply when figuring out which columns to prune.
// (if key is pruned, we should skip row key's element in this list when trimming colList)
int projOffset = colOffset;
-
this.requiredFieldList = requiredFieldList;
- List<RequiredField> requiredFields = requiredFieldList.getFields();
- if (requiredFieldList != null && requiredFields.size() > (columnList_.size() + colOffset)) {
+
+ if (requiredFieldList != null && requiredFields.size() > (columnInfo_.size() + colOffset)) {
throw new FrontendException("The list of columns to project from HBase is larger than HBaseStorage is configured to load.");
}
- List<byte[][]> newColumns = Lists.newArrayListWithExpectedSize(requiredFields.size());
-
- // HBase Row Key is the first column in the schema when it's loaded,
- // and is not included in the columnList (since it's not a proper column).
if (loadRowKey_ &&
( requiredFields.size() < 1 || requiredFields.get(0).getIndex() != 0)) {
loadRowKey_ = false;
@@ -525,9 +675,16 @@ public class HBaseStorage extends LoadFu
for (int i = projOffset; i < requiredFields.size(); i++) {
int fieldIndex = requiredFields.get(i).getIndex();
- newColumns.add(columnList_.get(fieldIndex - colOffset));
+ newColumns.add(columnInfo_.get(fieldIndex - colOffset));
}
- columnList_ = newColumns;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("pushProjection After Projection: loadRowKey is " + loadRowKey_) ;
+ for (ColumnInfo colInfo : newColumns) {
+ LOG.debug("pushProjection -- col: " + colInfo);
+ }
+ }
+ columnInfo_ = newColumns;
return new RequiredFieldResponse(true);
}
@@ -554,4 +711,54 @@ public class HBaseStorage extends LoadFu
};
}
+ /**
+ * Class to encapsulate logic around which column names were specified in each
+ * position of the column list. Users can specify columns names in one of 4
+ * ways: 'Foo:', 'Foo:*', 'Foo:bar*' or 'Foo:bar'. The first 3 result in a
+ * Map being added to the tuple, while the last results in a scalar. The 3rd
+ * form results in a prefix-filtered Map.
+ */
+ private class ColumnInfo {
+
+ final String originalColumnName; // always set
+ final byte[] columnFamily; // always set
+ final byte[] columnName; // set if it exists and doesn't contain '*'
+ final byte[] columnPrefix; // set if contains a prefix followed by '*'
+
+ public ColumnInfo(String colName) {
+ originalColumnName = colName;
+ String[] cfAndColumn = colName.split(COLON, 2);
+
+ //CFs are byte[1] and columns are byte[2]
+ columnFamily = Bytes.toBytes(cfAndColumn[0]);
+ if (cfAndColumn.length > 1 &&
+ cfAndColumn[1].length() > 0 && !ASTERISK.equals(cfAndColumn[1])) {
+ if (cfAndColumn[1].endsWith(ASTERISK)) {
+ columnPrefix = Bytes.toBytes(cfAndColumn[1].substring(0,
+ cfAndColumn[1].length() - 1));
+ columnName = null;
+ }
+ else {
+ columnName = Bytes.toBytes(cfAndColumn[1]);
+ columnPrefix = null;
+ }
+ } else {
+ columnPrefix = null;
+ columnName = null;
+ }
+ }
+
+ public byte[] getColumnFamily() { return columnFamily; }
+ public byte[] getColumnName() { return columnName; }
+ public byte[] getColumnPrefix() { return columnPrefix; }
+ public boolean isColumnMap() { return columnName == null; }
+
+ public boolean hasPrefixMatch(byte[] qualifier) {
+ return Bytes.startsWith(qualifier, columnPrefix);
+ }
+
+ @Override
+ public String toString() { return originalColumnName; }
+ }
+
}
Modified: pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java?rev=1094109&r1=1094108&r2=1094109&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java Sun Apr 17 06:58:34 2011
@@ -18,6 +18,7 @@ package org.apache.pig.test;
import java.io.IOException;
import java.util.Iterator;
+import java.util.Map;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -40,6 +41,7 @@ import org.apache.pig.data.Tuple;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -66,23 +68,19 @@ public class TestHBaseStorage {
private static final String TESTCOLUMN_A = "pig:col_a";
private static final String TESTCOLUMN_B = "pig:col_b";
private static final String TESTCOLUMN_C = "pig:col_c";
+ private static final String TESTCOLUMN_D = "pig:prefixed_col_d";
+
private static final int TEST_ROW_COUNT = 100;
@BeforeClass
public static void setUp() throws Exception {
-
// This is needed by Pig
cluster = MiniCluster.buildCluster();
-
conf = cluster.getConfiguration();
- conf.setInt("mapred.map.max.attempts", 1);
util = new HBaseTestingUtility(conf);
util.startMiniZKCluster();
util.startMiniHBaseCluster(1, 1);
-
- pig = new PigServer(ExecType.LOCAL,
- ConfigurationUtil.toProperties(conf));
}
@AfterClass
@@ -98,6 +96,13 @@ public class TestHBaseStorage {
cluster.shutDown();
}
+
+ @Before
+ public void beforeTest() throws Exception {
+ pig = new PigServer(ExecType.LOCAL,
+ ConfigurationUtil.toProperties(conf));
+ }
+
@After
public void tearDown() throws Exception {
try {
@@ -122,6 +127,141 @@ public class TestHBaseStorage {
}
/**
+ * Test Load from hbase with map parameters
+ *
+ */
+ @Test
+ public void testLoadWithMap_1() throws IOException {
+ prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+
+ pig.registerQuery("a = load 'hbase://"
+ + TESTTABLE_1
+ + "' using "
+ + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ + TESTCOLUMN_A
+ + " "
+ + TESTCOLUMN_B
+ + " "
+ + TESTCOLUMN_C
+ + " pig:"
+ + "','-loadKey') as (rowKey, col_a, col_b, col_c, pig_cf_map);");
+ Iterator<Tuple> it = pig.openIterator("a");
+ int count = 0;
+ LOG.info("LoadFromHBase Starting");
+ while (it.hasNext()) {
+ Tuple t = it.next();
+ LOG.info("LoadFromHBase " + t);
+ String rowKey = ((DataByteArray) t.get(0)).toString();
+ String col_a = ((DataByteArray) t.get(1)).toString();
+ String col_b = ((DataByteArray) t.get(2)).toString();
+ String col_c = ((DataByteArray) t.get(3)).toString();
+ Map pig_cf_map = (Map) t.get(4);
+
+ Assert.assertEquals("00".substring((count + "").length()) + count,
+ rowKey);
+ Assert.assertEquals(count, Integer.parseInt(col_a));
+ Assert.assertEquals(count + 0.0, Double.parseDouble(col_b), 1e-6);
+ Assert.assertEquals("Text_" + count, col_c);
+
+ Assert.assertEquals(4, pig_cf_map.size());
+ Assert.assertEquals(count,
+ Integer.parseInt(pig_cf_map.get("col_a").toString()));
+ Assert.assertEquals(count + 0.0,
+ Double.parseDouble(pig_cf_map.get("col_b").toString()), 1e-6);
+ Assert.assertEquals("Text_" + count,
+ ((DataByteArray) pig_cf_map.get("col_c")).toString());
+ Assert.assertEquals("PrefixedText_" + count,
+ ((DataByteArray) pig_cf_map.get("prefixed_col_d")).toString());
+
+ count++;
+ }
+ Assert.assertEquals(TEST_ROW_COUNT, count);
+ LOG.info("LoadFromHBase done");
+ }
+
+ /**
+ * Test Load from hbase with map parameters and column prefix
+ *
+ */
+ @Test
+ public void testLoadWithMap_2_col_prefix() throws IOException {
+ prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+
+ pig.registerQuery("a = load 'hbase://"
+ + TESTTABLE_1
+ + "' using "
+ + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ + "pig:prefixed_col_*"
+ + "','-loadKey') as (rowKey:chararray, pig_cf_map:map[]);");
+ Iterator<Tuple> it = pig.openIterator("a");
+ int count = 0;
+ LOG.info("LoadFromHBase Starting");
+ while (it.hasNext()) {
+ Tuple t = it.next();
+ LOG.info("LoadFromHBase " + t);
+ String rowKey = t.get(0).toString();
+ Map pig_cf_map = (Map) t.get(1);
+ Assert.assertEquals(2, t.size());
+
+ Assert.assertEquals("00".substring((count + "").length()) + count,
+ rowKey);
+ Assert.assertEquals("PrefixedText_" + count,
+ ((DataByteArray) pig_cf_map.get("prefixed_col_d")).toString());
+ Assert.assertEquals(1, pig_cf_map.size());
+
+ count++;
+ }
+ Assert.assertEquals(TEST_ROW_COUNT, count);
+ LOG.info("LoadFromHBase done");
+ }
+
+ /**
+ * Test Load from hbase with map parameters and multiple column prefixs
+ *
+ */
+ @Test
+ public void testLoadWithMap_3_col_prefix() throws IOException {
+ prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+
+ pig.registerQuery("a = load 'hbase://"
+ + TESTTABLE_1
+ + "' using "
+ + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ + "pig:col_* pig:prefixed_col_*"
+ + "','-loadKey') as (rowKey:chararray, pig_cf_map:map[], pig_prefix_cf_map:map[]);");
+ Iterator<Tuple> it = pig.openIterator("a");
+ int count = 0;
+ LOG.info("LoadFromHBase Starting");
+ while (it.hasNext()) {
+ Tuple t = it.next();
+ LOG.info("LoadFromHBase " + t);
+ String rowKey = t.get(0).toString();
+ Map pig_cf_map = (Map) t.get(1);
+ Map pig_prefix_cf_map = (Map) t.get(2);
+ Assert.assertEquals(3, t.size());
+
+ Assert.assertEquals("00".substring((count + "").length()) + count,
+ rowKey);
+ Assert.assertEquals("PrefixedText_" + count,
+ ((DataByteArray) pig_prefix_cf_map.get("prefixed_col_d")).toString());
+ Assert.assertEquals(1, pig_prefix_cf_map.size());
+
+ Assert.assertEquals(count,
+ Integer.parseInt(pig_cf_map.get("col_a").toString()));
+ Assert.assertEquals(count + 0.0,
+ Double.parseDouble(pig_cf_map.get("col_b").toString()), 1e-6);
+ Assert.assertEquals("Text_" + count,
+ ((DataByteArray) pig_cf_map.get("col_c")).toString());
+ Assert.assertEquals(3, pig_cf_map.size());
+
+ count++;
+ }
+ Assert.assertEquals(TEST_ROW_COUNT, count);
+ LOG.info("LoadFromHBase done");
+ }
+
+
+ /**
* load from hbase test
*
* @throws IOException
@@ -129,6 +269,10 @@ public class TestHBaseStorage {
@Test
public void testLoadFromHBase() throws IOException {
prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+ LOG.info("QUERY: " + "a = load 'hbase://" + TESTTABLE_1 + "' using "
+ + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ + TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C +" pig:col_d"
+ + "') as (col_a, col_b, col_c, col_d);");
pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using "
+ "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C +" pig:col_d"
@@ -619,7 +763,7 @@ public class TestHBaseStorage {
try {
deleteAllRows(tableName);
} catch (Exception e) {
- // that's ok, table may not exist yet
+ // It's ok, table might not exist.
}
try {
table = util.createTable(Bytes.toBytesBinary(tableName),
@@ -627,6 +771,7 @@ public class TestHBaseStorage {
} catch (Exception e) {
table = new HTable(Bytes.toBytesBinary(tableName));
}
+
if (initData) {
for (int i = 0; i < TEST_ROW_COUNT; i++) {
String v = i + "";
@@ -643,6 +788,9 @@ public class TestHBaseStorage {
// col_c: string type
put.add(COLUMNFAMILY, Bytes.toBytes("col_c"),
Bytes.toBytes("Text_" + i));
+ // prefixed_col_d: string type
+ put.add(COLUMNFAMILY, Bytes.toBytes("prefixed_col_d"),
+ Bytes.toBytes("PrefixedText_" + i));
table.put(put);
} else {
// row key: string type
@@ -657,6 +805,9 @@ public class TestHBaseStorage {
// col_c: string type
put.add(COLUMNFAMILY, Bytes.toBytes("col_c"),
("Text_" + i).getBytes());
+ // prefixed_col_d: string type
+ put.add(COLUMNFAMILY, Bytes.toBytes("prefixed_col_d"),
+ ("PrefixedText_" + i).getBytes());
table.put(put);
}
}