Posted to commits@pig.apache.org by dv...@apache.org on 2011/04/16 23:31:41 UTC

svn commit: r1094057 - in /pig/branches/branch-0.8: CHANGES.txt src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java test/org/apache/pig/test/TestHBaseStorage.java

Author: dvryaboy
Date: Sat Apr 16 21:31:40 2011
New Revision: 1094057

URL: http://svn.apache.org/viewvc?rev=1094057&view=rev
Log:
PIG-1870: HBaseStorage does not project correctly
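
The change, in short: HBaseStorage now re-applies a pushed-down projection on the backend. setLocation() serializes the RequiredFieldList it received from pushProjection() into the UDFContext under the loader's context signature, and getNext() deserializes it and calls pushProjection() again in the task, so each backend instance prunes its column list the same way the front end did. The store path gets the analogous treatment: setStoreLocation() stashes the serialized schema in the UDFContext and putNext() restores it before writing. A minimal sketch of the kind of script this affects, loosely following the new tests (the table and column names below are illustrative, not taken from the patch):

    a = LOAD 'hbase://SampleTable'
        USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
            'pig:col_a pig:col_b pig:col_c', '-loadKey')
        AS (rowKey, col_a, col_b, col_c);
    -- only two of the three declared columns are referenced, so Pig pushes a projection
    b = FOREACH a GENERATE col_a, col_c;
    DUMP b;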

Modified:
    pig/branches/branch-0.8/CHANGES.txt
    pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
    pig/branches/branch-0.8/test/org/apache/pig/test/TestHBaseStorage.java

Modified: pig/branches/branch-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/CHANGES.txt?rev=1094057&r1=1094056&r2=1094057&view=diff
==============================================================================
--- pig/branches/branch-0.8/CHANGES.txt (original)
+++ pig/branches/branch-0.8/CHANGES.txt Sat Apr 16 21:31:40 2011
@@ -34,6 +34,8 @@ PIG-1886: Add zookeeper jar to list of j
 
 BUG FIXES
 
+PIG-1870: HBaseStorage doesn't project correctly (dvryaboy)
+
 PIG-1979: New logical plan failing with ERROR 2229: Couldn't find matching uid -1 (daijy)
 
 PIG-1977: "Stream closed" error while reading Pig temp files (results of intermediate jobs) (rding)

Modified: pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1094057&r1=1094056&r2=1094057&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Sat Apr 16 21:31:40 2011
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -75,6 +76,8 @@ import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
 
 import com.google.common.collect.Lists;
@@ -99,6 +102,7 @@ public class HBaseStorage extends LoadFu
     private RecordReader reader;
     private RecordWriter writer;
     private Scan scan;
+    private String contextSignature = null;
 
     private final CommandLine configuredOptions_;
     private final static Options validOptions_ = new Options();
@@ -116,6 +120,10 @@ public class HBaseStorage extends LoadFu
 
     private ResourceSchema schema_;
 
+    private RequiredFieldList requiredFieldList;
+
+    private boolean initialized = false;
+
     private static void populateValidOptions() { 
         validOptions_.addOption("loadKey", false, "Load Key");
         validOptions_.addOption("gt", true, "Records must be greater than this value " +
@@ -240,6 +248,17 @@ public class HBaseStorage extends LoadFu
     @Override
     public Tuple getNext() throws IOException {
         try {
+            if (!initialized) {
+                Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(),
+                        new String[] {contextSignature});
+
+                String projectedFields = p.getProperty(contextSignature+"_projectedFields");
+                if (projectedFields != null) {
+                    requiredFieldList = (RequiredFieldList) ObjectSerializer.deserialize(projectedFields);
+                    pushProjection(requiredFieldList);
+                }
+                initialized = true;
+            }
             if (reader.nextKeyValue()) {
                 ImmutableBytesWritable rowKey = (ImmutableBytesWritable) reader
                 .getCurrentKey();
@@ -289,8 +308,16 @@ public class HBaseStorage extends LoadFu
     }
 
     @Override
+    public void setUDFContextSignature(String signature) {
+        this.contextSignature = signature;
+    }
+
+    @Override
     public void setLocation(String location, Job job) throws IOException {
         job.getConfiguration().setBoolean("pig.noSplitCombination", true);
+        m_conf = job.getConfiguration();
+        HBaseConfiguration.addHbaseResources(m_conf);
+
         // Make sure the HBase, ZooKeeper, and Guava jars get shipped.
         TableMapReduceUtil.addDependencyJars(job.getConfiguration(), 
             org.apache.hadoop.hbase.client.HTable.class,
@@ -304,12 +331,22 @@ public class HBaseStorage extends LoadFu
         if (m_table == null) {
             m_table = new HTable(m_conf, tablename);
         }
-        HBaseConfiguration.addHbaseResources(m_conf);
         m_table.setScannerCaching(caching_);
         m_conf.set(TableInputFormat.INPUT_TABLE, tablename);
+
+        // Set up scan if it is not already set up.
+        if (m_conf.get(TableInputFormat.SCAN) != null) {
+            return;
+        }
+
         for (byte[][] col : columnList_) {
             scan.addColumn(col[0], col[1]);
         }
+        if (requiredFieldList != null) {
+            Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(),
+                    new String[] {contextSignature});
+            p.setProperty(contextSignature + "_projectedFields", ObjectSerializer.serialize(requiredFieldList));
+        }
         m_conf.set(TableInputFormat.SCAN, convertScanToString(scan));
     }
 
@@ -372,6 +409,15 @@ public class HBaseStorage extends LoadFu
     @SuppressWarnings("unchecked")
     @Override
     public void putNext(Tuple t) throws IOException {
+        if (!initialized) {
+            Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(),
+                    new String[] {contextSignature});
+            String serializedSchema = p.getProperty(contextSignature + "_schema");
+            if (serializedSchema != null) {
+                schema_ = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
+            }
+            initialized = true;
+        }
         ResourceFieldSchema[] fieldSchemas = (schema_ == null) ? null : schema_.getFields();
         Put put=new Put(objToBytes(t.get(0), 
                 (fieldSchemas == null) ? DataType.findType(t.get(0)) : fieldSchemas[0].getType()));
@@ -391,6 +437,7 @@ public class HBaseStorage extends LoadFu
     @SuppressWarnings("unchecked")
     private byte[] objToBytes(Object o, byte type) throws IOException {
         LoadStoreCaster caster = (LoadStoreCaster) caster_;
+        if (o == null) return null;
         switch (type) {
         case DataType.BYTEARRAY: return ((DataByteArray) o).get();
         case DataType.BAG: return caster.toBytes((DataBag) o);
@@ -418,7 +465,9 @@ public class HBaseStorage extends LoadFu
     }
 
     @Override
-    public void setStoreFuncUDFContextSignature(String signature) { }
+    public void setStoreFuncUDFContextSignature(String signature) {
+        this.contextSignature = signature;
+    }
 
     @Override
     public void setStoreLocation(String location, Job job) throws IOException {
@@ -427,7 +476,11 @@ public class HBaseStorage extends LoadFu
         }else{
             job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, location);
         }
-        m_conf = HBaseConfiguration.create(job.getConfiguration());
+        Properties props = UDFContext.getUDFContext().getUDFProperties(getClass(), new String[]{contextSignature});
+        if (!props.containsKey(contextSignature + "_schema")) {
+            props.setProperty(contextSignature + "_schema", ObjectSerializer.serialize(schema_));
+        }
+        m_conf = HBaseConfiguration.addHbaseResources(job.getConfiguration());
     }
 
     @Override
@@ -446,29 +499,33 @@ public class HBaseStorage extends LoadFu
     @Override
     public RequiredFieldResponse pushProjection(
             RequiredFieldList requiredFieldList) throws FrontendException {
+
+        // colOffset is the offset in our columnList that we need to apply to indexes we get from requiredFields
+        // (row key is not a real column)
+        int colOffset = loadRowKey_ ? 1 : 0;
+        // projOffset is the offset into requiredFieldList we need to apply when figuring out which columns to prune.
+        // (if the row key is kept, its element in this list must be skipped when trimming columnList_)
+        int projOffset = colOffset;
+
+        this.requiredFieldList = requiredFieldList;
         List<RequiredField>  requiredFields = requiredFieldList.getFields();
+        if (requiredFieldList != null && requiredFields.size() > (columnList_.size() + colOffset)) {
+            throw new FrontendException("The list of columns to project from HBase is larger than HBaseStorage is configured to load.");
+        }
+
         List<byte[][]> newColumns = Lists.newArrayListWithExpectedSize(requiredFields.size());
         
         // HBase Row Key is the first column in the schema when it's loaded, 
         // and is not included in the columnList (since it's not a proper column).
-        int offset = loadRowKey_ ? 1 : 0;
-        
-        if (loadRowKey_) {
-            if (requiredFields.size() < 1 || requiredFields.get(0).getIndex() != 0) {
+        if (loadRowKey_ &&
+                ( requiredFields.size() < 1 || requiredFields.get(0).getIndex() != 0)) {
                 loadRowKey_ = false;
-            } else {
-                // We just processed the fact that the row key needs to be loaded.
-                requiredFields.remove(0);
+            projOffset = 0;
             }
-        }
         
-        for (RequiredField field : requiredFields) {
-            int fieldIndex = field.getIndex();
-            newColumns.add(columnList_.get(fieldIndex - offset));
-        }
-        LOG.debug("pushProjection After Projection: loadRowKey is " + loadRowKey_) ;
-        for (byte[][] col : newColumns) {
-            LOG.debug("pushProjection -- col: " + Bytes.toStringBinary(col[0]) + ":" + Bytes.toStringBinary(col[1]));
+        for (int i = projOffset; i < requiredFields.size(); i++) {
+            int fieldIndex = requiredFields.get(i).getIndex();
+            newColumns.add(columnList_.get(fieldIndex - colOffset));
         }
         columnList_ = newColumns;
         return new RequiredFieldResponse(true);
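
To make the offset handling above concrete, consider a load declared with '-loadKey' and three columns: the front-end schema is (rowKey, col_a, col_b, col_c) at field indexes 0-3, while columnList_ holds only the three real columns at indexes 0-2, so colOffset is 1 (the index values below are a worked illustration, not part of the patch). If the projection keeps the row key, say requiredFields indexes {0, 3}, loadRowKey_ stays true, projOffset stays at 1 so the row key's entry is skipped while trimming, and index 3 minus colOffset selects columnList_[2]. If the projection drops the row key, say indexes {1, 3}, loadRowKey_ flips to false, projOffset drops to 0, and the loop keeps columnList_[0] and columnList_[2]. A projection longer than columnList_.size() + colOffset is rejected up front with a FrontendException.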

Modified: pig/branches/branch-0.8/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/test/org/apache/pig/test/TestHBaseStorage.java?rev=1094057&r1=1094056&r2=1094057&view=diff
==============================================================================
--- pig/branches/branch-0.8/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/branches/branch-0.8/test/org/apache/pig/test/TestHBaseStorage.java Sat Apr 16 21:31:40 2011
@@ -18,12 +18,14 @@ package org.apache.pig.test;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -41,6 +43,8 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+
 public class TestHBaseStorage {
 
 	private static final Log LOG = LogFactory.getLog(TestHBaseStorage.class);
@@ -66,7 +70,6 @@ public class TestHBaseStorage {
 
 	@BeforeClass
 	public static void setUp() throws Exception {
-
         // This is needed by Pig
         MiniCluster cluster = MiniCluster.buildCluster();
 
@@ -78,7 +81,7 @@ public class TestHBaseStorage {
         util.startMiniHBaseCluster(1, 1);
 
 
-		pig = new PigServer(ExecType.MAPREDUCE,
+        pig = new PigServer(ExecType.LOCAL,
 				ConfigurationUtil.toProperties(conf));
 	}
 
@@ -97,14 +100,26 @@ public class TestHBaseStorage {
 	@After
 	public void tearDown() throws Exception {
         try {
-            util.deleteTable(Bytes.toBytesBinary(TESTTABLE_1));
+            deleteAllRows(TESTTABLE_1);
         } catch (IOException e) {}
         try {
-            util.deleteTable(Bytes.toBytesBinary(TESTTABLE_2));
+            deleteAllRows(TESTTABLE_2);
         } catch (IOException e) {}
 		pig.shutdown();
 	}
 
+    // DVR: I've found that it is faster to delete all rows in small tables
+    // than to drop them.
+    private void deleteAllRows(String tableName) throws Exception {
+        HTable table = new HTable(conf, tableName);
+        ResultScanner scanner = table.getScanner(new Scan());
+        List<Delete> deletes = Lists.newArrayList();
+        for (Result row : scanner) {
+            deletes.add(new Delete(row.getRow()));
+        }
+        table.delete(deletes);
+    }
+
 	/**
 	 * load from hbase test
 	 * 
@@ -122,7 +137,6 @@ public class TestHBaseStorage {
 		LOG.info("LoadFromHBase Starting");
 		while (it.hasNext()) {
 			Tuple t = it.next();
-			LOG.info("LoadFromHBase " + t);
 			String col_a = ((DataByteArray) t.get(0)).toString();
 			String col_b = ((DataByteArray) t.get(1)).toString();
 			String col_c = ((DataByteArray) t.get(2)).toString();
@@ -151,10 +165,9 @@ public class TestHBaseStorage {
 				+ "') as (col_a, col_b, col_c);");
 		Iterator<Tuple> it = pig.openIterator("a");
 		int count = 0;
-		LOG.info("LoadFromHBase Starting");
+        LOG.info("BackwardsCompatibility Starting");
 		while (it.hasNext()) {
 			Tuple t = it.next();
-			LOG.info("LoadFromHBase " + t);
 			String col_a = ((DataByteArray) t.get(0)).toString();
 			String col_b = ((DataByteArray) t.get(1)).toString();
 			String col_c = ((DataByteArray) t.get(2)).toString();
@@ -165,7 +178,7 @@ public class TestHBaseStorage {
 			count++;
 		}
 		Assert.assertEquals(TEST_ROW_COUNT, count);
-		LOG.info("LoadFromHBase done");
+        LOG.info("BackwardsCompatibility done");
 	}
 
 	/**
@@ -182,10 +195,9 @@ public class TestHBaseStorage {
 				+ "','-loadKey') as (rowKey,col_a, col_b, col_c);");
 		Iterator<Tuple> it = pig.openIterator("a");
 		int count = 0;
-		LOG.info("LoadFromHBase Starting");
+        LOG.info("LoadFromHBaseWithRowKey Starting");
 		while (it.hasNext()) {
 			Tuple t = it.next();
-			LOG.info("LoadFromHBase " + t);
 			String rowKey = ((DataByteArray) t.get(0)).toString();
 			String col_a = ((DataByteArray) t.get(1)).toString();
 			String col_b = ((DataByteArray) t.get(2)).toString();
@@ -200,7 +212,7 @@ public class TestHBaseStorage {
 			count++;
 		}
 		Assert.assertEquals(TEST_ROW_COUNT, count);
-		LOG.info("LoadFromHBase done");
+        LOG.info("LoadFromHBaseWithRowKey done");
 	}
 
 	/**
@@ -224,10 +236,9 @@ public class TestHBaseStorage {
 		Iterator<Tuple> it = pig.openIterator("a");
 		int count = 0;
 		int next = 1;
-		LOG.info("LoadFromHBase Starting");
+        LOG.info("LoadFromHBaseWithParameters_1 Starting");
 		while (it.hasNext()) {
 			Tuple t = it.next();
-			LOG.info("LoadFromHBase " + t);
 			String rowKey = ((DataByteArray) t.get(0)).toString();
 			String col_a = ((DataByteArray) t.get(1)).toString();
 			String col_b = ((DataByteArray) t.get(2)).toString();
@@ -243,7 +254,7 @@ public class TestHBaseStorage {
 			next++;
 		}
 		Assert.assertEquals(TEST_ROW_COUNT - 2, count);
-		LOG.info("LoadFromHBase done");
+        LOG.info("LoadFromHBaseWithParameters_1 done");
 	}
 
 	/**
@@ -266,10 +277,9 @@ public class TestHBaseStorage {
 		Iterator<Tuple> it = pig.openIterator("a");
 		int count = 0;
 		int next = 1;
-		LOG.info("LoadFromHBase Starting");
+        LOG.info("LoadFromHBaseWithParameters_2 Starting");
 		while (it.hasNext()) {
 			Tuple t = it.next();
-			LOG.info("LoadFromHBase " + t);
 			String rowKey = ((DataByteArray) t.get(0)).toString();
 			String col_a = ((DataByteArray) t.get(1)).toString();
 			String col_b = ((DataByteArray) t.get(2)).toString();
@@ -285,7 +295,7 @@ public class TestHBaseStorage {
 			next++;
 		}
 		Assert.assertEquals(TEST_ROW_COUNT - 2, count);
-		LOG.info("LoadFromHBase done");
+        LOG.info("LoadFromHBaseWithParameters_2 done");
 	}
 
 	/**
@@ -300,10 +310,9 @@ public class TestHBaseStorage {
 				+ "','-loadKey -limit 10') as (rowKey,col_a, col_b, col_c);");
 		Iterator<Tuple> it = pig.openIterator("a");
 		int count = 0;
-		LOG.info("LoadFromHBase Starting");
+        LOG.info("LoadFromHBaseWithParameters_3 Starting");
 		while (it.hasNext()) {
 			Tuple t = it.next();
-			LOG.info("LoadFromHBase " + t);
 			String rowKey = ((DataByteArray) t.get(0)).toString();
 			String col_a = ((DataByteArray) t.get(1)).toString();
 			String col_b = ((DataByteArray) t.get(2)).toString();
@@ -319,32 +328,69 @@ public class TestHBaseStorage {
 		}
 		// 'limit' apply for each region and here we have only one region
 		Assert.assertEquals(10, count);
-		LOG.info("LoadFromHBase done");
+        LOG.info("LoadFromHBaseWithParameters_3 done");
 	}
 
 	/**
+     * Test Load from hbase with projection.
+     */
+    @Test
+    public void testLoadWithProjection_1() throws IOException {
+        prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
+        scanTable1(pig, DataFormat.HBaseBinary);
+        pig.registerQuery("b = FOREACH a GENERATE col_a, col_c;");
+        Iterator<Tuple> it = pig.openIterator("b");
+        int index = 0;
+        LOG.info("testLoadWithProjection_1 Starting");
+        while (it.hasNext()) {
+            Tuple t = it.next();
+            int col_a = (Integer) t.get(0);
+            String col_c = (String) t.get(1);
+            Assert.assertEquals(index, col_a);
+            Assert.assertEquals("Text_" + index, col_c);
+            Assert.assertEquals(2, t.size());
+            index++;
+        }
+        Assert.assertEquals(100, index);
+        LOG.info("testLoadWithProjection_1 done");
+    }
+
+    /**
+     * Test Load from hbase with projection and the default caster.
+     */
+    @Test
+    public void testLoadWithProjection_2() throws IOException {
+        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+        scanTable1(pig, DataFormat.UTF8PlainText);
+        pig.registerQuery("b = FOREACH a GENERATE col_a, col_c;");
+        Iterator<Tuple> it = pig.openIterator("b");
+        int index = 0;
+        LOG.info("testLoadWithProjection_2 Starting");
+        while (it.hasNext()) {
+            Tuple t = it.next();
+            int col_a = (Integer) t.get(0);
+            String col_c = (String) t.get(1);
+            Assert.assertEquals(index, col_a);
+            Assert.assertEquals("Text_" + index, col_c);
+            Assert.assertEquals(2, t.size());
+            index++;
+        }
+        Assert.assertEquals(100, index);
+        LOG.info("testLoadWithProjection_2 done");
+    }
+
+    /**
 	 * Test Load from hbase using HBaseBinaryConverter
 	 */
 	@Test
 	public void testHBaseBinaryConverter() throws IOException {
 		prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
-
-		pig.registerQuery("a = load 'hbase://"
-				+ TESTTABLE_1
-				+ "' using "
-				+ "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
-				+ TESTCOLUMN_A
-				+ " "
-				+ TESTCOLUMN_B
-				+ " "
-				+ TESTCOLUMN_C
-				+ "','-loadKey -caster HBaseBinaryConverter') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);");
+        scanTable1(pig, DataFormat.HBaseBinary);
 		Iterator<Tuple> it = pig.openIterator("a");
 		int index = 0;
-		LOG.info("LoadFromHBase Starting");
+        LOG.info("testHBaseBinaryConverter Starting");
 		while (it.hasNext()) {
 			Tuple t = it.next();
-			LOG.info("LoadFromHBase " + t);
 			String rowKey = (String) t.get(0);
 			int col_a = (Integer) t.get(1);
 			double col_b = (Double) t.get(2);
@@ -357,7 +403,7 @@ public class TestHBaseStorage {
 			Assert.assertEquals("Text_" + index, col_c);
 			index++;
 		}
-		LOG.info("LoadFromHBase done");
+        LOG.info("testHBaseBinaryConverter done");
 	}
 
 	/**
@@ -370,23 +416,13 @@ public class TestHBaseStorage {
 	public void testStoreToHBase_1() throws IOException {
 		prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
 		prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary);
-
-		pig.registerQuery("a = load 'hbase://"
-				+ TESTTABLE_1
-				+ "' using "
-				+ "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
-				+ TESTCOLUMN_A
-				+ " "
-				+ TESTCOLUMN_B
-				+ " "
-				+ TESTCOLUMN_C
-				+ "','-loadKey -caster HBaseBinaryConverter') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);");
+        scanTable1(pig, DataFormat.HBaseBinary);
 		pig.store("a", TESTTABLE_2,
 				"org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
 						+ TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
 						+ TESTCOLUMN_C + "','-caster HBaseBinaryConverter')");
 
-		HTable table = new HTable(conf, TESTTABLE_2);
+        HTable table = new HTable(TESTTABLE_2);
 		ResultScanner scanner = table.getScanner(new Scan());
 		Iterator<Result> iter = scanner.iterator();
 		int i = 0;
@@ -408,6 +444,42 @@ public class TestHBaseStorage {
 
 	/**
 	 * load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it into
+     * 'TESTTABLE_2' using HBaseBinaryFormat projecting out column c
+     *
+     * @throws IOException
+     */
+    @Test
+    public void testStoreToHBase_1_with_projection() throws IOException {
+        System.getProperties().setProperty("pig.usenewlogicalplan", "false");
+        prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
+        prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary);
+        scanTable1(pig, DataFormat.HBaseBinary);
+        pig.registerQuery("b = FOREACH a GENERATE rowKey, col_a, col_b;");
+        pig.store("b",  TESTTABLE_2,
+                "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+                + TESTCOLUMN_A + " " + TESTCOLUMN_B +
+        "','-caster HBaseBinaryConverter')");
+
+        HTable table = new HTable(TESTTABLE_2);
+        ResultScanner scanner = table.getScanner(new Scan());
+        Iterator<Result> iter = scanner.iterator();
+        int i = 0;
+        for (i = 0; iter.hasNext(); ++i) {
+            Result result = iter.next();
+            String v = String.valueOf(i);
+            String rowKey = Bytes.toString(result.getRow());
+            int col_a = Bytes.toInt(getColValue(result, TESTCOLUMN_A));
+            double col_b = Bytes.toDouble(getColValue(result, TESTCOLUMN_B));
+
+            Assert.assertEquals("00".substring(v.length()) + v, rowKey);
+            Assert.assertEquals(i, col_a);
+            Assert.assertEquals(i + 0.0, col_b, 1e-6);
+        }
+        Assert.assertEquals(100, i);
+    }
+
+    /**
+     * load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it into
 	 * 'TESTTABLE_2' using UTF-8 Plain Text format
 	 * 
 	 * @throws IOException
@@ -416,23 +488,13 @@ public class TestHBaseStorage {
 	public void testStoreToHBase_2() throws IOException {
 		prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
 		prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary);
-
-		pig.registerQuery("a = load 'hbase://"
-				+ TESTTABLE_1
-				+ "' using "
-				+ "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
-				+ TESTCOLUMN_A
-				+ " "
-				+ TESTCOLUMN_B
-				+ " "
-				+ TESTCOLUMN_C
-				+ "','-loadKey -caster HBaseBinaryConverter') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);");
+        scanTable1(pig, DataFormat.HBaseBinary);
 		pig.store("a", TESTTABLE_2,
 				"org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
 						+ TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
 						+ TESTCOLUMN_C + "')");
 
-		HTable table = new HTable(conf, TESTTABLE_2);
+        HTable table = new HTable(TESTTABLE_2);
 		ResultScanner scanner = table.getScanner(new Scan());
 		Iterator<Result> iter = scanner.iterator();
 		int i = 0;
@@ -451,21 +513,114 @@ public class TestHBaseStorage {
 		}
 		Assert.assertEquals(100, i);
 	}
+    /**
+     * load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it into
+     * 'TESTTABLE_2' using UTF-8 Plain Text format projecting column c
+     *
+     * @throws IOException
+     */
+    @Test
+    public void testStoreToHBase_2_with_projection() throws IOException {
+        prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
+        prepareTable(TESTTABLE_2, false, DataFormat.UTF8PlainText);
+        scanTable1(pig, DataFormat.HBaseBinary);
+        pig.registerQuery("b = FOREACH a GENERATE rowKey, col_a, col_b;");
+        pig.store("b", TESTTABLE_2,
+                "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+                + TESTCOLUMN_A + " " + TESTCOLUMN_B + "')");
+
+        HTable table = new HTable(TESTTABLE_2);
+        ResultScanner scanner = table.getScanner(new Scan());
+        Iterator<Result> iter = scanner.iterator();
+        int i = 0;
+        for (i = 0; iter.hasNext(); ++i) {
+            Result result = iter.next();
+            String v = i + "";
+            String rowKey = new String(result.getRow());
+            int col_a = Integer.parseInt(new String(getColValue(result, TESTCOLUMN_A)));
+            double col_b = Double.parseDouble(new String(getColValue(result, TESTCOLUMN_B)));
+
+            Assert.assertEquals("00".substring(v.length()) + v, rowKey);
+            Assert.assertEquals(i, col_a);
+            Assert.assertEquals(i + 0.0, col_b, 1e-6);
+        }
+        Assert.assertEquals(100, i);
+    }
 
 	/**
+     * load from hbase 'TESTTABLE_1' using UTF-8 Plain Text format, and store it
+     * into 'TESTTABLE_2' using UTF-8 Plain Text format projecting column c
+     *
+     * @throws IOException
+     */
+    @Test
+    public void testStoreToHBase_3_with_projection_no_caster() throws IOException {
+        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+        prepareTable(TESTTABLE_2, false, DataFormat.UTF8PlainText);
+        scanTable1(pig, DataFormat.UTF8PlainText);
+        pig.registerQuery("b = FOREACH a GENERATE rowKey, col_a, col_b;");
+        pig.store("b", TESTTABLE_2,
+                "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+                + TESTCOLUMN_A + " " + TESTCOLUMN_B + "')");
+
+        HTable table = new HTable(TESTTABLE_2);
+        ResultScanner scanner = table.getScanner(new Scan());
+        Iterator<Result> iter = scanner.iterator();
+        int i = 0;
+        for (i = 0; iter.hasNext(); ++i) {
+            Result result = iter.next();
+            String v = i + "";
+            String rowKey = new String(result.getRow());
+
+            String col_a = new String(getColValue(result, TESTCOLUMN_A));
+            String col_b = new String(getColValue(result, TESTCOLUMN_B));
+
+            Assert.assertEquals("00".substring(v.length()) + v, rowKey);
+            Assert.assertEquals(i + "", col_a);
+            Assert.assertEquals(i + 0.0 + "", col_b);
+        }
+        Assert.assertEquals(100, i);
+    }
+
+
+    private void scanTable1(PigServer pig, DataFormat dataFormat) throws IOException {
+        scanTable1(pig, dataFormat, "");
+    }
+
+    private void scanTable1(PigServer pig, DataFormat dataFormat, String extraParams) throws IOException {
+        pig.registerQuery("a = load 'hbase://"
+                + TESTTABLE_1
+                + "' using "
+                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+                + TESTCOLUMN_A
+                + " "
+                + TESTCOLUMN_B
+                + " "
+                + TESTCOLUMN_C
+                + "','-loadKey "
+                + (dataFormat == DataFormat.HBaseBinary ? " -caster HBaseBinaryConverter" : "")
+                + " " + extraParams + " "
+                + "') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);");
+    }
+    /**
 	 * Prepare a table in hbase for testing.
 	 * 
 	 */
 	private void prepareTable(String tableName, boolean initData,
 			DataFormat format) throws IOException {
 		// define the table schema
+        HTable table = null;
         try {
-            util.deleteTable(Bytes.toBytesBinary(tableName));
-        } catch (IOException e) {
+            deleteAllRows(tableName);
+        } catch (Exception e) {
             // It's ok, table might not exist.
         }
-        HTable table = util.createTable(Bytes.toBytesBinary(tableName),
+        try {
+            table = util.createTable(Bytes.toBytesBinary(tableName),
                 COLUMNFAMILY);
+        } catch (Exception e) {
+            table = new HTable(Bytes.toBytesBinary(tableName));
+        }
 		if (initData) {
 			for (int i = 0; i < TEST_ROW_COUNT; i++) {
 				String v = i + "";
@@ -511,6 +666,7 @@ public class TestHBaseStorage {
 	 */
     private static byte[] getColValue(Result result, String colName) {
         byte[][] colArray = Bytes.toByteArrays(colName.split(":"));
+        byte[] val = result.getValue(colArray[0], colArray[1]);
         return result.getValue(colArray[0], colArray[1]);
 
 }