Posted to commits@pig.apache.org by dv...@apache.org on 2011/04/17 08:58:34 UTC

svn commit: r1094109 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java test/org/apache/pig/test/TestHBaseStorage.java

Author: dvryaboy
Date: Sun Apr 17 06:58:34 2011
New Revision: 1094109

URL: http://svn.apache.org/viewvc?rev=1094109&view=rev
Log:
PIG-1782: Add ability to load data by column family in HBaseStorage

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
    pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
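
In short: a column spec can now name either a single column or a whole
(optionally prefix-filtered) column family. A minimal sketch of the new load
forms, using hypothetical table and column names (not taken from the patch):

    -- 'info:name'   -> scalar value of one column
    -- 'info:'       -> map of all columns in the 'info' family
    -- 'info:*'      -> same as 'info:'
    -- 'info:addr_*' -> map of 'info' columns whose qualifiers start with 'addr_'
    users = LOAD 'hbase://UserTable'
            USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
            'info:name info:addr_*', '-loadKey')
            AS (id:bytearray, name:chararray, addr_map:map[]);
    DUMP users;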

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1094109&r1=1094108&r2=1094109&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Apr 17 06:58:34 2011
@@ -32,6 +32,8 @@ PIG-1876: Typed map for Pig (daijy)
 
 IMPROVEMENTS
 
+PIG-1782: Add ability to load data by column family in HBaseStorage (billgraham via dvryaboy)
+
 PIG-1772: Pig 090 Documentation (chandec via olgan)
 
 PIG-1954: Design deployment interface for e2e test harness (gates)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1094109&r1=1094108&r2=1094109&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Sun Apr 17 06:58:34 2011
@@ -17,13 +17,15 @@
 package org.apache.pig.backend.hadoop.hbase;
 
 import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
+import java.util.HashMap;
 import java.util.Properties;
 
 import org.apache.commons.cli.CommandLine;
@@ -34,8 +36,8 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
@@ -45,6 +47,9 @@ import org.apache.hadoop.hbase.filter.Bi
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
@@ -75,18 +80,44 @@ import org.apache.pig.data.DataByteArray
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.Utils;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.impl.util.Utils;
 
 import com.google.common.collect.Lists;
 
 /**
- * A HBase implementation of LoadFunc and StoreFunc
+ * An HBase implementation of LoadFunc and StoreFunc.
+ * <P>
+ * Below is an example showing how to load data from HBase:
+ * <pre>{@code
+ * raw = LOAD 'hbase://SampleTable'
+ *       USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
+ *       'info:first_name info:last_name friends:* info:*', '-loadKey true -limit 5')
+ *       AS (id:bytearray, first_name:chararray, last_name:chararray, friends_map:map[], info_map:map[]);
+ * }</pre>
+ * This example loads data redundantly from the info column family just to
+ * illustrate usage. Note that the row key is inserted first in the result schema.
+ * To load only column names that start with a given prefix, specify the column
+ * name with a trailing '*'. For example, passing <code>friends:bob_*</code> to
+ * the constructor in the above example would cause only columns that start with
+ * <i>bob_</i> to be loaded.
+ * <P>
+ * Below is an example showing how to store data into HBase:
+ * <pre>{@code
+ * STORE raw INTO 'hbase://SampleTableCopy'
+ *       USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
+ *       'info:first_name info:last_name buddies:* info:*');
+ * }</pre>
+ * Note that STORE expects the first value in the tuple to be the row key.
+ * Scalar values must map to an explicit column descriptor, and maps must
+ * map to a column family name. In the above example, the
+ * <code>friends</code> column family data from <code>SampleTable</code> is
+ * written to the <code>buddies</code> column family in the
+ * <code>SampleTableCopy</code> table.
  * 
- * TODO(dmitriy) test that all this stuff works
- * TODO(dmitriy) documentation
  */
 public class HBaseStorage extends LoadFunc implements StoreFuncInterface, LoadPushDown, OrderedLoadFunc {
     
@@ -95,8 +126,10 @@ public class HBaseStorage extends LoadFu
     private final static String STRING_CASTER = "UTF8StorageConverter";
     private final static String BYTE_CASTER = "HBaseBinaryConverter";
     private final static String CASTER_PROPERTY = "pig.hbase.caster";
+    private final static String ASTERISK = "*";
+    private final static String COLON = ":";
     
-    private List<byte[][]> columnList_ = Lists.newArrayList();
+    private List<ColumnInfo> columnInfo_ = Lists.newArrayList();
     private HTable m_table;
     private Configuration m_conf;
     private RecordReader reader;
@@ -119,9 +152,7 @@ public class HBaseStorage extends LoadFu
     private LoadCaster caster_;
 
     private ResourceSchema schema_;
-
     private RequiredFieldList requiredFieldList;
-
     private boolean initialized = false;
 
     private static void populateValidOptions() { 
@@ -142,7 +173,16 @@ public class HBaseStorage extends LoadFu
      * provided columns.
      * 
      * @param columnList
-     *            columnlist that is a presented string delimited by space.
+     *        a space-delimited list of columns. To retrieve all columns
+     *        in a column family <code>Foo</code>, specify a column as either
+     *        <code>Foo:</code> or <code>Foo:*</code>. To fetch only columns
+     *        in the CF that start with <I>bar</I>, specify <code>Foo:bar*</code>.
+     *        The resulting tuple will always be the size of the number of
+     *        tokens in <code>columnList</code>. Items in the tuple will be
+     *        scalar values when a full column descriptor is specified, or a
+     *        map of column descriptors to values when a column family is
+     *        specified.
+     *
     * @throws ParseException when unable to parse arguments
      * @throws IOException 
      */
@@ -173,13 +213,13 @@ public class HBaseStorage extends LoadFu
             configuredOptions_ = parser_.parse(validOptions_, optsArr);
         } catch (ParseException e) {
             HelpFormatter formatter = new HelpFormatter();
-            formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-caching] [-caster]", validOptions_ );
+            formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-columnPrefix] [-caching] [-caster] [-limit]", validOptions_ );
             throw e;
         }
 
         loadRowKey_ = configuredOptions_.hasOption("loadKey");  
         for (String colName : colNames) {
-            columnList_.add(Bytes.toByteArrays(colName.split(":")));
+            columnInfo_.add(new ColumnInfo(colName));
         }
 
         m_conf = HBaseConfiguration.create();
@@ -191,21 +231,13 @@ public class HBaseStorage extends LoadFu
             caster_ = new HBaseBinaryConverter();
         } else {
             try {
-                @SuppressWarnings("unchecked")
-                Class<LoadCaster> casterClass = (Class<LoadCaster>) Class.forName(casterOption);
-                caster_ = casterClass.newInstance();
+                caster_ = (LoadCaster) PigContext.instantiateFuncFromSpec(casterOption);
             } catch (ClassCastException e) {
                 LOG.error("Congifured caster does not implement LoadCaster interface.");
                 throw new IOException(e);
-            } catch (ClassNotFoundException e) {
+            } catch (RuntimeException e) {
                 LOG.error("Configured caster class not found.", e);
                 throw new IOException(e);
-            } catch (InstantiationException e) {
-                LOG.error("Unable to instantiate configured caster " + casterOption, e);
-                throw new IOException(e);
-            } catch (IllegalAccessException e) {
-                LOG.error("Illegal Access Exception for configured caster " + casterOption, e); 
-                throw new IOException(e);
             }
         }
 
@@ -219,29 +251,67 @@ public class HBaseStorage extends LoadFu
         // Set filters, if any.
         if (configuredOptions_.hasOption("gt")) {
             gt_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("gt")));
-            addFilter(CompareOp.GREATER, gt_);
+            addRowFilter(CompareOp.GREATER, gt_);
         }
         if (configuredOptions_.hasOption("lt")) {
             lt_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("lt")));
-            addFilter(CompareOp.LESS, lt_);
+            addRowFilter(CompareOp.LESS, lt_);
         }
         if (configuredOptions_.hasOption("gte")) {
             gte_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("gte")));
-            addFilter(CompareOp.GREATER_OR_EQUAL, gte_);
+            addRowFilter(CompareOp.GREATER_OR_EQUAL, gte_);
         }
         if (configuredOptions_.hasOption("lte")) {
             lte_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("lte")));
-            addFilter(CompareOp.LESS_OR_EQUAL, lte_);
+            addRowFilter(CompareOp.LESS_OR_EQUAL, lte_);
+        }
+
+        // apply any column filters
+        FilterList allColumnFilters = null;
+        for (ColumnInfo colInfo : columnInfo_) {
+            if (colInfo.isColumnMap() && colInfo.getColumnPrefix() != null) {
+
+                // all column family filters roll up to one parent OR filter
+                if (allColumnFilters == null) {
+                    allColumnFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
+                }
+
+                if (LOG.isInfoEnabled()) {
+                    LOG.info("Adding family:prefix filters with values " +
+                        Bytes.toString(colInfo.getColumnFamily()) + COLON +
+                        Bytes.toString(colInfo.getColumnPrefix()));
+                }
+
+                // each column family filter consists of a FamilyFilter AND a PrefixFilter
+                FilterList thisColumnFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL);
+                thisColumnFilter.addFilter(new FamilyFilter(CompareOp.EQUAL,
+                        new BinaryComparator(colInfo.getColumnFamily())));
+                thisColumnFilter.addFilter(new ColumnPrefixFilter(
+                        colInfo.getColumnPrefix()));
+
+                allColumnFilters.addFilter(thisColumnFilter);
+            }
+        }
+
+        if (allColumnFilters != null) {
+            addFilter(allColumnFilters);
+        }
+    }
+
+    private void addRowFilter(CompareOp op, byte[] val) {
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Adding filter " + op.toString() +
+                    " with value " + Bytes.toStringBinary(val));
         }
+        addFilter(new RowFilter(op, new BinaryComparator(val)));
     }
 
-    private void addFilter(CompareOp op, byte[] val) {
-        LOG.info("Adding filter " + op.toString() + " with value " + Bytes.toStringBinary(val));
+    private void addFilter(Filter filter) {
         FilterList scanFilter = (FilterList) scan.getFilter();
         if (scanFilter == null) {
             scanFilter = new FilterList();
         }
-        scanFilter.addFilter(new RowFilter(op, new BinaryComparator(val)));
+        scanFilter.addFilter(filter);
         scan.setFilter(scanFilter);
     }
 
@@ -263,7 +333,15 @@ public class HBaseStorage extends LoadFu
                 ImmutableBytesWritable rowKey = (ImmutableBytesWritable) reader
                 .getCurrentKey();
                 Result result = (Result) reader.getCurrentValue();
-                int tupleSize=columnList_.size();
+
+                int tupleSize = columnInfo_.size();
+
+                // use a map of families -> qualifiers with the most recent
+                // version of the cell. Fetching multiple versions could be a
+                // useful feature.
+                NavigableMap<byte[], NavigableMap<byte[], byte[]>> resultsMap =
+                        result.getNoVersionMap();
+
                 if (loadRowKey_){
                     tupleSize++;
                 }
@@ -274,13 +352,52 @@ public class HBaseStorage extends LoadFu
                     tuple.set(0, new DataByteArray(rowKey.get()));
                     startIndex++;
                 }
-                for (int i=0;i<columnList_.size();++i){
-                    byte[] cell=result.getValue(columnList_.get(i)[0],columnList_.get(i)[1]);
-                    if (cell!=null)
-                        tuple.set(i+startIndex, new DataByteArray(cell));
-                    else
-                        tuple.set(i+startIndex, null);
+                for (int i = 0; i < columnInfo_.size(); ++i) {
+                    int currentIndex = startIndex + i;
+
+                    ColumnInfo columnInfo = columnInfo_.get(i);
+                    if (columnInfo.isColumnMap()) {
+                        // It's a column family so we need to iterate and set all
+                        // values found
+                        NavigableMap<byte[], byte[]> cfResults =
+                                resultsMap.get(columnInfo.getColumnFamily());
+                        Map<String, DataByteArray> cfMap =
+                                new HashMap<String, DataByteArray>();
+
+                        if (cfResults != null) {
+                            for (byte[] qualifier : cfResults.keySet()) {
+                                // We need to check against the prefix filter to
+                                // see if this value should be included. We can't
+                                // just rely on the server-side filter, since a
+                                // user could specify multiple CF filters for the
+                                // same CF.
+                                if (columnInfo.getColumnPrefix() == null ||
+                                        columnInfo.hasPrefixMatch(qualifier)) {
+
+                                    byte[] cell = cfResults.get(qualifier);
+                                    DataByteArray value =
+                                            cell == null ? null : new DataByteArray(cell);
+                                    cfMap.put(Bytes.toString(qualifier), value);
+                                }
+                            }
+                        }
+                        tuple.set(currentIndex, cfMap);
+                    } else {
+                        // It's a column so set the value
+                        byte[] cell = result.getValue(columnInfo.getColumnFamily(),
+                                                      columnInfo.getColumnName());
+                        DataByteArray value =
+                                cell == null ? null : new DataByteArray(cell);
+                        tuple.set(currentIndex, value);
+                    }
+                }
+
+                if (LOG.isDebugEnabled()) {
+                    for (int i = 0; i < tuple.size(); i++) {
+                        LOG.debug("tuple value:" + tuple.get(i));
+                    }
                 }
+
                 return tuple;
             }
         } catch (InterruptedException e) {
@@ -339,8 +456,16 @@ public class HBaseStorage extends LoadFu
             return;
         }
 
-        for (byte[][] col : columnList_) {
-            scan.addColumn(col[0], col[1]);
+        for (ColumnInfo columnInfo : columnInfo_) {
+            // do we have a column family, or a column?
+            if (columnInfo.isColumnMap()) {
+                scan.addFamily(columnInfo.getColumnFamily());
+            }
+            else {
+                scan.addColumn(columnInfo.getColumnFamily(),
+                               columnInfo.getColumnName());
+            }
+
         }
         if (requiredFieldList != null) {
             Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(),
@@ -423,10 +548,38 @@ public class HBaseStorage extends LoadFu
                 (fieldSchemas == null) ? DataType.findType(t.get(0)) : fieldSchemas[0].getType()));
         long ts=System.currentTimeMillis();
         
+        if (LOG.isDebugEnabled()) {
+            for (ColumnInfo columnInfo : columnInfo_) {
+                LOG.debug("putNext -- col: " + columnInfo);
+            }
+        }
+
         for (int i=1;i<t.size();++i){
-            put.add(columnList_.get(i-1)[0], columnList_.get(i-1)[1], ts, objToBytes(t.get(i),
-                    (fieldSchemas == null) ? DataType.findType(t.get(i)) : fieldSchemas[i].getType()));
+            ColumnInfo columnInfo = columnInfo_.get(i-1);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("putNext - tuple: " + i + ", value=" + t.get(i) +
+                        ", cf:column=" + columnInfo);
+            }
+
+            if (!columnInfo.isColumnMap()) {
+                put.add(columnInfo.getColumnFamily(), columnInfo.getColumnName(),
+                        ts, objToBytes(t.get(i), (fieldSchemas == null) ?
+                        DataType.findType(t.get(i)) : fieldSchemas[i].getType()));
+            } else {
+                Map<String, Object> cfMap = (Map<String, Object>) t.get(i);
+                for (String colName : cfMap.keySet()) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("putNext - colName=" + colName +
+                                  ", class: " + colName.getClass());
+                    }
+                    // TODO deal with the fact that maps can have types now. Currently we detect types at
+                    // runtime in the case of storing to a cf, which is suboptimal.
+                    put.add(columnInfo.getColumnFamily(), Bytes.toBytes(colName.toString()), ts,
+                            objToBytes(cfMap.get(colName), DataType.findType(cfMap.get(colName))));
+                }
+            }
         }
+
         try {
             writer.write(null, put);
         } catch (InterruptedException e) {
@@ -499,6 +652,8 @@ public class HBaseStorage extends LoadFu
     @Override
     public RequiredFieldResponse pushProjection(
             RequiredFieldList requiredFieldList) throws FrontendException {
+        List<RequiredField>  requiredFields = requiredFieldList.getFields();
+        List<ColumnInfo> newColumns = Lists.newArrayListWithExpectedSize(requiredFields.size());
 
         // colOffset is the offset in our columnList that we need to apply to indexes we get from requiredFields
         // (row key is not a real column)
@@ -506,17 +661,12 @@ public class HBaseStorage extends LoadFu
         // projOffset is the offset to the requiredFieldList we need to apply when figuring out which columns to prune.
         // (if key is pruned, we should skip row key's element in this list when trimming colList)
         int projOffset = colOffset;
-
         this.requiredFieldList = requiredFieldList;
-        List<RequiredField>  requiredFields = requiredFieldList.getFields();
-        if (requiredFieldList != null && requiredFields.size() > (columnList_.size() + colOffset)) {
+
+        if (requiredFieldList != null && requiredFields.size() > (columnInfo_.size() + colOffset)) {
             throw new FrontendException("The list of columns to project from HBase is larger than HBaseStorage is configured to load.");
         }
 
-        List<byte[][]> newColumns = Lists.newArrayListWithExpectedSize(requiredFields.size());
-        
-        // HBase Row Key is the first column in the schema when it's loaded, 
-        // and is not included in the columnList (since it's not a proper column).
         if (loadRowKey_ &&
                 ( requiredFields.size() < 1 || requiredFields.get(0).getIndex() != 0)) {
                 loadRowKey_ = false;
@@ -525,9 +675,16 @@ public class HBaseStorage extends LoadFu
         
         for (int i = projOffset; i < requiredFields.size(); i++) {
             int fieldIndex = requiredFields.get(i).getIndex();
-            newColumns.add(columnList_.get(fieldIndex - colOffset));
+            newColumns.add(columnInfo_.get(fieldIndex - colOffset));
         }
-        columnList_ = newColumns;
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("pushProjection After Projection: loadRowKey is " + loadRowKey_) ;
+            for (ColumnInfo colInfo : newColumns) {
+                LOG.debug("pushProjection -- col: " + colInfo);
+        }
+        }
+        columnInfo_ = newColumns;
         return new RequiredFieldResponse(true);
     }
 
@@ -554,4 +711,54 @@ public class HBaseStorage extends LoadFu
         };
     }
 
+    /**
+     * Class to encapsulate logic around which column names were specified in each
+     * position of the column list. Users can specify column names in one of 4
+     * ways: 'Foo:', 'Foo:*', 'Foo:bar*' or 'Foo:bar'. The first 3 result in a
+     * Map being added to the tuple, while the last results in a scalar. The 3rd
+     * form results in a prefix-filtered Map.
+     */
+    private class ColumnInfo {
+
+        final String originalColumnName;  // always set
+        final byte[] columnFamily; // always set
+        final byte[] columnName; // set if it exists and doesn't contain '*'
+        final byte[] columnPrefix; // set if contains a prefix followed by '*'
+
+        public ColumnInfo(String colName) {
+            originalColumnName = colName;
+            String[] cfAndColumn = colName.split(COLON, 2);
+
+            // cfAndColumn[0] is the column family; cfAndColumn[1], if present, is the column or prefix
+            columnFamily = Bytes.toBytes(cfAndColumn[0]);
+            if (cfAndColumn.length > 1 &&
+                    cfAndColumn[1].length() > 0 && !ASTERISK.equals(cfAndColumn[1])) {
+                if (cfAndColumn[1].endsWith(ASTERISK)) {
+                    columnPrefix = Bytes.toBytes(cfAndColumn[1].substring(0,
+                            cfAndColumn[1].length() - 1));
+                    columnName = null;
+                }
+                else {
+                    columnName = Bytes.toBytes(cfAndColumn[1]);
+                    columnPrefix = null;
+                }
+            } else {
+                columnPrefix = null;
+                columnName = null;
+            }
+        }
+
+        public byte[] getColumnFamily() { return columnFamily; }
+        public byte[] getColumnName() { return columnName; }
+        public byte[] getColumnPrefix() { return columnPrefix; }
+        public boolean isColumnMap() { return columnName == null; }
+
+        public boolean hasPrefixMatch(byte[] qualifier) {
+            return Bytes.startsWith(qualifier, columnPrefix);
+        }
+
+        @Override
+        public String toString() { return originalColumnName; }
+    }
+
 }
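
A note on the map-valued store path: when a tuple field is a Pig map, putNext
writes each map key as a column qualifier in the corresponding column family.
A round-trip sketch, reusing the javadoc's illustrative SampleTable and
SampleTableCopy names:

    raw = LOAD 'hbase://SampleTable'
          USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
          'info:first_name friends:*', '-loadKey')
          AS (id:bytearray, first_name:chararray, friends_map:map[]);
    -- the first tuple value (id) becomes the row key; the keys of friends_map
    -- become qualifiers in the 'buddies' family of the target table
    STORE raw INTO 'hbase://SampleTableCopy'
          USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
          'info:first_name buddies:*');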

Modified: pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java?rev=1094109&r1=1094108&r2=1094109&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java Sun Apr 17 06:58:34 2011
@@ -18,6 +18,7 @@ package org.apache.pig.test;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -40,6 +41,7 @@ import org.apache.pig.data.Tuple;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -66,23 +68,19 @@ public class TestHBaseStorage {
     private static final String TESTCOLUMN_A = "pig:col_a";
     private static final String TESTCOLUMN_B = "pig:col_b";
     private static final String TESTCOLUMN_C = "pig:col_c";
+    private static final String TESTCOLUMN_D = "pig:prefixed_col_d";
+
     private static final int TEST_ROW_COUNT = 100;
 
     @BeforeClass
     public static void setUp() throws Exception {
-
         // This is needed by Pig
         cluster = MiniCluster.buildCluster();
-
         conf = cluster.getConfiguration();
-        conf.setInt("mapred.map.max.attempts", 1);
 
         util = new HBaseTestingUtility(conf);
         util.startMiniZKCluster();
         util.startMiniHBaseCluster(1, 1);
-
-        pig = new PigServer(ExecType.LOCAL,
-                ConfigurationUtil.toProperties(conf));
     }
 
     @AfterClass
@@ -98,6 +96,13 @@ public class TestHBaseStorage {
         cluster.shutDown();
     }
 
+
+    @Before
+    public void beforeTest() throws Exception {
+        pig = new PigServer(ExecType.LOCAL,
+                ConfigurationUtil.toProperties(conf));
+    }
+
     @After
     public void tearDown() throws Exception {
         try {
@@ -122,6 +127,141 @@ public class TestHBaseStorage {
     }
 
     /**
+     * Test load from HBase with map parameters.
+     *
+     */
+    @Test
+    public void testLoadWithMap_1() throws IOException {
+        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+
+        pig.registerQuery("a = load 'hbase://"
+                + TESTTABLE_1
+                + "' using "
+                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+                + TESTCOLUMN_A
+                + " "
+                + TESTCOLUMN_B
+                + " "
+                + TESTCOLUMN_C
+                + " pig:"
+                + "','-loadKey') as (rowKey, col_a, col_b, col_c, pig_cf_map);");
+        Iterator<Tuple> it = pig.openIterator("a");
+        int count = 0;
+        LOG.info("LoadFromHBase Starting");
+        while (it.hasNext()) {
+            Tuple t = it.next();
+            LOG.info("LoadFromHBase " + t);
+            String rowKey = ((DataByteArray) t.get(0)).toString();
+            String col_a = ((DataByteArray) t.get(1)).toString();
+            String col_b = ((DataByteArray) t.get(2)).toString();
+            String col_c = ((DataByteArray) t.get(3)).toString();
+            Map pig_cf_map = (Map) t.get(4);
+
+            Assert.assertEquals("00".substring((count + "").length()) + count,
+                    rowKey);
+            Assert.assertEquals(count, Integer.parseInt(col_a));
+            Assert.assertEquals(count + 0.0, Double.parseDouble(col_b), 1e-6);
+            Assert.assertEquals("Text_" + count, col_c);
+
+            Assert.assertEquals(4, pig_cf_map.size());
+            Assert.assertEquals(count,
+                    Integer.parseInt(pig_cf_map.get("col_a").toString()));
+            Assert.assertEquals(count + 0.0,
+                    Double.parseDouble(pig_cf_map.get("col_b").toString()), 1e-6);
+            Assert.assertEquals("Text_" + count,
+                    ((DataByteArray) pig_cf_map.get("col_c")).toString());
+            Assert.assertEquals("PrefixedText_" + count,
+                    ((DataByteArray) pig_cf_map.get("prefixed_col_d")).toString());
+
+            count++;
+        }
+        Assert.assertEquals(TEST_ROW_COUNT, count);
+        LOG.info("LoadFromHBase done");
+    }
+
+    /**
+     * Test load from HBase with map parameters and a column prefix.
+     *
+     */
+    @Test
+    public void testLoadWithMap_2_col_prefix() throws IOException {
+        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+
+        pig.registerQuery("a = load 'hbase://"
+                + TESTTABLE_1
+                + "' using "
+                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+                + "pig:prefixed_col_*"
+                + "','-loadKey') as (rowKey:chararray, pig_cf_map:map[]);");
+        Iterator<Tuple> it = pig.openIterator("a");
+        int count = 0;
+        LOG.info("LoadFromHBase Starting");
+        while (it.hasNext()) {
+            Tuple t = it.next();
+            LOG.info("LoadFromHBase " + t);
+            String rowKey = t.get(0).toString();
+            Map pig_cf_map = (Map) t.get(1);
+            Assert.assertEquals(2, t.size());
+
+            Assert.assertEquals("00".substring((count + "").length()) + count,
+                    rowKey);
+            Assert.assertEquals("PrefixedText_" + count,
+                    ((DataByteArray) pig_cf_map.get("prefixed_col_d")).toString());
+            Assert.assertEquals(1, pig_cf_map.size());
+
+            count++;
+        }
+        Assert.assertEquals(TEST_ROW_COUNT, count);
+        LOG.info("LoadFromHBase done");
+    }
+
+    /**
+     * Test load from HBase with map parameters and multiple column prefixes.
+     *
+     */
+    @Test
+    public void testLoadWithMap_3_col_prefix() throws IOException {
+        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+
+        pig.registerQuery("a = load 'hbase://"
+                + TESTTABLE_1
+                + "' using "
+                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+                + "pig:col_* pig:prefixed_col_*"
+                + "','-loadKey') as (rowKey:chararray, pig_cf_map:map[], pig_prefix_cf_map:map[]);");
+        Iterator<Tuple> it = pig.openIterator("a");
+        int count = 0;
+        LOG.info("LoadFromHBase Starting");
+        while (it.hasNext()) {
+            Tuple t = it.next();
+            LOG.info("LoadFromHBase " + t);
+            String rowKey = t.get(0).toString();
+            Map pig_cf_map = (Map) t.get(1);
+            Map pig_prefix_cf_map = (Map) t.get(2);
+            Assert.assertEquals(3, t.size());
+
+            Assert.assertEquals("00".substring((count + "").length()) + count,
+                    rowKey);
+            Assert.assertEquals("PrefixedText_" + count,
+                    ((DataByteArray) pig_prefix_cf_map.get("prefixed_col_d")).toString());
+            Assert.assertEquals(1, pig_prefix_cf_map.size());
+
+            Assert.assertEquals(count,
+                    Integer.parseInt(pig_cf_map.get("col_a").toString()));
+            Assert.assertEquals(count + 0.0,
+                    Double.parseDouble(pig_cf_map.get("col_b").toString()), 1e-6);
+            Assert.assertEquals("Text_" + count,
+                    ((DataByteArray) pig_cf_map.get("col_c")).toString());
+            Assert.assertEquals(3, pig_cf_map.size());
+
+            count++;
+        }
+        Assert.assertEquals(TEST_ROW_COUNT, count);
+        LOG.info("LoadFromHBase done");
+    }
+
+
+    /**
      * load from hbase test
      * 
      * @throws IOException
@@ -129,6 +269,10 @@ public class TestHBaseStorage {
     @Test
     public void testLoadFromHBase() throws IOException {
         prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+        LOG.info("QUERY: " + "a = load 'hbase://" + TESTTABLE_1 + "' using "
+                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+                + TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C +" pig:col_d"
+                + "') as (col_a, col_b, col_c, col_d);");
         pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using "
                 + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
                 + TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C +" pig:col_d"
@@ -619,7 +763,7 @@ public class TestHBaseStorage {
         try {
             deleteAllRows(tableName);
         } catch (Exception e) {
-        // that's ok, table may not exist yet
+            // It's ok, table might not exist.
         }
         try {
         table = util.createTable(Bytes.toBytesBinary(tableName),
@@ -627,6 +771,7 @@ public class TestHBaseStorage {
         } catch (Exception e) {
             table = new HTable(Bytes.toBytesBinary(tableName));
         }
+
         if (initData) {
             for (int i = 0; i < TEST_ROW_COUNT; i++) {
                 String v = i + "";
@@ -643,6 +788,9 @@ public class TestHBaseStorage {
                     // col_c: string type
                     put.add(COLUMNFAMILY, Bytes.toBytes("col_c"),
                             Bytes.toBytes("Text_" + i));
+                    // prefixed_col_d: string type
+                    put.add(COLUMNFAMILY, Bytes.toBytes("prefixed_col_d"),
+                            Bytes.toBytes("PrefixedText_" + i));
                     table.put(put);
                 } else {
                     // row key: string type
@@ -657,6 +805,9 @@ public class TestHBaseStorage {
                     // col_c: string type
                     put.add(COLUMNFAMILY, Bytes.toBytes("col_c"),
                             ("Text_" + i).getBytes());
+                    // prefixed_col_d: string type
+                    put.add(COLUMNFAMILY, Bytes.toBytes("prefixed_col_d"),
+                            ("PrefixedText_" + i).getBytes());
                     table.put(put);
                 }
             }
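
The new tests seed a fourth column, pig:prefixed_col_d, so the prefix-filter
path can be asserted end to end. A minimal script mirroring
testLoadWithMap_2_col_prefix (the table name here is hypothetical):

    a = LOAD 'hbase://pigtable_1'
        USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
        'pig:prefixed_col_*', '-loadKey')
        AS (rowKey:chararray, pig_cf_map:map[]);
    DUMP a;
    -- each pig_cf_map should hold exactly one entry: 'prefixed_col_d'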