Posted to commits@pig.apache.org by dv...@apache.org on 2010/08/30 22:47:32 UTC

svn commit: r990936 - in /hadoop/pig/trunk: ./ ivy/ lib/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/hbase/ src/org/apache/pig/builtin/ src/org/apache/pig/data/ src/org/apache/pig/impl/util/ test/org/apache/pig/test/

Author: dvryaboy
Date: Mon Aug 30 20:47:32 2010
New Revision: 990936

URL: http://svn.apache.org/viewvc?rev=990936&view=rev
Log:
PIG-1205:  Enhance HBaseStorage-- Make it support loading row key and implement StoreFunc

Added:
    hadoop/pig/trunk/lib/hbase-0.20.6-test.jar   (with props)
    hadoop/pig/trunk/lib/hbase-0.20.6.jar   (with props)
    hadoop/pig/trunk/src/org/apache/pig/LoadStoreCaster.java
    hadoop/pig/trunk/src/org/apache/pig/StoreCaster.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseBinaryConverter.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseTableInputFormat.java
Removed:
    hadoop/pig/trunk/lib/hbase-0.20.0-test.jar
    hadoop/pig/trunk/lib/hbase-0.20.0.jar
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/build.xml
    hadoop/pig/trunk/ivy/libraries.properties
    hadoop/pig/trunk/ivy/pig.pom
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java
    hadoop/pig/trunk/src/org/apache/pig/data/DataType.java
    hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=990936&r1=990935&r2=990936&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Aug 30 20:47:32 2010
@@ -26,6 +26,8 @@ PIG-1249: Safe-guards against misconfigu
 
 IMPROVEMENTS
 
+PIG-1205:  Enhance HBaseStorage-- Make it support loading row key and implement StoreFunc (zjffdu and dvryaboy)
+
 PIG-1568: Optimization rule FilterAboveForeach is too restrictive and doesn't
 handle project * correctly (xuefuz via daijy)
 

Modified: hadoop/pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/build.xml?rev=990936&r1=990935&r2=990936&view=diff
==============================================================================
--- hadoop/pig/trunk/build.xml (original)
+++ hadoop/pig/trunk/build.xml Mon Aug 30 20:47:32 2010
@@ -52,8 +52,8 @@
     <property name="build.encoding" value="UTF8" />
     <!-- TODO with only one version of hadoop in the lib folder we do not need that anymore -->
     <property name="automaton.jarfile" value="automaton.jar" />
-    <property name="hbase.jarfile" value="hbase-0.20.0.jar" />
-    <property name="hbase.test.jarfile" value="hbase-0.20.0-test.jar" />
+    <property name="hbase.jarfile" value="hbase-0.20.6.jar" />
+    <property name="hbase.test.jarfile" value="hbase-0.20.6-test.jar" />
 	<property name="zookeeper.jarfile" value="zookeeper-hbase-1329.jar" />
 	
     <!-- javac properties -->

Modified: hadoop/pig/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/ivy/libraries.properties?rev=990936&r1=990935&r2=990936&view=diff
==============================================================================
--- hadoop/pig/trunk/ivy/libraries.properties (original)
+++ hadoop/pig/trunk/ivy/libraries.properties Mon Aug 30 20:47:32 2010
@@ -26,27 +26,28 @@ commons-lang.version=2.4
 checkstyle.version=4.2
 
 ivy.version=2.2.0-rc1
+guava.version=r06
 hadoop-core.version=0.20.2
 hadoop-test.version=0.20.2
+hbase.version=0.20.6
 hsqldb.version=1.8.0.10
 
+jackson.version=1.0.1
 javacc.version=4.2
+jdiff.version=1.0.9
 jetty-util.version=6.1.14
 jline.version=0.9.94
+joda-time.version=1.6
 jsch.version=0.1.38
 junit.version=4.5
-jdiff.version=1.0.9
+jython.version=2.5.0
 
 log4j.version=1.2.14
 
+rats-lib.version=0.5.1
+
 slf4j-api.version=1.4.3
 slf4j-log4j12.version=1.4.3
 
-rats-lib.version=0.5.1
-
 xerces.version=1.4.4
 
-jackson.version=1.0.1
-joda-time.version=1.6
-jython.version=2.5.0
-guava.version=r06

Modified: hadoop/pig/trunk/ivy/pig.pom
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/ivy/pig.pom?rev=990936&r1=990935&r2=990936&view=diff
==============================================================================
--- hadoop/pig/trunk/ivy/pig.pom (original)
+++ hadoop/pig/trunk/ivy/pig.pom Mon Aug 30 20:47:32 2010
@@ -75,10 +75,10 @@
     <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-test</artifactId>
-       <version>${hbase-test.version}</version>	
+       <version>${hbase.version}</version>    
     </dependency>
    
-    <dependency>	
+    <dependency>    
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>${junit.version}</version>
@@ -96,16 +96,15 @@
         <version>${joda-time.version}</version>
     </dependency>
 
-	<dependency>
-	    <groupId>org.python</groupId>
-	    <artifactId>jython</artifactId>
-	    <version>${jython.version}</version>
-	</dependency>
+    <dependency>
+        <groupId>org.python</groupId>
+        <artifactId>jython</artifactId>
+        <version>${jython.version}</version>
+    </dependency>
     <dependency>
         <groupId>com.google.guava</groupId>
         <artifactId>guava</artifactId>
         <version>${guava.version}</version>
     </dependency>
-
   </dependencies> 
 </project>

Added: hadoop/pig/trunk/lib/hbase-0.20.6-test.jar
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/lib/hbase-0.20.6-test.jar?rev=990936&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/pig/trunk/lib/hbase-0.20.6-test.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/pig/trunk/lib/hbase-0.20.6.jar
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/lib/hbase-0.20.6.jar?rev=990936&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/pig/trunk/lib/hbase-0.20.6.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/pig/trunk/src/org/apache/pig/LoadStoreCaster.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/LoadStoreCaster.java?rev=990936&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/LoadStoreCaster.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/LoadStoreCaster.java Mon Aug 30 20:47:32 2010
@@ -0,0 +1,32 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig;
+
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+
+/**
+ * This is just a union interface of LoadCaster and StoreCaster, 
+ * made available for simplicity.
+ * @since Pig 0.8
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface LoadStoreCaster extends LoadCaster, StoreCaster {
+
+}

Added: hadoop/pig/trunk/src/org/apache/pig/StoreCaster.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/StoreCaster.java?rev=990936&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/StoreCaster.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/StoreCaster.java Mon Aug 30 20:47:32 2010
@@ -0,0 +1,37 @@
+package org.apache.pig;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+/**
+ * An interface that provides methods for converting Pig internal types to byte[].
+ * It is intended to be used by StoreFunc implementations.
+ * @since Pig 0.8
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving // Because we still don't have the map casts quite right
+public interface StoreCaster extends LoadCaster {
+    public byte[] toBytes(DataBag bag) throws IOException;
+
+    public byte[] toBytes(String s) throws IOException;
+
+    public byte[] toBytes(Double d) throws IOException;
+
+    public byte[] toBytes(Float f) throws IOException;
+
+    public byte[] toBytes(Integer i) throws IOException;
+
+    public byte[] toBytes(Long l) throws IOException;
+
+    public byte[] toBytes(Map<String, Object> m) throws IOException;
+
+    public byte[] toBytes(Tuple t) throws IOException;
+    
+    public byte[] toBytes(DataByteArray a) throws IOException;
+}

Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseBinaryConverter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseBinaryConverter.java?rev=990936&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseBinaryConverter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseBinaryConverter.java Mon Aug 30 20:47:32 2010
@@ -0,0 +1,151 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.pig.LoadStoreCaster;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+public class HBaseBinaryConverter implements LoadStoreCaster {
+
+    @Override
+    public String bytesToCharArray(byte[] b) throws IOException {
+        return Bytes.toString(b);    
+    }
+
+    @Override
+    public Double bytesToDouble(byte[] b) throws IOException {
+        if (Bytes.SIZEOF_DOUBLE > b.length){ 
+            return Bytes.toDouble(Bytes.padHead(b, Bytes.SIZEOF_DOUBLE - b.length));
+        } else {
+            return Bytes.toDouble(Bytes.head(b, Bytes.SIZEOF_DOUBLE));
+        }
+    }
+
+    @Override
+    public Float bytesToFloat(byte[] b) throws IOException {
+        if (Bytes.SIZEOF_FLOAT > b.length){ 
+            return Bytes.toFloat(Bytes.padHead(b, Bytes.SIZEOF_FLOAT - b.length));
+        } else {
+            return Bytes.toFloat(Bytes.head(b, Bytes.SIZEOF_FLOAT));
+        }
+    }
+
+    @Override
+    public Integer bytesToInteger(byte[] b) throws IOException {
+        if (Bytes.SIZEOF_INT > b.length){ 
+            return Bytes.toInt(Bytes.padHead(b, Bytes.SIZEOF_INT - b.length));
+        } else {
+            return Bytes.toInt(Bytes.head(b, Bytes.SIZEOF_INT));
+        }
+    }
+
+    @Override
+    public Long bytesToLong(byte[] b) throws IOException {
+        if (Bytes.SIZEOF_LONG > b.length){ 
+            return Bytes.toLong(Bytes.padHead(b, Bytes.SIZEOF_LONG - b.length));
+        } else {
+            return Bytes.toLong(Bytes.head(b, Bytes.SIZEOF_LONG));
+        }
+    }
+
+    /**
+     * NOT IMPLEMENTED
+     */
+    @Override
+    public Map<String, Object> bytesToMap(byte[] b) throws IOException {
+        throw new ExecException("Can't generate a Map from byte[]");
+    }
+
+    /**
+     * NOT IMPLEMENTED
+     */
+    @Override
+    public Tuple bytesToTuple(byte[] b, ResourceFieldSchema fieldSchema) throws IOException {
+        throw new ExecException("Can't generate a Tuple from byte[]");
+    }
+
+    /**
+     * NOT IMPLEMENTED
+     */
+    @Override
+    public DataBag bytesToBag(byte[] b, ResourceFieldSchema fieldSchema) throws IOException {
+        throw new ExecException("Can't generate DataBags from byte[]");
+    }
+
+    /**
+     * NOT IMPLEMENTED
+     */
+    @Override
+    public byte[] toBytes(DataBag bag) throws IOException {
+        throw new ExecException("Can't generate bytes from DataBag");
+    }
+
+    @Override
+    public byte[] toBytes(String s) throws IOException {
+        return Bytes.toBytes(s);
+    }
+
+    @Override
+    public byte[] toBytes(Double d) throws IOException {
+        return Bytes.toBytes(d);
+    }
+
+    @Override
+    public byte[] toBytes(Float f) throws IOException {
+        return Bytes.toBytes(f);
+    }
+
+    @Override
+    public byte[] toBytes(Integer i) throws IOException {
+        return Bytes.toBytes(i);
+    }
+
+    @Override
+    public byte[] toBytes(Long l) throws IOException {
+        return Bytes.toBytes(l);
+    }
+
+    /**
+     * NOT IMPLEMENTED
+     */
+    @Override
+    public byte[] toBytes(Map<String, Object> m) throws IOException {
+        throw new IOException("Can't generate bytes from Map");
+    }
+
+    /**
+     * NOT IMPLEMENTED
+     */
+    @Override
+    public byte[] toBytes(Tuple t) throws IOException {
+       throw new IOException("Can't generate bytes from Tuple");
+    }
+
+    @Override
+    public byte[] toBytes(DataByteArray a) throws IOException {
+        return a.get();
+    }
+}
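
For readers who want to try the new binary caster on its own, here is a minimal sketch (not part of this patch) of a round trip through HBaseBinaryConverter; it assumes the pig jar and the hbase-0.20.6 jar added above are on the classpath:

    import org.apache.pig.backend.hadoop.hbase.HBaseBinaryConverter;

    public class BinaryCasterSketch {
        public static void main(String[] args) throws Exception {
            HBaseBinaryConverter caster = new HBaseBinaryConverter();
            // toBytes(Long) delegates to HBase's Bytes.toBytes: an 8-byte big-endian encoding.
            byte[] raw = caster.toBytes(Long.valueOf(42L));
            // bytesToLong pads or truncates to SIZEOF_LONG before decoding, so the value survives.
            System.out.println(caster.bytesToLong(raw));   // prints 42
        }
    }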

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=990936&r1=990935&r2=990936&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Mon Aug 30 20:47:32 2010
@@ -19,116 +19,438 @@ package org.apache.pig.backend.hadoop.hb
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.RowFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadPushDown;
+import org.apache.pig.LoadStoreCaster;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.backend.hadoop.hbase.HBaseTableInputFormat.HBaseTableIFBuilder;
+import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.Utils;
+
+import com.google.common.collect.Lists;
 
 /**
- * A Hbase Loader
+ * An HBase implementation of LoadFunc and StoreFunc
+ * 
+ * TODO(dmitriy) test that all this stuff works
+ * TODO(dmitriy) documentation
  */
-public class HBaseStorage extends LoadFunc {
+public class HBaseStorage extends LoadFunc implements StoreFuncInterface, LoadPushDown {
+    
+    private static final Log LOG = LogFactory.getLog(HBaseStorage.class);
+
+    private final static String STRING_CASTER = "UTF8StorageConverter";
+    private final static String BYTE_CASTER = "HBaseBinaryConverter";
+    private final static String CASTER_PROPERTY = "pig.hbase.caster";
+    
+    private List<byte[]> columnList_ = Lists.newArrayList();
+    private HTable m_table;
+    private HBaseConfiguration m_conf;
+    private RecordReader reader;
+    private RecordWriter writer;
+    private Scan scan;
+
+    private final CommandLine configuredOptions_;
+    private final static Options validOptions_ = new Options();
+    private final static CommandLineParser parser_ = new GnuParser();
+    private boolean loadRowKey_;
+    private final long limit_;
+    private final int caching_;
+
+    protected transient byte[] gt_;
+    protected transient byte[] gte_;
+    protected transient byte[] lt_;
+    protected transient byte[] lte_;
+
+    private LoadCaster caster_;
+
+    private ResourceSchema schema_;
+
+    private static void populateValidOptions() { 
+        validOptions_.addOption("loadKey", false, "Load Key");
+        validOptions_.addOption("gt", true, "Records must be greater than this value " +
+                "(binary, double-slash-escaped)");
+        validOptions_.addOption("lt", true, "Records must be less than this value (binary, double-slash-escaped)");   
+        validOptions_.addOption("gte", true, "Records must be greater than or equal to this value");
+        validOptions_.addOption("lte", true, "Records must be less than or equal to this value");
+        validOptions_.addOption("caching", true, "Number of rows scanners should cache");
+        validOptions_.addOption("limit", true, "Per-region limit");
+        validOptions_.addOption("caster", true, "Caster to use for converting values. A class name, " +
+                "HBaseBinaryConverter, or Utf8StorageConverter. For storage, casters must implement LoadStoreCaster.");
+    }
+
+    /**
+     * Constructor. Construct an HBase Table LoadFunc and StoreFunc to load or store the cells of the
+     * provided columns.
+     * 
+     * @param columnList
+     *            a space-delimited list of the columns to load or store
+     * @throws ParseException when unable to parse arguments
+     * @throws IOException 
+     */
+    public HBaseStorage(String columnList) throws ParseException, IOException {
+        this(columnList,"");
+    }
+
+    /**
+     * Constructor. Construct an HBase Table LoadFunc and StoreFunc to load or store. 
+     * @param columnList
+     * @param optString Loader options. Known options:<ul>
+     * <li>-loadKey=(true|false)  Load the row key as the first column
+     * <li>-gt=minKeyVal
+     * <li>-lt=maxKeyVal 
+     * <li>-gte=minKeyVal
+     * <li>-lte=maxKeyVal
+     * <li>-caching=numRows  number of rows to cache (faster scans, more memory).
+     * </ul>
+     * @throws ParseException 
+     * @throws IOException 
+     */
+    public HBaseStorage(String columnList, String optString) throws ParseException, IOException {
+        populateValidOptions();
+        String[] colNames = columnList.split(" ");
+        String[] optsArr = optString.split(" ");
+        try {
+            configuredOptions_ = parser_.parse(validOptions_, optsArr);
+        } catch (ParseException e) {
+            HelpFormatter formatter = new HelpFormatter();
+            formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-caching] [-caster]", validOptions_ );
+            throw e;
+        }
+
+        loadRowKey_ = configuredOptions_.hasOption("loadKey");  
+        for (String colName : colNames) {
+            columnList_.add(Bytes.toBytes(colName));
+        }
+
+        m_conf = new HBaseConfiguration();
+        String defaultCaster = m_conf.get(CASTER_PROPERTY, STRING_CASTER);
+        String casterOption = configuredOptions_.getOptionValue("caster", defaultCaster);
+        if (STRING_CASTER.equalsIgnoreCase(casterOption)) {
+            caster_ = new Utf8StorageConverter();
+        } else if (BYTE_CASTER.equalsIgnoreCase(casterOption)) {
+            caster_ = new HBaseBinaryConverter();
+        } else {
+            try {
+                @SuppressWarnings("unchecked")
+                Class<LoadCaster> casterClass = (Class<LoadCaster>) Class.forName(casterOption);
+                caster_ = casterClass.newInstance();
+            } catch (ClassCastException e) {
+                LOG.error("Configured caster does not implement LoadCaster interface.");
+                throw new IOException(e);
+            } catch (ClassNotFoundException e) {
+                LOG.error("Configured caster class not found.", e);
+                throw new IOException(e);
+            } catch (InstantiationException e) {
+                LOG.error("Unable to instantiate configured caster " + casterOption, e);
+                throw new IOException(e);
+            } catch (IllegalAccessException e) {
+                LOG.error("Illegal Access Exception for configured caster " + casterOption, e); 
+                throw new IOException(e);
+            }
+        }
+
+        caching_ = Integer.valueOf(configuredOptions_.getOptionValue("caching", "100"));
+        limit_ = Long.valueOf(configuredOptions_.getOptionValue("limit", "-1"));
+        initScan();	    
+    }
+
+    private void initScan() {
+        scan = new Scan();
+        // Set filters, if any.
+        if (configuredOptions_.hasOption("gt")) {
+            gt_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("gt")));
+            addFilter(CompareOp.GREATER, gt_);
+        }
+        if (configuredOptions_.hasOption("lt")) {
+            lt_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("lt")));
+            addFilter(CompareOp.LESS, lt_);
+        }
+        if (configuredOptions_.hasOption("gte")) {
+            gte_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("gte")));
+            addFilter(CompareOp.GREATER_OR_EQUAL, gte_);
+        }
+        if (configuredOptions_.hasOption("lte")) {
+            lte_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("lte")));
+            addFilter(CompareOp.LESS_OR_EQUAL, lte_);
+        }
+    }
+
+    private void addFilter(CompareOp op, byte[] val) {
+        LOG.info("Adding filter " + op.toString() + " with value " + Bytes.toStringBinary(val));
+        FilterList scanFilter = (FilterList) scan.getFilter();
+        if (scanFilter == null) {
+            scanFilter = new FilterList();
+        }
+        scanFilter.addFilter(new RowFilter(op, new BinaryComparator(val)));
+        scan.setFilter(scanFilter);
+    }
 
-	private byte[][] m_cols;
-	private HTable m_table;
-	private Configuration m_conf=new Configuration();
-	private RecordReader reader;
-	private Scan scan=new Scan();
-	
-	private static final Log LOG = LogFactory.getLog(HBaseStorage.class);
-
-	/**
-	 * Constructor. Construct a HBase Table loader to load the cells of the
-	 * provided columns.
-	 * 
-	 * @param columnList
-	 *            columnlist that is a presented string delimited by space.
-	 */
-	public HBaseStorage(String columnList) {
-		String[] colNames = columnList.split(" ");
-		m_cols = new byte[colNames.length][];
-		for (int i = 0; i < m_cols.length; i++) {
-			m_cols[i] = Bytes.toBytes(colNames[i]);
-			scan.addColumn(m_cols[i]);
-		}		
-	}
-
-
-	@Override
-	public Tuple getNext() throws IOException {
-		try {
-			if (reader.nextKeyValue()) {
-				ImmutableBytesWritable rowKey = (ImmutableBytesWritable) reader
-						.getCurrentKey();
-				Result result = (Result) reader.getCurrentValue();
-				Tuple tuple=TupleFactory.getInstance().newTuple(m_cols.length);
-				for (int i=0;i<m_cols.length;++i){
-					tuple.set(i, new DataByteArray(result.getValue(m_cols[i])));
-				}
-				return tuple;
-			}
-		} catch (InterruptedException e) {
-			throw new IOException(e);
-		}
-		return null;
-	}
-
-	@Override
-	public InputFormat getInputFormat() {
-		TableInputFormat inputFormat = new TableInputFormat();
-		inputFormat.setConf(m_conf);
-		return inputFormat;
-	}
-
-	@Override
-	public void prepareToRead(RecordReader reader, PigSplit split) {
-		this.reader = reader;
-	}
+    @Override
+    public Tuple getNext() throws IOException {
+        try {
+            if (reader.nextKeyValue()) {
+                ImmutableBytesWritable rowKey = (ImmutableBytesWritable) reader
+                .getCurrentKey();
+                Result result = (Result) reader.getCurrentValue();
+                int tupleSize=columnList_.size();
+                if (loadRowKey_){
+                    tupleSize++;
+                }
+                Tuple tuple=TupleFactory.getInstance().newTuple(tupleSize);
+
+                int startIndex=0;
+                if (loadRowKey_){
+                    tuple.set(0, new DataByteArray(rowKey.get()));
+                    startIndex++;
+                }
+                for (int i=0;i<columnList_.size();++i){
+                    tuple.set(i+startIndex, new DataByteArray(result.getValue(columnList_.get(i))));
+                }
+                return tuple;
+            }
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+        return null;
+    }
+
+    @Override
+    public InputFormat getInputFormat() {      
+        TableInputFormat inputFormat = new HBaseTableIFBuilder()
+        .withLimit(limit_)
+        .withGt(gt_)
+        .withGte(gte_)
+        .withLt(lt_)
+        .withLte(lte_)
+        .withConf(m_conf)
+        .build();
+        return inputFormat;
+    }
 
-	@Override
+    @Override
+    public void prepareToRead(RecordReader reader, PigSplit split) {
+        this.reader = reader;
+    }
+
+    @Override
     public void setLocation(String location, Job job) throws IOException {
+        String tablename = location;
         if (location.startsWith("hbase://")){
-        	m_conf.set(TableInputFormat.INPUT_TABLE, location.substring(8));
-        }else{
-        	m_conf.set(TableInputFormat.INPUT_TABLE, location);
+           tablename = location.substring(8);
+        }
+        if (m_table == null) {
+            m_table = new HTable(m_conf, tablename);
         }
+        m_table.setScannerCaching(caching_);
+        m_conf.set(TableInputFormat.INPUT_TABLE, tablename);
+        scan.addColumns(columnList_.toArray(new byte[0][]));
         m_conf.set(TableInputFormat.SCAN, convertScanToString(scan));
     }
 
-	@Override
-	public String relativeToAbsolutePath(String location, Path curDir)
-			throws IOException {
-		return location;
-	}
-	
-	private static String convertScanToString(Scan scan) {
-
-		try {
-			ByteArrayOutputStream out = new ByteArrayOutputStream();
-			DataOutputStream dos = new DataOutputStream(out);
-			scan.write(dos);
-			return Base64.encodeBytes(out.toByteArray());
-		} catch (IOException e) {
-			LOG.error(e);
-			return "";
-		}
+    @Override
+    public String relativeToAbsolutePath(String location, Path curDir)
+    throws IOException {
+        return location;
+    }
+
+    private static String convertScanToString(Scan scan) {
+        try {
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            DataOutputStream dos = new DataOutputStream(out);
+            scan.write(dos);
+            return Base64.encodeBytes(out.toByteArray());
+        } catch (IOException e) {
+            LOG.error(e);
+            return "";
+        }
+
+    }
+
+    /**
+     * Set up the caster to use for reading values out of, and writing to, HBase. 
+     */
+    @Override
+    public LoadCaster getLoadCaster() throws IOException {
+        return caster_;
+    }
+    
+    /*
+     * StoreFunc Methods
+     * @see org.apache.pig.StoreFuncInterface#getOutputFormat()
+     */
+    
+    @Override
+    public OutputFormat getOutputFormat() throws IOException {
+        TableOutputFormat outputFormat = new TableOutputFormat();
+        return outputFormat;
+    }
+
+    @Override
+    public void checkSchema(ResourceSchema s) throws IOException {
+        if (! (caster_ instanceof LoadStoreCaster)) {
+            LOG.error("Caster must implement LoadStoreCaster for writing to HBase.");
+            throw new IOException("Bad Caster " + caster_.getClass());
+        }
+        schema_ = s;
+    }
+
+    // Suppressing unchecked warnings for RecordWriter, which is not parameterized by StoreFuncInterface
+    @Override
+    public void prepareToWrite(@SuppressWarnings("rawtypes") RecordWriter writer) throws IOException {
+        this.writer = writer;
+    }
+
+    // Suppressing unchecked warnings for RecordWriter, which is not parameterized by StoreFuncInterface
+    @SuppressWarnings("unchecked")
+    @Override
+    public void putNext(Tuple t) throws IOException {
+        ResourceFieldSchema[] fieldSchemas = (schema_ == null) ? null : schema_.getFields();
+        Put put=new Put(objToBytes(t.get(0), 
+                (fieldSchemas == null) ? DataType.findType(t.get(0)) : fieldSchemas[0].getType()));
+        long ts=System.currentTimeMillis();
+        
+        for (byte[] col : columnList_) {
+            LOG.info("putNext -- col: " + Bytes.toStringBinary(col));
+        }
+                
+        for (int i=1;i<t.size();++i){
+            put.add(columnList_.get(i-1), ts, objToBytes(t.get(i),
+                    (fieldSchemas == null) ? DataType.findType(t.get(i)) : fieldSchemas[i].getType()));
+        }
+        try {
+            writer.write(null, put);
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+    }
+    
+    @SuppressWarnings("unchecked")
+    private byte[] objToBytes(Object o, byte type) throws IOException {
+        LoadStoreCaster caster = (LoadStoreCaster) caster_;
+        switch (type) {
+        case DataType.BYTEARRAY: return ((DataByteArray) o).get();
+        case DataType.BAG: return caster.toBytes((DataBag) o);
+        case DataType.CHARARRAY: return caster.toBytes((String) o);
+        case DataType.DOUBLE: return caster.toBytes((Double) o);
+        case DataType.FLOAT: return caster.toBytes((Float) o);
+        case DataType.INTEGER: return caster.toBytes((Integer) o);
+        case DataType.LONG: return caster.toBytes((Long) o);
+        
+        // The type conversion here is unchecked. 
+        // Relying on DataType.findType to do the right thing.
+        case DataType.MAP: return caster.toBytes((Map<String, Object>) o);
+        
+        case DataType.NULL: return null;
+        case DataType.TUPLE: return caster.toBytes((Tuple) o);
+        case DataType.ERROR: throw new IOException("Unable to determine type of " + o.getClass());
+        default: throw new IOException("Unable to find a converter for tuple field " + o);
+        }
+    }
+
+    @Override
+    public String relToAbsPathForStoreLocation(String location, Path curDir)
+    throws IOException {
+        return location;
+    }
+
+    @Override
+    public void setStoreFuncUDFContextSignature(String signature) { }
+
+    @Override
+    public void setStoreLocation(String location, Job job) throws IOException {
+        if (location.startsWith("hbase://")){
+            job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, location.substring(8));
+        }else{
+            job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, location);
+        }
+    }
+
+    @Override
+    public void cleanupOnFailure(String location, Job job) throws IOException {
+
+    }
+
+    /*
+     * LoadPushDown Methods.
+     */
+    
+    @Override
+    public List<OperatorSet> getFeatures() {
+        return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
+    }
+
+    @Override
+    public RequiredFieldResponse pushProjection(
+            RequiredFieldList requiredFieldList) throws FrontendException {
+        List<RequiredField>  requiredFields = requiredFieldList.getFields();
+        List<byte[]> newColumns = Lists.newArrayListWithExpectedSize(requiredFields.size());
+        
+        // HBase Row Key is the first column in the schema when it's loaded, 
+        // and is not included in the columnList (since it's not a proper column).
+        int offset = loadRowKey_ ? 1 : 0;
+        
+        if (loadRowKey_) {
+            if (requiredFields.size() < 1 || requiredFields.get(0).getIndex() != 0) {
+                loadRowKey_ = false;
+            } else {
+                // We just processed the fact that the row key needs to be loaded.
+                requiredFields.remove(0);
+            }
+        }
+        
+        for (RequiredField field : requiredFields) {
+            int fieldIndex = field.getIndex();
+            newColumns.add(columnList_.get(fieldIndex - offset));
+        }
+        LOG.info("pushProjection After Projection: loadRowKey is " + loadRowKey_) ;
+        for (byte[] col : newColumns) {
+            LOG.info("pushProjection -- col: " + Bytes.toStringBinary(col));
+        }
+        columnList_ = newColumns;
+        return new RequiredFieldResponse(true);
+    }
 
-	}
 }
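
As a usage illustration only (not part of the patch), the enhanced loader/storer could be driven as below; the table names demo_table and demo_copy, the column names, and the key range are made up, and a reachable HBase cluster is assumed:

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class HBaseStorageUsageSketch {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.MAPREDUCE);
            // Load the row key plus two columns, keeping only rows whose key is >= 'row_05'.
            pig.registerQuery("a = LOAD 'hbase://demo_table' USING "
                    + "org.apache.pig.backend.hadoop.hbase.HBaseStorage("
                    + "'pig:col_a pig:col_b', '-loadKey -gte row_05 -caching 500');");
            // Store back: the first tuple field becomes the row key, the rest map to the listed columns.
            pig.store("a", "hbase://demo_copy",
                    "org.apache.pig.backend.hadoop.hbase.HBaseStorage('pig:col_a pig:col_b')");
        }
    }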

Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseTableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseTableInputFormat.java?rev=990936&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseTableInputFormat.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseTableInputFormat.java Mon Aug 30 20:47:32 2010
@@ -0,0 +1,197 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.hbase;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.ListIterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableSplit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+public class HBaseTableInputFormat extends TableInputFormat {
+    private static final Log LOG = LogFactory.getLog(HBaseTableInputFormat.class);
+
+    protected final byte[] gt_;
+    protected final byte[] gte_;
+    protected final byte[] lt_;
+    protected final byte[] lte_;
+
+    public HBaseTableInputFormat() {
+        this(-1, null, null, null, null);
+    }
+
+    protected HBaseTableInputFormat(long limit, byte[] gt, byte[] gte, byte[] lt, byte[] lte) {
+        super();
+        setTableRecordReader(new HBaseTableRecordReader(limit));
+        gt_ = gt;
+        gte_ = gte;
+        lt_ = lt;
+        lte_ = lte;
+    }
+
+    public static class HBaseTableIFBuilder {
+        protected byte[] gt_;
+        protected byte[] gte_;
+        protected byte[] lt_;
+        protected byte[] lte_;
+        protected long limit_;
+        protected Configuration conf_;
+
+        public HBaseTableIFBuilder withGt(byte[] gt) { gt_ = gt; return this; }
+        public HBaseTableIFBuilder withGte(byte[] gte) { gte_ = gte; return this; }
+        public HBaseTableIFBuilder withLt(byte[] lt) { lt_ = lt; return this; }
+        public HBaseTableIFBuilder withLte(byte[] lte) { lte_ = lte; return this; }
+        public HBaseTableIFBuilder withLimit(long limit) { limit_ = limit; return this; }
+        public HBaseTableIFBuilder withConf(Configuration conf) { conf_ = conf; return this; }
+
+        public HBaseTableInputFormat build() {
+            HBaseTableInputFormat inputFormat = new HBaseTableInputFormat(limit_, gt_, gte_, lt_, lte_);
+            if (conf_ != null) inputFormat.setConf(conf_);
+            return inputFormat;
+        }
+
+    }
+
+    @Override
+    public List<InputSplit> getSplits(org.apache.hadoop.mapreduce.JobContext context)
+    throws IOException {
+        List<InputSplit> splits = super.getSplits(context);
+        ListIterator<InputSplit> splitIter = splits.listIterator();
+        while (splitIter.hasNext()) {
+            TableSplit split = (TableSplit) splitIter.next();
+            byte[] startKey = split.getStartRow();
+            byte[] endKey = split.getEndRow();
+            // Skip if the region doesn't satisfy configured options.
+            if ((skipRegion(CompareOp.LESS, startKey, lt_)) ||
+                    (skipRegion(CompareOp.GREATER, endKey, gt_)) ||
+                    (skipRegion(CompareOp.GREATER, endKey, gte_)) ||
+                    (skipRegion(CompareOp.LESS_OR_EQUAL, startKey, lte_)) )  {
+                splitIter.remove();
+            }
+        }
+        return splits;
+    }
+
+    private boolean skipRegion(CompareOp op, byte[] key, byte[] option ) {
+
+        if (key.length == 0 || option == null) 
+            return false;
+
+        BinaryComparator comp = new BinaryComparator(option);
+        RowFilter rowFilter = new RowFilter(op, comp);
+        return rowFilter.filterRowKey(key, 0, key.length);
+    }
+
+    protected class HBaseTableRecordReader extends TableRecordReader {
+
+        private long recordsSeen = 0;
+        private final long limit_;
+        private byte[] startRow_;
+        private byte[] endRow_;
+        private transient byte[] currRow_;
+
+        private BigInteger bigStart_;
+        private BigInteger bigEnd_;
+        private BigDecimal bigRange_;
+        private transient float progressSoFar_ = 0;
+
+        public HBaseTableRecordReader(long limit) {
+            limit_ = limit;
+        }
+
+        @Override
+        public void setScan(Scan scan) {
+            super.setScan(scan);
+
+            startRow_ = scan.getStartRow();
+            endRow_ = scan.getStopRow();
+            byte[] startPadded;
+            byte[] endPadded;
+            if (startRow_.length < endRow_.length) {
+                startPadded = Bytes.padTail(startRow_, endRow_.length - startRow_.length);
+                endPadded = endRow_;
+            } else if (endRow_.length < startRow_.length) {
+                startPadded = startRow_;
+                endPadded = Bytes.padTail(endRow_, startRow_.length - endRow_.length);
+            } else {
+                startPadded = startRow_;
+                endPadded = endRow_;
+            }
+            currRow_ = startRow_;
+            byte [] prependHeader = {1, 0};
+            bigStart_ = new BigInteger(Bytes.add(prependHeader, startPadded));
+            bigEnd_ = new BigInteger(Bytes.add(prependHeader, endPadded));
+            bigRange_ = new BigDecimal(bigEnd_.subtract(bigStart_));
+            LOG.info("setScan with ranges: " + bigStart_ + " - " + bigEnd_ + " ( " + bigRange_ + ")");
+        }
+
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+            if (limit_ > 0 && ++recordsSeen > limit_) {
+                return false;
+            }
+            boolean hasMore = super.nextKeyValue();
+            if (hasMore) {
+                currRow_ = getCurrentKey().get();
+            }
+            return hasMore;
+
+        }
+
+        @Override
+        public float getProgress() {
+            if (currRow_ == null || currRow_.length == 0 || endRow_.length == 0 || endRow_ == HConstants.LAST_ROW) {
+                return 0;
+            }
+            byte[] lastPadded = currRow_;
+            if (currRow_.length < endRow_.length) {
+                lastPadded = Bytes.padTail(currRow_, endRow_.length - currRow_.length);
+            }
+            if (currRow_.length < startRow_.length) {
+                lastPadded = Bytes.padTail(currRow_, startRow_.length - currRow_.length);
+            }
+            byte [] prependHeader = {1, 0};
+            BigInteger bigLastRow = new BigInteger(Bytes.add(prependHeader, lastPadded));
+            if (bigLastRow.compareTo(bigEnd_) > 0) {
+                return progressSoFar_;
+            }
+            BigDecimal processed = new BigDecimal(bigLastRow.subtract(bigStart_));
+            try {
+                BigDecimal progress = processed.setScale(3).divide(bigRange_, BigDecimal.ROUND_HALF_DOWN);
+                progressSoFar_ = progress.floatValue();
+                return progressSoFar_;
+            } catch (java.lang.ArithmeticException e) {
+                return 0;
+            }            
+        }
+
+    }
+}
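
The builder above is what HBaseStorage.getInputFormat uses; as a hypothetical sketch, it can also be driven directly (the key range and limit here are invented values):

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.pig.backend.hadoop.hbase.HBaseTableInputFormat;
    import org.apache.pig.backend.hadoop.hbase.HBaseTableInputFormat.HBaseTableIFBuilder;

    public class TableInputFormatSketch {
        public static void main(String[] args) {
            // Skip regions outside [row_10, row_20) and stop each record reader after 1000 rows.
            // In HBaseStorage, setLocation() also passes a Configuration (via withConf)
            // with TableInputFormat.INPUT_TABLE already set.
            HBaseTableInputFormat inputFormat = new HBaseTableIFBuilder()
                    .withGte(Bytes.toBytes("row_10"))
                    .withLt(Bytes.toBytes("row_20"))
                    .withLimit(1000L)
                    .build();
            System.out.println(inputFormat.getClass().getName());
        }
    }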

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java?rev=990936&r1=990935&r2=990936&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java Mon Aug 30 20:47:32 2010
@@ -28,7 +28,7 @@ import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.pig.LoadCaster;
+import org.apache.pig.LoadStoreCaster;
 import org.apache.pig.PigWarning;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.data.BagFactory;
@@ -45,7 +45,7 @@ import org.apache.pig.impl.util.LogUtils
  * and pig data types.  It is intended to be extended by load and store
  * functions (such as {@link PigStorage}). 
  */
-public class Utf8StorageConverter implements LoadCaster {
+public class Utf8StorageConverter implements LoadStoreCaster {
 
     protected BagFactory mBagFactory = BagFactory.getInstance();
     protected TupleFactory mTupleFactory = TupleFactory.getInstance();
@@ -287,6 +287,7 @@ public class Utf8StorageConverter implem
         return field;
     }
 
+    @Override
     public DataBag bytesToBag(byte[] b, ResourceFieldSchema schema) throws IOException {
         if(b == null)
             return null;
@@ -305,12 +306,14 @@ public class Utf8StorageConverter implem
         return db;
     }
 
+    @Override
     public String bytesToCharArray(byte[] b) throws IOException {
         if(b == null)
             return null;
         return new String(b, "UTF-8");
     }
 
+    @Override
     public Double bytesToDouble(byte[] b) {
         if(b == null)
             return null;
@@ -324,7 +327,8 @@ public class Utf8StorageConverter implem
             return null;
         }
     }
-
+    
+    @Override
     public Float bytesToFloat(byte[] b) throws IOException {
         if(b == null)
             return null;
@@ -346,6 +350,12 @@ public class Utf8StorageConverter implem
         }
     }
     
+    /**
+     * Note: NOT part of the LoadCaster interface.
+     * @param b
+     * @return
+     * @throws IOException
+     */
     public Boolean bytesToBoolean(byte[] b) throws IOException {
         if(b == null)
             return null;
@@ -353,6 +363,7 @@ public class Utf8StorageConverter implem
         return Boolean.valueOf(s);
     }
 
+    @Override
     public Integer bytesToInteger(byte[] b) throws IOException {
         if(b == null)
             return null;
@@ -383,7 +394,8 @@ public class Utf8StorageConverter implem
             }
         }
     }
-
+    
+    @Override
     public Long bytesToLong(byte[] b) throws IOException {
         if (b == null)
             return null;
@@ -421,6 +433,7 @@ public class Utf8StorageConverter implem
         }
     }
 
+    @Override
     @SuppressWarnings("unchecked")
     public Map<String, Object> bytesToMap(byte[] b) throws IOException {
         if(b == null)
@@ -443,6 +456,7 @@ public class Utf8StorageConverter implem
         return map;
     }
 
+    @Override
     public Tuple bytesToTuple(byte[] b, ResourceFieldSchema fieldSchema) throws IOException {
         if(b == null)
             return null;
@@ -464,39 +478,49 @@ public class Utf8StorageConverter implem
         return t;
     }
 
-
+    @Override
     public byte[] toBytes(DataBag bag) throws IOException {
         return bag.toString().getBytes();
     }
 
+    @Override
     public byte[] toBytes(String s) throws IOException {
         return s.getBytes();
     }
 
+    @Override
     public byte[] toBytes(Double d) throws IOException {
         return d.toString().getBytes();
     }
 
+    @Override
     public byte[] toBytes(Float f) throws IOException {
         return f.toString().getBytes();
     }
 
+    @Override
     public byte[] toBytes(Integer i) throws IOException {
         return i.toString().getBytes();
     }
 
+    @Override
     public byte[] toBytes(Long l) throws IOException {
         return l.toString().getBytes();
     }
 
+    @Override
     public byte[] toBytes(Map<String, Object> m) throws IOException {
         return DataType.mapToString(m).getBytes();
     }
 
+    @Override
     public byte[] toBytes(Tuple t) throws IOException {
         return t.toString().getBytes();
     }
-    
 
+    @Override
+    public byte[] toBytes(DataByteArray a) throws IOException {
+        return a.get();
+    }
 
 }
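
By way of contrast with HBaseBinaryConverter earlier in this commit, a small sketch (again outside the patch) of the text-based round trip:

    import org.apache.pig.builtin.Utf8StorageConverter;

    public class Utf8CasterSketch {
        public static void main(String[] args) throws Exception {
            Utf8StorageConverter caster = new Utf8StorageConverter();
            // toBytes(Integer) emits the characters "42" (i.toString().getBytes()), not a 4-byte encoding.
            byte[] raw = caster.toBytes(Integer.valueOf(42));
            // bytesToInteger parses the text form back, so the round trip is lossless here.
            System.out.println(caster.bytesToInteger(raw));   // prints 42
        }
    }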

Modified: hadoop/pig/trunk/src/org/apache/pig/data/DataType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DataType.java?rev=990936&r1=990935&r2=990936&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DataType.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DataType.java Mon Aug 30 20:47:32 2010
@@ -457,6 +457,45 @@ public class DataType {
             return 1;
         }
     }
+    
+    public static byte[] toBytes(Object o) throws ExecException {
+        return toBytes(o, findType(o));
+    }
+
+    @SuppressWarnings("unchecked")
+    public static byte[] toBytes(Object o, byte type) throws ExecException {
+        switch (type) {
+        case BOOLEAN:
+            return ((Boolean) o).booleanValue() ? new byte[] {1} : new byte[] {0};
+        case BYTE:
+            return new byte[] {((Byte) o)};
+
+        case INTEGER:
+        case DOUBLE:
+        case FLOAT:
+        case LONG:
+            return ((Number) o).toString().getBytes();
+
+        case CHARARRAY:
+            return ((String) o).getBytes();
+        case MAP:
+            return mapToString((Map<String, Object>) o).getBytes();
+        case TUPLE:
+            return ((Tuple) o).toString().getBytes();
+        case BYTEARRAY:
+            return ((DataByteArray) o).get();
+        case BAG:
+            return ((DataBag) o).toString().getBytes();
+        case NULL:
+            return null;
+        default:
+            int errCode = 1071;
+            String msg = "Cannot convert a " + findTypeName(o) +
+            " to a ByteArray";
+            throw new ExecException(msg, errCode, PigException.INPUT);
+
+        }
+    }
 
     /**
      * Force a data object to an Integer, if possible.  Any numeric type
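
A short sketch of the new DataType.toBytes helper (the values are arbitrary, and this class is not part of the commit):

    import org.apache.pig.data.DataType;

    public class DataTypeToBytesSketch {
        public static void main(String[] args) throws Exception {
            // findType() picks CHARARRAY for a String and INTEGER for an Integer;
            // both fall back to their string form, per the switch above.
            byte[] text = DataType.toBytes("hello");              // 5 bytes: "hello"
            byte[] number = DataType.toBytes(Integer.valueOf(7)); // 1 byte: "7"
            System.out.println(text.length + " " + number.length);
        }
    }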

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=990936&r1=990935&r2=990936&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java Mon Aug 30 20:47:32 2010
@@ -19,32 +19,35 @@ package org.apache.pig.impl.util;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
-
+import org.apache.pig.FileInputLoadFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.parser.ParseException;
-import org.apache.pig.impl.logicalLayer.parser.QueryParser;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.LoadFunc;
 import org.apache.pig.impl.io.InterStorage;
 import org.apache.pig.impl.io.ReadToEndLoader;
 import org.apache.pig.impl.io.TFileStorage;
-import org.apache.pig.FileInputLoadFunc;
-import org.apache.pig.PigException;
-import org.apache.pig.ResourceSchema;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.logicalLayer.parser.QueryParser;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import com.google.common.collect.Lists;
 
 /**
  * Class with utility static methods
  */
 public class Utils {
-  
+
     /**
      * This method is a helper for classes to implement {@link java.lang.Object#equals(java.lang.Object)}
      * checks if two objects are equals - two levels of checks are
@@ -69,8 +72,8 @@ public class Utils {
         }
         return true;
     }
-    
-    
+
+
     /**
      * This method is a helper for classes to implement {@link java.lang.Object#equals(java.lang.Object)}
      * The method checks whether the two arguments are both null or both not null and 
@@ -92,9 +95,9 @@ public class Utils {
             return false;
         }
     }
-    
+
     public static ResourceSchema getSchema(LoadFunc wrappedLoadFunc, String location, boolean checkExistence, Job job)
-                    throws IOException {
+    throws IOException {
         Configuration conf = job.getConfiguration();
         if (checkExistence) {
             Path path = new Path(location);
@@ -128,7 +131,7 @@ public class Utils {
         }
         return new ResourceSchema(s);
     }
-    
+
     public static Schema getSchemaFromString(String schemaString) throws ParseException {
         return Utils.getSchemaFromString(schemaString, DataType.BYTEARRAY);
     }
@@ -140,7 +143,7 @@ public class Utils {
         Schema.setSchemaDefaultType(schema, defaultType);
         return schema;
     }
-    
+
     public static String getTmpFileCompressorName(PigContext pigContext) {
         if (pigContext == null)
             return InterStorage.class.getName();
@@ -153,12 +156,12 @@ public class Utils {
         } else
             return InterStorage.class.getName();
     }
-    
+
     public static FileInputLoadFunc getTmpFileStorageObject(Configuration conf) throws IOException {
         boolean tmpFileCompression = conf.getBoolean("pig.tmpfilecompression", false);
         return tmpFileCompression ? new TFileStorage() : new InterStorage();
     }
-    
+
     public static boolean tmpFileCompression(PigContext pigContext) {
         if (pigContext == null)
             return false;
@@ -183,4 +186,24 @@ public class Utils {
         }
         return str.toString();
     }
+
+    public static FuncSpec buildSimpleFuncSpec(String className, byte...types) {
+        List<Schema.FieldSchema> fieldSchemas = Lists.newArrayListWithExpectedSize(types.length);
+        for (byte type : types) {
+            fieldSchemas.add(new Schema.FieldSchema(null, type));
+        }
+        return new FuncSpec(className, new Schema(fieldSchemas));
+    }
+
+    /**
+     * Replace each doubled backslash ("\\") with a single backslash ("\").
+     * (Grunt requires backslashes to be escaped, but it does not collapse the escaped
+     * form back into a single backslash, so we do that here.)
+     * @param str
+     * @return
+     */
+    public static String slashisize(String str) {
+        return str.replace("\\\\", "\\");
+    }
+
 }
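
For illustration, the new slashisize helper, which HBaseStorage applies to the -gt/-gte/-lt/-lte option values before handing them to Bytes.toBytesBinary, behaves as in this sketch:

    import org.apache.pig.impl.util.Utils;

    public class SlashisizeSketch {
        public static void main(String[] args) {
            // Grunt delivers the option value with a doubled backslash; collapse it
            // so Bytes.toBytesBinary can interpret the "\x00" escape.
            String fromGrunt = "row\\\\x00";                     // the string row\\x00
            System.out.println(Utils.slashisize(fromGrunt));     // prints row\x00
        }
    }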

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java?rev=990936&r1=990935&r2=990936&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java Mon Aug 30 20:47:32 2010
@@ -20,8 +20,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 
-import junit.framework.TestCase;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -34,248 +32,562 @@ import org.apache.hadoop.hbase.MiniZooKe
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.io.FileLocalizer;
 import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Before;
+import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
 
-/** {@link org.apache.pig.backend.hadoop.hbase.HBaseStorage} Test Case **/
-@RunWith(JUnit4.class)
-public class TestHBaseStorage extends TestCase {
-
-    private static final Log LOG =
-        LogFactory.getLog(TestHBaseStorage.class);
-    
-    private static MiniCluster cluster = MiniCluster.buildCluster();
-    private HBaseConfiguration conf;
-    private MiniHBaseCluster hbaseCluster;
-    private MiniZooKeeperCluster zooKeeperCluster;
-    
-    private PigServer pig;
-    
-    final static int NUM_REGIONSERVERS = 1;
-    
-    // Test Table Inforamtions
-    private static final String TESTTABLE = "pigtable";
-    private static final String COLUMNFAMILY = "pig:";
-    private static final String TESTCOLUMN_A = "pig:col_a";
-    private static final String TESTCOLUMN_B = "pig:col_b";
-    private static final String TESTCOLUMN_C = "pig:col_c";
-    private static final HColumnDescriptor family =
-        new HColumnDescriptor(COLUMNFAMILY);
-    private static final int TEST_ROW_COUNT = 100;
-    
-    @Before
-    @Override
-    public void setUp() throws Exception {
-        super.setUp();
-        
-        conf = new HBaseConfiguration(ConfigurationUtil.
-             toConfiguration(cluster.getProperties()));
-        conf.set("fs.default.name", cluster.getFileSystem().getUri().toString());
-        Path parentdir = cluster.getFileSystem().getHomeDirectory();
-        conf.set(HConstants.HBASE_DIR, parentdir.toString());
-        
-        // Make the thread wake frequency a little slower so other threads
-        // can run
-        conf.setInt("hbase.server.thread.wakefrequency", 2000);
-        
-        // Make lease timeout longer, lease checks less frequent
-        conf.setInt("hbase.master.lease.period", 10 * 1000);
-        
-        // Increase the amount of time between client retries
-        conf.setLong("hbase.client.pause", 15 * 1000);
-        
-        try {
-            hBaseClusterSetup();
-        } catch (Exception e) {
-            if(hbaseCluster != null) {
-                hbaseCluster.shutdown();
-            }
-            throw e;
-        }
-        
-        pig = new PigServer(ExecType.MAPREDUCE, ConfigurationUtil.toProperties(conf));
-    }
-    
-    @AfterClass
-    public static void oneTimeTearDown() throws Exception {
-        cluster.shutDown();
-    }
-    
-    /**
-     * Actually start the MiniHBase instance.
-     */
-    protected void hBaseClusterSetup() throws Exception {
-        zooKeeperCluster = new MiniZooKeeperCluster();
-        int clientPort = this.zooKeeperCluster.startup(new File("build/test"));
-        conf.set("hbase.zookeeper.property.clientPort",clientPort+"");
-      // start the mini cluster
-      hbaseCluster = new MiniHBaseCluster(conf, NUM_REGIONSERVERS);
-      // opening the META table ensures that cluster is running
-      while(true){
-    	  try{
-    		  new HTable(conf, HConstants.META_TABLE_NAME);
-    		  break;
-    	  }catch(IOException e){
-    		  Thread.sleep(1000);
-    	  }
-    	  
-      }
-    }
-
-    @Override
-    @After
-    public void tearDown() throws Exception {
-        // clear the table
-        deleteTable();
-        super.tearDown();
-        try {
-            HConnectionManager.deleteConnectionInfo(conf, true);
-            if (hbaseCluster != null) {
-                try {
-                    hbaseCluster.shutdown();
-                } catch (Exception e) {
-                    LOG.warn("Closing mini hbase cluster", e);
-                }
-            }
-            if (zooKeeperCluster!=null){
-            	try{
-            		zooKeeperCluster.shutdown();
-            	} catch (IOException e){
-            		LOG.warn("Closing zookeeper cluster",e);
-            	}
-            }
-        } catch (Exception e) {
-            LOG.error(e);
-        }
-        pig.shutdown();
-    }
-
-    /**
-     * load from hbase test
-     * @throws IOException
-     * @throws ExecException
-     */
-    @Test
-    public void testLoadFromHBase() throws IOException, ExecException {
-        prepareTable();
-
-        pig.registerQuery("a = load 'hbase://" + TESTTABLE + "' using " +
-            "org.apache.pig.backend.hadoop.hbase.HBaseStorage('" + TESTCOLUMN_A + 
-            " " + TESTCOLUMN_B + " " + TESTCOLUMN_C + "') as (col_a, col_b:int, col_c);");
-        Iterator<Tuple> it = pig.openIterator("a");
-        int count = 0;
-        LOG.info("LoadFromHBase Starting");
-        while(it.hasNext()){
-            Tuple t = it.next();
-            LOG.info("LoadFromHBase "+ t);
-            String col_a = ((DataByteArray)t.get(0)).toString();
-            int col_b = (Integer)t.get(1);
-            String col_c = ((DataByteArray)t.get(2)).toString();
-            
-            assertEquals(String.valueOf(count), col_a);
-            assertEquals(count, col_b);
-            assertEquals("TEXT" + count, col_c);
-            
-            count++;
-        }
-        assertEquals(TEST_ROW_COUNT, count);
-        System.err.println("LoadFromHBase done");
-    }
-
-    /**
-     * load from hbase test w/o hbase:// prefix
-     * @throws IOException
-     * @throws ExecException
-     */
-    @Test
-    public void testBackwardsCompatibility() throws IOException, ExecException {
-        prepareTable();
-        pig.registerQuery("a = load '" + TESTTABLE + "' using " +
-            "org.apache.pig.backend.hadoop.hbase.HBaseStorage('" + TESTCOLUMN_A + 
-            " " + TESTCOLUMN_B + " " + TESTCOLUMN_C + "') as (col_a, col_b:int, col_c);");
-        Iterator<Tuple> it = pig.openIterator("a");
-        int count = 0;
-        LOG.info("LoadFromHBase Starting");
-        while(it.hasNext()){
-            Tuple t = it.next();
-            LOG.info("LoadFromHBase "+ t);
-            String col_a = ((DataByteArray)t.get(0)).toString();
-            int col_b = (Integer)t.get(1);
-            String col_c = ((DataByteArray)t.get(2)).toString();
-            
-            assertEquals(String.valueOf(count), col_a);
-            assertEquals(count, col_b);
-            assertEquals("TEXT" + count, col_c);
-            
-            count++;
-        }
-        assertEquals(TEST_ROW_COUNT, count);
-        System.err.println("LoadFromHBase done");
-    }
-    
-    /**
-     * Prepare a table in hbase for testing.
-     * 
-     * @throws IOException
-     */
-    private void prepareTable() throws IOException {
-        // define the table schema
-        HTableDescriptor tabledesc = new HTableDescriptor(TESTTABLE);
-        tabledesc.addFamily(family);
-        
-        // create the table
-        HBaseAdmin admin = new HBaseAdmin(conf);
-        if(admin.tableExists(TESTTABLE)) {
-            deleteTable();
-        }
-        admin.createTable(tabledesc);
-        
-        // put some data into table
-        HTable table = new HTable(conf, TESTTABLE);
-        
-        BatchUpdate batchUpdate;
-        
-        for(int i = 0 ; i < TEST_ROW_COUNT ; i++) {
-            String v = Integer.toString(i);
-            batchUpdate = new BatchUpdate(Bytes.toBytes(
-                "00".substring(v.length()) + v));
-            batchUpdate.put(TESTCOLUMN_A, Bytes.toBytes(v));
-            batchUpdate.put(TESTCOLUMN_B, Bytes.toBytes(v));
-            batchUpdate.put(TESTCOLUMN_C, Bytes.toBytes("TEXT" + i));
-            table.commit(batchUpdate);
-        }
-    }
-    
-    private void deleteTable() throws IOException {
-        // delete the table
-        HBaseAdmin admin = new HBaseAdmin(conf);
-        if(admin.tableExists(TESTTABLE)) {
-            admin.disableTable(TESTTABLE);
-            while(admin.isTableEnabled(TESTTABLE)) {
-                try {
-                    Thread.sleep(3000);
-                } catch (InterruptedException e) {
-                    // do nothing.
-                }
-            }
-            admin.deleteTable(TESTTABLE);
-        }
-    }
+public class TestHBaseStorage {
+
+	private static final Log LOG = LogFactory.getLog(TestHBaseStorage.class);
+
+	private static MiniCluster cluster = MiniCluster.buildCluster();
+	private static HBaseConfiguration conf;
+	private static MiniHBaseCluster hbaseCluster;
+	private static MiniZooKeeperCluster zooKeeperCluster;
+
+	private static PigServer pig;
+
+	final static int NUM_REGIONSERVERS = 1;
+
+	enum DataFormat {
+		HBaseBinary, UTF8PlainText,
+	}
+
+	// Test Table constants
+	private static final String TESTTABLE_1 = "pigtable_1";
+	private static final String TESTTABLE_2 = "pigtable_2";
+	private static final byte[] COLUMNFAMILY = Bytes.toBytes("pig");
+	private static final String TESTCOLUMN_A = "pig:col_a";
+	private static final String TESTCOLUMN_B = "pig:col_b";
+	private static final String TESTCOLUMN_C = "pig:col_c";
+	private static final HColumnDescriptor family = new HColumnDescriptor(
+			COLUMNFAMILY);
+	private static final int TEST_ROW_COUNT = 100;
+
+	@BeforeClass
+	public static void setUp() throws Exception {
+		conf = new HBaseConfiguration();
+		conf.set("fs.default.name", cluster.getFileSystem().getUri().toString());
+		Path parentdir = cluster.getFileSystem().getHomeDirectory();
+		conf.set(HConstants.HBASE_DIR, parentdir.toString());
+
+		FSUtils.setVersion(cluster.getFileSystem(), parentdir);
+		conf.set(HConstants.REGIONSERVER_PORT, "0");
+		// disable the info server UI, or it clashes when more than one RegionServer is running
+		conf.set("hbase.regionserver.info.port", "-1");
+		
+		// Make lease timeout longer, lease checks less frequent
+		conf.setInt("hbase.master.lease.period", 10 * 1000);
+
+		// Increase the amount of time between client retries
+		conf.setLong("hbase.client.pause", 15 * 1000);
+
+		try {
+			hBaseClusterSetup();
+		} catch (Exception e) {
+			if (hbaseCluster != null) {
+				hbaseCluster.shutdown();
+			}
+			throw e;
+		}
+
+		pig = new PigServer(ExecType.MAPREDUCE,
+				ConfigurationUtil.toProperties(conf));
+	}
+
+	@AfterClass
+	public static void oneTimeTearDown() throws Exception {
+		try {
+			HConnectionManager.deleteConnectionInfo(conf, true);
+			if (hbaseCluster != null) {
+				try {
+					hbaseCluster.shutdown();
+				} catch (Exception e) {
+					LOG.warn("Closing mini hbase cluster", e);
+				}
+			}
+			if (zooKeeperCluster != null) {
+				try {
+					zooKeeperCluster.shutdown();
+				} catch (IOException e) {
+					LOG.warn("Closing zookeeper cluster", e);
+				}
+			}
+		} catch (Exception e) {
+			LOG.error(e);
+		}
+		cluster.shutDown();
+	}
+
+	/**
+	 * Actually start the MiniHBase instance.
+	 */
+	protected static void hBaseClusterSetup() throws Exception {
+		zooKeeperCluster = new MiniZooKeeperCluster();
+		int clientPort = zooKeeperCluster.startup(new File("build/test"));
+		conf.set("hbase.zookeeper.property.clientPort", clientPort + "");
+		// start the mini cluster
+		hbaseCluster = new MiniHBaseCluster(conf, NUM_REGIONSERVERS);
+		// opening the META table ensures that cluster is running
+		while (true) {
+			try {
+				new HTable(conf, HConstants.META_TABLE_NAME);
+				break;
+			} catch (IOException e) {
+				Thread.sleep(1000);
+			}
+
+		}
+	}
+
+	@After
+	public void tearDown() throws Exception {
+		// clear the table
+		deleteTable(TESTTABLE_1);
+		deleteTable(TESTTABLE_2);
+		pig.shutdown();
+	}
+
+	/**
+	 * load from hbase test
+	 * 
+	 * @throws IOException
+	 */
+	@Test
+	public void testLoadFromHBase() throws IOException {
+		prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+		pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using "
+				+ "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+				+ TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C
+				+ "') as (col_a, col_b, col_c);");
+		Iterator<Tuple> it = pig.openIterator("a");
+		int count = 0;
+		LOG.info("LoadFromHBase Starting");
+		while (it.hasNext()) {
+			Tuple t = it.next();
+			LOG.info("LoadFromHBase " + t);
+			String col_a = ((DataByteArray) t.get(0)).toString();
+			String col_b = ((DataByteArray) t.get(1)).toString();
+			String col_c = ((DataByteArray) t.get(2)).toString();
+
+			Assert.assertEquals(count, Integer.parseInt(col_a));
+			Assert.assertEquals(count + 0.0, Double.parseDouble(col_b), 1e-6);
+			Assert.assertEquals("Text_" + count, col_c);
+			count++;
+		}
+		Assert.assertEquals(TEST_ROW_COUNT, count);
+		LOG.info("LoadFromHBase done");
+	}
+
+	/**
+	 * load from hbase test without hbase:// prefix
+	 * 
+	 * @throws IOException
+	 */
+	@Test
+	public void testBackwardsCompatibility() throws IOException {
+		prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+		pig.registerQuery("a = load '" + TESTTABLE_1 + "' using "
+				+ "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+				+ TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C
+				+ "') as (col_a, col_b, col_c);");
+		Iterator<Tuple> it = pig.openIterator("a");
+		int count = 0;
+		LOG.info("LoadFromHBase Starting");
+		while (it.hasNext()) {
+			Tuple t = it.next();
+			LOG.info("LoadFromHBase " + t);
+			String col_a = ((DataByteArray) t.get(0)).toString();
+			String col_b = ((DataByteArray) t.get(1)).toString();
+			String col_c = ((DataByteArray) t.get(2)).toString();
+
+			Assert.assertEquals(count, Integer.parseInt(col_a));
+			Assert.assertEquals(count + 0.0, Double.parseDouble(col_b), 1e-6);
+			Assert.assertEquals("Text_" + count, col_c);
+			count++;
+		}
+		Assert.assertEquals(TEST_ROW_COUNT, count);
+		LOG.info("LoadFromHBase done");
+	}
+
+	/**
+	 * load from hbase test including the row key as the first column
+	 * 
+	 * @throws IOException
+	 */
+	@Test
+	public void testLoadFromHBaseWithRowKey() throws IOException {
+		prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+		pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using "
+				+ "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+				+ TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C
+				+ "','-loadKey') as (rowKey,col_a, col_b, col_c);");
+		Iterator<Tuple> it = pig.openIterator("a");
+		int count = 0;
+		LOG.info("LoadFromHBase Starting");
+		while (it.hasNext()) {
+			Tuple t = it.next();
+			LOG.info("LoadFromHBase " + t);
+			String rowKey = ((DataByteArray) t.get(0)).toString();
+			String col_a = ((DataByteArray) t.get(1)).toString();
+			String col_b = ((DataByteArray) t.get(2)).toString();
+			String col_c = ((DataByteArray) t.get(3)).toString();
+
+			Assert.assertEquals("00".substring((count + "").length()) + count,
+					rowKey);
+			Assert.assertEquals(count, Integer.parseInt(col_a));
+			Assert.assertEquals(count + 0.0, Double.parseDouble(col_b), 1e-6);
+			Assert.assertEquals("Text_" + count, col_c);
+
+			count++;
+		}
+		Assert.assertEquals(TEST_ROW_COUNT, count);
+		LOG.info("LoadFromHBase done");
+	}
+
+	/**
+	 * Test Load from hbase with parameters lte and gte (01<=key<=98)
+	 * 
+	 */
+	@Test
+	public void testLoadWithParameters_1() throws IOException {
+		prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+
+		pig.registerQuery("a = load 'hbase://"
+				+ TESTTABLE_1
+				+ "' using "
+				+ "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+				+ TESTCOLUMN_A
+				+ " "
+				+ TESTCOLUMN_B
+				+ " "
+				+ TESTCOLUMN_C
+				+ "','-loadKey -gte 01 -lte 98') as (rowKey,col_a, col_b, col_c);");
+		Iterator<Tuple> it = pig.openIterator("a");
+		int count = 0;
+		int next = 1;
+		LOG.info("LoadFromHBase Starting");
+		while (it.hasNext()) {
+			Tuple t = it.next();
+			LOG.info("LoadFromHBase " + t);
+			String rowKey = ((DataByteArray) t.get(0)).toString();
+			String col_a = ((DataByteArray) t.get(1)).toString();
+			String col_b = ((DataByteArray) t.get(2)).toString();
+			String col_c = ((DataByteArray) t.get(3)).toString();
+
+			Assert.assertEquals("00".substring((next + "").length()) + next,
+					rowKey);
+			Assert.assertEquals(next, Integer.parseInt(col_a));
+			Assert.assertEquals(next + 0.0, Double.parseDouble(col_b), 1e-6);
+			Assert.assertEquals("Text_" + next, col_c);
+
+			count++;
+			next++;
+		}
+		Assert.assertEquals(TEST_ROW_COUNT - 2, count);
+		LOG.info("LoadFromHBase done");
+	}
+
+	/**
+	 * Test Load from hbase with parameters lt and gt (00<key<99)
+	 */
+	@Test
+	public void testLoadWithParameters_2() throws IOException {
+		prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+
+		pig.registerQuery("a = load 'hbase://"
+				+ TESTTABLE_1
+				+ "' using "
+				+ "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+				+ TESTCOLUMN_A
+				+ " "
+				+ TESTCOLUMN_B
+				+ " "
+				+ TESTCOLUMN_C
+				+ "','-loadKey -gt 00 -lt 99') as (rowKey,col_a, col_b, col_c);");
+		Iterator<Tuple> it = pig.openIterator("a");
+		int count = 0;
+		int next = 1;
+		LOG.info("LoadFromHBase Starting");
+		while (it.hasNext()) {
+			Tuple t = it.next();
+			LOG.info("LoadFromHBase " + t);
+			String rowKey = ((DataByteArray) t.get(0)).toString();
+			String col_a = ((DataByteArray) t.get(1)).toString();
+			String col_b = ((DataByteArray) t.get(2)).toString();
+			String col_c = ((DataByteArray) t.get(3)).toString();
+
+			Assert.assertEquals("00".substring((next + "").length()) + next,
+					rowKey);
+			Assert.assertEquals(next, Integer.parseInt(col_a));
+			Assert.assertEquals(next + 0.0, Double.parseDouble(col_b), 1e-6);
+			Assert.assertEquals("Text_" + next, col_c);
+
+			count++;
+			next++;
+		}
+		Assert.assertEquals(TEST_ROW_COUNT - 2, count);
+		LOG.info("LoadFromHBase done");
+	}
+
+	/**
+	 * Test Load from hbase with parameters limit
+	 */
+	@Test
+	public void testLoadWithParameters_3() throws IOException {
+		prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+		pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using "
+				+ "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+				+ TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C
+				+ "','-loadKey -limit 10') as (rowKey,col_a, col_b, col_c);");
+		Iterator<Tuple> it = pig.openIterator("a");
+		int count = 0;
+		LOG.info("LoadFromHBase Starting");
+		while (it.hasNext()) {
+			Tuple t = it.next();
+			LOG.info("LoadFromHBase " + t);
+			String rowKey = ((DataByteArray) t.get(0)).toString();
+			String col_a = ((DataByteArray) t.get(1)).toString();
+			String col_b = ((DataByteArray) t.get(2)).toString();
+			String col_c = ((DataByteArray) t.get(3)).toString();
+
+			Assert.assertEquals("00".substring((count + "").length()) + count,
+					rowKey);
+			Assert.assertEquals(count, Integer.parseInt(col_a));
+			Assert.assertEquals(count + 0.0, Double.parseDouble(col_b), 1e-6);
+			Assert.assertEquals("Text_" + count, col_c);
+
+			count++;
+		}
+		// 'limit' applies per region; this table has only one region, so exactly 10 rows come back
+		Assert.assertEquals(10, count);
+		LOG.info("LoadFromHBase done");
+	}
+
+	/**
+	 * Test Load from hbase using HBaseBinaryConverter
+	 */
+	@Test
+	public void testHBaseBinaryConverter() throws IOException {
+		prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
+
+		pig.registerQuery("a = load 'hbase://"
+				+ TESTTABLE_1
+				+ "' using "
+				+ "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+				+ TESTCOLUMN_A
+				+ " "
+				+ TESTCOLUMN_B
+				+ " "
+				+ TESTCOLUMN_C
+				+ "','-loadKey -caster HBaseBinaryConverter') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);");
+		Iterator<Tuple> it = pig.openIterator("a");
+		int index = 0;
+		LOG.info("LoadFromHBase Starting");
+		while (it.hasNext()) {
+			Tuple t = it.next();
+			LOG.info("LoadFromHBase " + t);
+			String rowKey = (String) t.get(0);
+			int col_a = (Integer) t.get(1);
+			double col_b = (Double) t.get(2);
+			String col_c = (String) t.get(3);
+
+			Assert.assertEquals("00".substring((index + "").length()) + index,
+					rowKey);
+			Assert.assertEquals(index, col_a);
+			Assert.assertEquals(index + 0.0, col_b, 1e-6);
+			Assert.assertEquals("Text_" + index, col_c);
+			index++;
+		}
+		LOG.info("LoadFromHBase done");
+	}
+
+	/**
+	 * Load from hbase table 'TESTTABLE_1' using the HBaseBinary format, and store it
+	 * into 'TESTTABLE_2' using the HBaseBinary format.
+	 * 
+	 * @throws IOException
+	 */
+	@Test
+	public void testStoreToHBase_1() throws IOException {
+		prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
+		prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary);
+
+		pig.registerQuery("a = load 'hbase://"
+				+ TESTTABLE_1
+				+ "' using "
+				+ "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+				+ TESTCOLUMN_A
+				+ " "
+				+ TESTCOLUMN_B
+				+ " "
+				+ TESTCOLUMN_C
+				+ "','-loadKey -caster HBaseBinaryConverter') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);");
+		pig.store("a", TESTTABLE_2,
+				"org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+						+ TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
+						+ TESTCOLUMN_C + "','-caster HBaseBinaryConverter')");
+
+		HTable table = new HTable(conf, TESTTABLE_2);
+		ResultScanner scanner = table.getScanner(new Scan());
+		Iterator<Result> iter = scanner.iterator();
+		int i = 0;
+		for (i = 0; iter.hasNext(); ++i) {
+			Result result = iter.next();
+			String v = i + "";
+			String rowKey = Bytes.toString(result.getRow());
+			int col_a = Bytes
+					.toInt(result.getValue(Bytes.toBytes(TESTCOLUMN_A)));
+			double col_b = Bytes.toDouble(result.getValue(Bytes
+					.toBytes(TESTCOLUMN_B)));
+			String col_c = Bytes.toString(result.getValue(Bytes
+					.toBytes(TESTCOLUMN_C)));
+
+			Assert.assertEquals("00".substring(v.length()) + v, rowKey);
+			Assert.assertEquals(i, col_a);
+			Assert.assertEquals(i + 0.0, col_b, 1e-6);
+			Assert.assertEquals("Text_" + i, col_c);
+		}
+		Assert.assertEquals(100, i);
+	}
+
+	/**
+	 * load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it into
+	 * 'TESTTABLE_2' using UTF-8 Plain Text format
+	 * 
+	 * @throws IOException
+	 */
+	@Test
+	public void testStoreToHBase_2() throws IOException {
+		prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
+		prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary);
+
+		pig.registerQuery("a = load 'hbase://"
+				+ TESTTABLE_1
+				+ "' using "
+				+ "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+				+ TESTCOLUMN_A
+				+ " "
+				+ TESTCOLUMN_B
+				+ " "
+				+ TESTCOLUMN_C
+				+ "','-loadKey -caster HBaseBinaryConverter') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);");
+		pig.store("a", TESTTABLE_2,
+				"org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+						+ TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
+						+ TESTCOLUMN_C + "')");
+
+		HTable table = new HTable(conf, TESTTABLE_2);
+		ResultScanner scanner = table.getScanner(new Scan());
+		Iterator<Result> iter = scanner.iterator();
+		int i = 0;
+		for (i = 0; iter.hasNext(); ++i) {
+			Result result = iter.next();
+			String v = i + "";
+			String rowKey = new String(result.getRow());
+			int col_a = Integer.parseInt(new String(result.getValue(Bytes
+					.toBytes(TESTCOLUMN_A))));
+			double col_b = Double.parseDouble(new String(result.getValue(Bytes
+					.toBytes(TESTCOLUMN_B))));
+			String col_c = new String(result.getValue(Bytes
+					.toBytes(TESTCOLUMN_C)));
+
+			Assert.assertEquals("00".substring(v.length()) + v, rowKey);
+			Assert.assertEquals(i, col_a);
+			Assert.assertEquals(i + 0.0, col_b, 1e-6);
+			Assert.assertEquals("Text_" + i, col_c);
+		}
+		Assert.assertEquals(100, i);
+	}
+
+	/**
+	 * Prepare a table in hbase for testing.
+	 * 
+	 */
+	private void prepareTable(String tableName, boolean initData,
+			DataFormat format) throws IOException {
+		// define the table schema
+		HTableDescriptor tabledesc = new HTableDescriptor(tableName);
+		tabledesc.addFamily(family);
+
+		// create the table
+		HBaseAdmin admin = new HBaseAdmin(conf);
+		deleteTable(tableName);
+		admin.createTable(tabledesc);
+
+		if (initData) {
+			// put some data into the table in increasing row key order
+			HTable table = new HTable(conf, tableName);
+
+			for (int i = 0; i < TEST_ROW_COUNT; i++) {
+				String v = i + "";
+				if (format == DataFormat.HBaseBinary) {
+					// row key: string type
+					Put put = new Put(Bytes.toBytes("00".substring(v.length())
+							+ v));
+					// col_a: int type
+					put.add(COLUMNFAMILY, Bytes.toBytes("col_a"),
+							Bytes.toBytes(i));
+					// col_b: double type
+					put.add(COLUMNFAMILY, Bytes.toBytes("col_b"),
+							Bytes.toBytes(i + 0.0));
+					// col_c: string type
+					put.add(COLUMNFAMILY, Bytes.toBytes("col_c"),
+							Bytes.toBytes("Text_" + i));
+					table.put(put);
+				} else {
+					// row key: string type
+					Put put = new Put(
+							("00".substring(v.length()) + v).getBytes());
+					// col_a: int type
+					put.add(COLUMNFAMILY, Bytes.toBytes("col_a"),
+							(i + "").getBytes()); // int
+					// col_b: double type
+					put.add(COLUMNFAMILY, Bytes.toBytes("col_b"),
+							((i + 0.0) + "").getBytes());
+					// col_c: string type
+					put.add(COLUMNFAMILY, Bytes.toBytes("col_c"),
+							("Text_" + i).getBytes());
+					table.put(put);
+				}
+			}
+			table.flushCommits();
+		}
+	}
+
+	/**
+	 * delete the table after testing
+	 * 
+	 * @param tableName
+	 * @throws IOException
+	 */
+	private void deleteTable(String tableName) throws IOException {
+		// delete the table
+		HBaseAdmin admin = new HBaseAdmin(conf);
+		if (admin.tableExists(tableName)) {
+			admin.disableTable(tableName);
+			while (admin.isTableEnabled(tableName)) {
+				try {
+					Thread.sleep(3000);
+				} catch (InterruptedException e) {
+					// do nothing.
+				}
+			}
+			admin.deleteTable(tableName);
+		}
+	}
 
 }
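
Taken together, the tests above exercise the new HBaseStorage load and store paths. A minimal sketch of the same usage from client code follows; the table names, column names, and the presence of a configured MapReduce/HBase cluster are assumptions mirroring the test constants, not something this patch provides.

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class HBaseStorageSketch {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.MAPREDUCE);

            // Load the row key plus three columns, keep only keys in [01, 98],
            // and decode cell bytes with the new HBaseBinaryConverter caster.
            pig.registerQuery(
                "a = load 'hbase://pigtable_1' using "
                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage("
                + "'pig:col_a pig:col_b pig:col_c', "
                + "'-loadKey -gte 01 -lte 98 -caster HBaseBinaryConverter') "
                + "as (rowKey:chararray, col_a:int, col_b:double, col_c:chararray);");

            // Store the relation into another table via the new StoreFunc support.
            pig.store("a", "pigtable_2",
                "org.apache.pig.backend.hadoop.hbase.HBaseStorage("
                + "'pig:col_a pig:col_b pig:col_c', '-caster HBaseBinaryConverter')");

            pig.shutdown();
        }
    }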