Posted to commits@pig.apache.org by dv...@apache.org on 2010/08/30 22:47:32 UTC
svn commit: r990936 - in /hadoop/pig/trunk: ./ ivy/ lib/ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/hbase/ src/org/apache/pig/builtin/
src/org/apache/pig/data/ src/org/apache/pig/impl/util/
test/org/apache/pig/test/
Author: dvryaboy
Date: Mon Aug 30 20:47:32 2010
New Revision: 990936
URL: http://svn.apache.org/viewvc?rev=990936&view=rev
Log:
PIG-1205: Enhance HBaseStorage-- Make it support loading row key and implement StoreFunc
Added:
hadoop/pig/trunk/lib/hbase-0.20.6-test.jar (with props)
hadoop/pig/trunk/lib/hbase-0.20.6.jar (with props)
hadoop/pig/trunk/src/org/apache/pig/LoadStoreCaster.java
hadoop/pig/trunk/src/org/apache/pig/StoreCaster.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseBinaryConverter.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseTableInputFormat.java
Removed:
hadoop/pig/trunk/lib/hbase-0.20.0-test.jar
hadoop/pig/trunk/lib/hbase-0.20.0.jar
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/build.xml
hadoop/pig/trunk/ivy/libraries.properties
hadoop/pig/trunk/ivy/pig.pom
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java
hadoop/pig/trunk/src/org/apache/pig/data/DataType.java
hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java
hadoop/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=990936&r1=990935&r2=990936&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Aug 30 20:47:32 2010
@@ -26,6 +26,8 @@ PIG-1249: Safe-guards against misconfigu
IMPROVEMENTS
+PIG-1205: Enhance HBaseStorage-- Make it support loading row key and implement StoreFunc (zjffdu and dvryaboy)
+
PIG-1568: Optimization rule FilterAboveForeach is too restrictive and doesn't
handle project * correctly (xuefuz via daijy)
Modified: hadoop/pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/build.xml?rev=990936&r1=990935&r2=990936&view=diff
==============================================================================
--- hadoop/pig/trunk/build.xml (original)
+++ hadoop/pig/trunk/build.xml Mon Aug 30 20:47:32 2010
@@ -52,8 +52,8 @@
<property name="build.encoding" value="UTF8" />
<!-- TODO with only one version of hadoop in the lib folder we do not need that anymore -->
<property name="automaton.jarfile" value="automaton.jar" />
- <property name="hbase.jarfile" value="hbase-0.20.0.jar" />
- <property name="hbase.test.jarfile" value="hbase-0.20.0-test.jar" />
+ <property name="hbase.jarfile" value="hbase-0.20.6.jar" />
+ <property name="hbase.test.jarfile" value="hbase-0.20.6-test.jar" />
<property name="zookeeper.jarfile" value="zookeeper-hbase-1329.jar" />
<!-- javac properties -->
Modified: hadoop/pig/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/ivy/libraries.properties?rev=990936&r1=990935&r2=990936&view=diff
==============================================================================
--- hadoop/pig/trunk/ivy/libraries.properties (original)
+++ hadoop/pig/trunk/ivy/libraries.properties Mon Aug 30 20:47:32 2010
@@ -26,27 +26,28 @@ commons-lang.version=2.4
checkstyle.version=4.2
ivy.version=2.2.0-rc1
+guava.version=r06
hadoop-core.version=0.20.2
hadoop-test.version=0.20.2
+hbase.version=0.20.6
hsqldb.version=1.8.0.10
+jackson.version=1.0.1
javacc.version=4.2
+jdiff.version=1.0.9
jetty-util.version=6.1.14
jline.version=0.9.94
+joda-time.version=1.6
jsch.version=0.1.38
junit.version=4.5
-jdiff.version=1.0.9
+jython.version=2.5.0
log4j.version=1.2.14
+rats-lib.version=0.5.1
+
slf4j-api.version=1.4.3
slf4j-log4j12.version=1.4.3
-rats-lib.version=0.5.1
-
xerces.version=1.4.4
-jackson.version=1.0.1
-joda-time.version=1.6
-jython.version=2.5.0
-guava.version=r06
Modified: hadoop/pig/trunk/ivy/pig.pom
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/ivy/pig.pom?rev=990936&r1=990935&r2=990936&view=diff
==============================================================================
--- hadoop/pig/trunk/ivy/pig.pom (original)
+++ hadoop/pig/trunk/ivy/pig.pom Mon Aug 30 20:47:32 2010
@@ -75,10 +75,10 @@
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-test</artifactId>
- <version>${hbase-test.version}</version>
+ <version>${hbase.version}</version>
</dependency>
- <dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
@@ -96,16 +96,15 @@
<version>${joda-time.version}</version>
</dependency>
- <dependency>
- <groupId>org.python</groupId>
- <artifactId>jython</artifactId>
- <version>${jython.version}</version>
- </dependency>
+ <dependency>
+ <groupId>org.python</groupId>
+ <artifactId>jython</artifactId>
+ <version>${jython.version}</version>
+ </dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
-
</dependencies>
</project>
Added: hadoop/pig/trunk/lib/hbase-0.20.6-test.jar
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/lib/hbase-0.20.6-test.jar?rev=990936&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/pig/trunk/lib/hbase-0.20.6-test.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: hadoop/pig/trunk/lib/hbase-0.20.6.jar
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/lib/hbase-0.20.6.jar?rev=990936&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/pig/trunk/lib/hbase-0.20.6.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: hadoop/pig/trunk/src/org/apache/pig/LoadStoreCaster.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/LoadStoreCaster.java?rev=990936&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/LoadStoreCaster.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/LoadStoreCaster.java Mon Aug 30 20:47:32 2010
@@ -0,0 +1,32 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig;
+
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+
+/**
+ * This is just a union interface of LoadCaster and StoreCaster,
+ * made available for simplicity.
+ * @since Pig 0.8
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface LoadStoreCaster extends LoadCaster, StoreCaster {
+
+}
Added: hadoop/pig/trunk/src/org/apache/pig/StoreCaster.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/StoreCaster.java?rev=990936&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/StoreCaster.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/StoreCaster.java Mon Aug 30 20:47:32 2010
@@ -0,0 +1,37 @@
+package org.apache.pig;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+/**
+ * An interface that provides methods for converting Pig internal types to byte[].
+ * It is intended to be used by StoreFunc implementations.
+ * @since Pig 0.8
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving // Because we still don't have the map casts quite right
+public interface StoreCaster extends LoadCaster {
+ public byte[] toBytes(DataBag bag) throws IOException;
+
+ public byte[] toBytes(String s) throws IOException;
+
+ public byte[] toBytes(Double d) throws IOException;
+
+ public byte[] toBytes(Float f) throws IOException;
+
+ public byte[] toBytes(Integer i) throws IOException;
+
+ public byte[] toBytes(Long l) throws IOException;
+
+ public byte[] toBytes(Map<String, Object> m) throws IOException;
+
+ public byte[] toBytes(Tuple t) throws IOException;
+
+ public byte[] toBytes(DataByteArray a) throws IOException;
+}
Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseBinaryConverter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseBinaryConverter.java?rev=990936&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseBinaryConverter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseBinaryConverter.java Mon Aug 30 20:47:32 2010
@@ -0,0 +1,151 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.pig.LoadStoreCaster;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+public class HBaseBinaryConverter implements LoadStoreCaster {
+
+ @Override
+ public String bytesToCharArray(byte[] b) throws IOException {
+ return Bytes.toString(b);
+ }
+
+ @Override
+ public Double bytesToDouble(byte[] b) throws IOException {
+ if (Bytes.SIZEOF_DOUBLE > b.length){
+ return Bytes.toDouble(Bytes.padHead(b, Bytes.SIZEOF_DOUBLE - b.length));
+ } else {
+ return Bytes.toDouble(Bytes.head(b, Bytes.SIZEOF_DOUBLE));
+ }
+ }
+
+ @Override
+ public Float bytesToFloat(byte[] b) throws IOException {
+ if (Bytes.SIZEOF_FLOAT > b.length){
+ return Bytes.toFloat(Bytes.padHead(b, Bytes.SIZEOF_FLOAT - b.length));
+ } else {
+ return Bytes.toFloat(Bytes.head(b, Bytes.SIZEOF_FLOAT));
+ }
+ }
+
+ @Override
+ public Integer bytesToInteger(byte[] b) throws IOException {
+ if (Bytes.SIZEOF_INT > b.length){
+ return Bytes.toInt(Bytes.padHead(b, Bytes.SIZEOF_INT - b.length));
+ } else {
+ return Bytes.toInt(Bytes.head(b, Bytes.SIZEOF_INT));
+ }
+ }
+
+ @Override
+ public Long bytesToLong(byte[] b) throws IOException {
+ if (Bytes.SIZEOF_LONG > b.length){
+ return Bytes.toLong(Bytes.padHead(b, Bytes.SIZEOF_LONG - b.length));
+ } else {
+ return Bytes.toLong(Bytes.head(b, Bytes.SIZEOF_LONG));
+ }
+ }
+
+ /**
+ * NOT IMPLEMENTED
+ */
+ @Override
+ public Map<String, Object> bytesToMap(byte[] b) throws IOException {
+ throw new ExecException("Can't generate a Map from byte[]");
+ }
+
+ /**
+ * NOT IMPLEMENTED
+ */
+ @Override
+ public Tuple bytesToTuple(byte[] b, ResourceFieldSchema fieldSchema) throws IOException {
+ throw new ExecException("Can't generate a Tuple from byte[]");
+ }
+
+ /**
+ * NOT IMPLEMENTED
+ */
+ @Override
+ public DataBag bytesToBag(byte[] b, ResourceFieldSchema fieldSchema) throws IOException {
+ throw new ExecException("Can't generate DataBags from byte[]");
+ }
+
+ /**
+ * NOT IMPLEMENTED
+ */
+ @Override
+ public byte[] toBytes(DataBag bag) throws IOException {
+ throw new ExecException("Cant' generate bytes from DataBag");
+ }
+
+ @Override
+ public byte[] toBytes(String s) throws IOException {
+ return Bytes.toBytes(s);
+ }
+
+ @Override
+ public byte[] toBytes(Double d) throws IOException {
+ return Bytes.toBytes(d);
+ }
+
+ @Override
+ public byte[] toBytes(Float f) throws IOException {
+ return Bytes.toBytes(f);
+ }
+
+ @Override
+ public byte[] toBytes(Integer i) throws IOException {
+ return Bytes.toBytes(i);
+ }
+
+ @Override
+ public byte[] toBytes(Long l) throws IOException {
+ return Bytes.toBytes(l);
+ }
+
+ /**
+ * NOT IMPLEMENTED
+ */
+ @Override
+ public byte[] toBytes(Map<String, Object> m) throws IOException {
+ throw new IOException("Can't generate bytes from Map");
+ }
+
+ /**
+ * NOT IMPLEMENTED
+ */
+ @Override
+ public byte[] toBytes(Tuple t) throws IOException {
+ throw new IOException("Can't generate bytes from Tuple");
+ }
+
+ @Override
+ public byte[] toBytes(DataByteArray a) throws IOException {
+ return a.get();
+ }
+}
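The head-padding logic in the numeric bytesTo* methods above zero-extends short values and truncates long ones to the fixed width of the target type. A minimal standalone sketch of that behavior (not part of this commit; the class name is illustrative) using the same HBase Bytes helpers:

    import org.apache.hadoop.hbase.util.Bytes;

    public class PadHeadDemo {
        public static void main(String[] args) {
            // A value shorter than SIZEOF_INT is zero-padded at the head:
            // {0x01, 0x02} decodes as 0x00000102 = 258.
            byte[] twoBytes = {0x01, 0x02};
            byte[] padded = Bytes.padHead(twoBytes, Bytes.SIZEOF_INT - twoBytes.length);
            System.out.println(Bytes.toInt(padded)); // 258

            // A longer value keeps only its first SIZEOF_INT bytes:
            // the 8-byte encoding of 1L truncates to {0,0,0,0}, i.e. 0.
            byte[] eightBytes = Bytes.toBytes(1L);
            System.out.println(Bytes.toInt(Bytes.head(eightBytes, Bytes.SIZEOF_INT))); // 0
        }
    }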
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=990936&r1=990935&r2=990936&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Mon Aug 30 20:47:32 2010
@@ -19,116 +19,438 @@ package org.apache.pig.backend.hadoop.hb
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadPushDown;
+import org.apache.pig.LoadStoreCaster;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.backend.hadoop.hbase.HBaseTableInputFormat.HBaseTableIFBuilder;
+import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.Utils;
+
+import com.google.common.collect.Lists;
/**
- * A Hbase Loader
+ * An HBase implementation of LoadFunc and StoreFunc
+ *
+ * TODO(dmitriy) test that all this stuff works
+ * TODO(dmitriy) documentation
*/
-public class HBaseStorage extends LoadFunc {
+public class HBaseStorage extends LoadFunc implements StoreFuncInterface, LoadPushDown {
+
+ private static final Log LOG = LogFactory.getLog(HBaseStorage.class);
+
+ private final static String STRING_CASTER = "UTF8StorageConverter";
+ private final static String BYTE_CASTER = "HBaseBinaryConverter";
+ private final static String CASTER_PROPERTY = "pig.hbase.caster";
+
+ private List<byte[]> columnList_ = Lists.newArrayList();
+ private HTable m_table;
+ private HBaseConfiguration m_conf;
+ private RecordReader reader;
+ private RecordWriter writer;
+ private Scan scan;
+
+ private final CommandLine configuredOptions_;
+ private final static Options validOptions_ = new Options();
+ private final static CommandLineParser parser_ = new GnuParser();
+ private boolean loadRowKey_;
+ private final long limit_;
+ private final int caching_;
+
+ protected transient byte[] gt_;
+ protected transient byte[] gte_;
+ protected transient byte[] lt_;
+ protected transient byte[] lte_;
+
+ private LoadCaster caster_;
+
+ private ResourceSchema schema_;
+
+ private static void populateValidOptions() {
+ validOptions_.addOption("loadKey", false, "Load Key");
+ validOptions_.addOption("gt", true, "Records must be greater than this value " +
+ "(binary, double-slash-escaped)");
+ validOptions_.addOption("lt", true, "Records must be less than this value (binary, double-slash-escaped)");
+ validOptions_.addOption("gte", true, "Records must be greater than or equal to this value");
+ validOptions_.addOption("lte", true, "Records must be less than or equal to this value");
+ validOptions_.addOption("caching", true, "Number of rows scanners should cache");
+ validOptions_.addOption("limit", true, "Per-region limit");
+ validOptions_.addOption("caster", true, "Caster to use for converting values. A class name, " +
+ "HBaseBinaryConverter, or Utf8StorageConverter. For storage, casters must implement LoadStoreCaster.");
+ }
+
+ /**
+ * Constructor. Construct an HBase Table LoadFunc and StoreFunc to load or store the cells of the
+ * provided columns.
+ *
+ * @param columnList
+ * a space-delimited list of column names
+ * @throws ParseException when unable to parse arguments
+ * @throws IOException
+ */
+ public HBaseStorage(String columnList) throws ParseException, IOException {
+ this(columnList,"");
+ }
+
+ /**
+ * Constructor. Construct an HBase Table LoadFunc and StoreFunc to load or store.
+ * @param columnList
+ * @param optString Loader options. Known options:<ul>
+ * <li>-loadKey=(true|false) Load the row key as the first column
+ * <li>-gt=minKeyVal
+ * <li>-lt=maxKeyVal
+ * <li>-gte=minKeyVal
+ * <li>-lte=maxKeyVal
+ * <li>-caching=numRows number of rows to cache (faster scans, more memory).
+ * </ul>
+ * @throws ParseException
+ * @throws IOException
+ */
+ public HBaseStorage(String columnList, String optString) throws ParseException, IOException {
+ populateValidOptions();
+ String[] colNames = columnList.split(" ");
+ String[] optsArr = optString.split(" ");
+ try {
+ configuredOptions_ = parser_.parse(validOptions_, optsArr);
+ } catch (ParseException e) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-caching] [-caster]", validOptions_ );
+ throw e;
+ }
+
+ loadRowKey_ = configuredOptions_.hasOption("loadKey");
+ for (String colName : colNames) {
+ columnList_.add(Bytes.toBytes(colName));
+ }
+
+ m_conf = new HBaseConfiguration();
+ String defaultCaster = m_conf.get(CASTER_PROPERTY, STRING_CASTER);
+ String casterOption = configuredOptions_.getOptionValue("caster", defaultCaster);
+ if (STRING_CASTER.equalsIgnoreCase(casterOption)) {
+ caster_ = new Utf8StorageConverter();
+ } else if (BYTE_CASTER.equalsIgnoreCase(casterOption)) {
+ caster_ = new HBaseBinaryConverter();
+ } else {
+ try {
+ @SuppressWarnings("unchecked")
+ Class<LoadCaster> casterClass = (Class<LoadCaster>) Class.forName(casterOption);
+ caster_ = casterClass.newInstance();
+ } catch (ClassCastException e) {
+ LOG.error("Congifured caster does not implement LoadCaster interface.");
+ throw new IOException(e);
+ } catch (ClassNotFoundException e) {
+ LOG.error("Configured caster class not found.", e);
+ throw new IOException(e);
+ } catch (InstantiationException e) {
+ LOG.error("Unable to instantiate configured caster " + casterOption, e);
+ throw new IOException(e);
+ } catch (IllegalAccessException e) {
+ LOG.error("Illegal Access Exception for configured caster " + casterOption, e);
+ throw new IOException(e);
+ }
+ }
+
+ caching_ = Integer.valueOf(configuredOptions_.getOptionValue("caching", "100"));
+ limit_ = Long.valueOf(configuredOptions_.getOptionValue("limit", "-1"));
+ initScan();
+ }
+
+ private void initScan() {
+ scan = new Scan();
+ // Set filters, if any.
+ if (configuredOptions_.hasOption("gt")) {
+ gt_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("gt")));
+ addFilter(CompareOp.GREATER, gt_);
+ }
+ if (configuredOptions_.hasOption("lt")) {
+ lt_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("lt")));
+ addFilter(CompareOp.LESS, lt_);
+ }
+ if (configuredOptions_.hasOption("gte")) {
+ gte_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("gte")));
+ addFilter(CompareOp.GREATER_OR_EQUAL, gte_);
+ }
+ if (configuredOptions_.hasOption("lte")) {
+ lte_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("lte")));
+ addFilter(CompareOp.LESS_OR_EQUAL, lte_);
+ }
+ }
+
+ private void addFilter(CompareOp op, byte[] val) {
+ LOG.info("Adding filter " + op.toString() + " with value " + Bytes.toStringBinary(val));
+ FilterList scanFilter = (FilterList) scan.getFilter();
+ if (scanFilter == null) {
+ scanFilter = new FilterList();
+ }
+ scanFilter.addFilter(new RowFilter(op, new BinaryComparator(val)));
+ scan.setFilter(scanFilter);
+ }
- private byte[][] m_cols;
- private HTable m_table;
- private Configuration m_conf=new Configuration();
- private RecordReader reader;
- private Scan scan=new Scan();
-
- private static final Log LOG = LogFactory.getLog(HBaseStorage.class);
-
- /**
- * Constructor. Construct a HBase Table loader to load the cells of the
- * provided columns.
- *
- * @param columnList
- * columnlist that is a presented string delimited by space.
- */
- public HBaseStorage(String columnList) {
- String[] colNames = columnList.split(" ");
- m_cols = new byte[colNames.length][];
- for (int i = 0; i < m_cols.length; i++) {
- m_cols[i] = Bytes.toBytes(colNames[i]);
- scan.addColumn(m_cols[i]);
- }
- }
-
-
- @Override
- public Tuple getNext() throws IOException {
- try {
- if (reader.nextKeyValue()) {
- ImmutableBytesWritable rowKey = (ImmutableBytesWritable) reader
- .getCurrentKey();
- Result result = (Result) reader.getCurrentValue();
- Tuple tuple=TupleFactory.getInstance().newTuple(m_cols.length);
- for (int i=0;i<m_cols.length;++i){
- tuple.set(i, new DataByteArray(result.getValue(m_cols[i])));
- }
- return tuple;
- }
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- return null;
- }
-
- @Override
- public InputFormat getInputFormat() {
- TableInputFormat inputFormat = new TableInputFormat();
- inputFormat.setConf(m_conf);
- return inputFormat;
- }
-
- @Override
- public void prepareToRead(RecordReader reader, PigSplit split) {
- this.reader = reader;
- }
+ @Override
+ public Tuple getNext() throws IOException {
+ try {
+ if (reader.nextKeyValue()) {
+ ImmutableBytesWritable rowKey = (ImmutableBytesWritable) reader
+ .getCurrentKey();
+ Result result = (Result) reader.getCurrentValue();
+ int tupleSize=columnList_.size();
+ if (loadRowKey_){
+ tupleSize++;
+ }
+ Tuple tuple=TupleFactory.getInstance().newTuple(tupleSize);
+
+ int startIndex=0;
+ if (loadRowKey_){
+ tuple.set(0, new DataByteArray(rowKey.get()));
+ startIndex++;
+ }
+ for (int i=0;i<columnList_.size();++i){
+ tuple.set(i+startIndex, new DataByteArray(result.getValue(columnList_.get(i))));
+ }
+ return tuple;
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ return null;
+ }
+
+ @Override
+ public InputFormat getInputFormat() {
+ TableInputFormat inputFormat = new HBaseTableIFBuilder()
+ .withLimit(limit_)
+ .withGt(gt_)
+ .withGte(gte_)
+ .withLt(lt_)
+ .withLte(lte_)
+ .withConf(m_conf)
+ .build();
+ return inputFormat;
+ }
- @Override
+ @Override
+ public void prepareToRead(RecordReader reader, PigSplit split) {
+ this.reader = reader;
+ }
+
+ @Override
public void setLocation(String location, Job job) throws IOException {
+ String tablename = location;
if (location.startsWith("hbase://")){
- m_conf.set(TableInputFormat.INPUT_TABLE, location.substring(8));
- }else{
- m_conf.set(TableInputFormat.INPUT_TABLE, location);
+ tablename = location.substring(8);
+ }
+ if (m_table == null) {
+ m_table = new HTable(m_conf, tablename);
}
+ m_table.setScannerCaching(caching_);
+ m_conf.set(TableInputFormat.INPUT_TABLE, tablename);
+ scan.addColumns(columnList_.toArray(new byte[0][]));
m_conf.set(TableInputFormat.SCAN, convertScanToString(scan));
}
- @Override
- public String relativeToAbsolutePath(String location, Path curDir)
- throws IOException {
- return location;
- }
-
- private static String convertScanToString(Scan scan) {
-
- try {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(out);
- scan.write(dos);
- return Base64.encodeBytes(out.toByteArray());
- } catch (IOException e) {
- LOG.error(e);
- return "";
- }
+ @Override
+ public String relativeToAbsolutePath(String location, Path curDir)
+ throws IOException {
+ return location;
+ }
+
+ private static String convertScanToString(Scan scan) {
+ try {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(out);
+ scan.write(dos);
+ return Base64.encodeBytes(out.toByteArray());
+ } catch (IOException e) {
+ LOG.error(e);
+ return "";
+ }
+
+ }
+
+ /**
+ * Set up the caster to use for reading values out of, and writing to, HBase.
+ */
+ @Override
+ public LoadCaster getLoadCaster() throws IOException {
+ return caster_;
+ }
+
+ /*
+ * StoreFunc Methods
+ * @see org.apache.pig.StoreFuncInterface#getOutputFormat()
+ */
+
+ @Override
+ public OutputFormat getOutputFormat() throws IOException {
+ TableOutputFormat outputFormat = new TableOutputFormat();
+ return outputFormat;
+ }
+
+ @Override
+ public void checkSchema(ResourceSchema s) throws IOException {
+ if (! (caster_ instanceof LoadStoreCaster)) {
+ LOG.error("Caster must implement LoadStoreCaster for writing to HBase.");
+ throw new IOException("Bad Caster " + caster_.getClass());
+ }
+ schema_ = s;
+ }
+
+ // Suppressing unchecked warnings for RecordWriter, which is not parameterized by StoreFuncInterface
+ @Override
+ public void prepareToWrite(@SuppressWarnings("rawtypes") RecordWriter writer) throws IOException {
+ this.writer = writer;
+ }
+
+ // Suppressing unchecked warnings for RecordWriter, which is not parameterized by StoreFuncInterface
+ @SuppressWarnings("unchecked")
+ @Override
+ public void putNext(Tuple t) throws IOException {
+ ResourceFieldSchema[] fieldSchemas = (schema_ == null) ? null : schema_.getFields();
+ Put put=new Put(objToBytes(t.get(0),
+ (fieldSchemas == null) ? DataType.findType(t.get(0)) : fieldSchemas[0].getType()));
+ long ts=System.currentTimeMillis();
+
+ for (byte[] col : columnList_) {
+ LOG.info("putNext -- col: " + Bytes.toStringBinary(col));
+ }
+
+ for (int i=1;i<t.size();++i){
+ put.add(columnList_.get(i-1), ts, objToBytes(t.get(i),
+ (fieldSchemas == null) ? DataType.findType(t.get(i)) : fieldSchemas[i].getType()));
+ }
+ try {
+ writer.write(null, put);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private byte[] objToBytes(Object o, byte type) throws IOException {
+ LoadStoreCaster caster = (LoadStoreCaster) caster_;
+ switch (type) {
+ case DataType.BYTEARRAY: return ((DataByteArray) o).get();
+ case DataType.BAG: return caster.toBytes((DataBag) o);
+ case DataType.CHARARRAY: return caster.toBytes((String) o);
+ case DataType.DOUBLE: return caster.toBytes((Double) o);
+ case DataType.FLOAT: return caster.toBytes((Float) o);
+ case DataType.INTEGER: return caster.toBytes((Integer) o);
+ case DataType.LONG: return caster.toBytes((Long) o);
+
+ // The type conversion here is unchecked.
+ // Relying on DataType.findType to do the right thing.
+ case DataType.MAP: return caster.toBytes((Map<String, Object>) o);
+
+ case DataType.NULL: return null;
+ case DataType.TUPLE: return caster.toBytes((Tuple) o);
+ case DataType.ERROR: throw new IOException("Unable to determine type of " + o.getClass());
+ default: throw new IOException("Unable to find a converter for tuple field " + o);
+ }
+ }
+
+ @Override
+ public String relToAbsPathForStoreLocation(String location, Path curDir)
+ throws IOException {
+ return location;
+ }
+
+ @Override
+ public void setStoreFuncUDFContextSignature(String signature) { }
+
+ @Override
+ public void setStoreLocation(String location, Job job) throws IOException {
+ if (location.startsWith("hbase://")){
+ job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, location.substring(8));
+ }else{
+ job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, location);
+ }
+ }
+
+ @Override
+ public void cleanupOnFailure(String location, Job job) throws IOException {
+
+ }
+
+ /*
+ * LoadPushDown Methods.
+ */
+
+ @Override
+ public List<OperatorSet> getFeatures() {
+ return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
+ }
+
+ @Override
+ public RequiredFieldResponse pushProjection(
+ RequiredFieldList requiredFieldList) throws FrontendException {
+ List<RequiredField> requiredFields = requiredFieldList.getFields();
+ List<byte[]> newColumns = Lists.newArrayListWithExpectedSize(requiredFields.size());
+
+ // HBase Row Key is the first column in the schema when it's loaded,
+ // and is not included in the columnList (since it's not a proper column).
+ int offset = loadRowKey_ ? 1 : 0;
+
+ if (loadRowKey_) {
+ if (requiredFields.size() < 1 || requiredFields.get(0).getIndex() != 0) {
+ loadRowKey_ = false;
+ } else {
+ // The row key was requested; it is handled separately above, so drop it from the list.
+ requiredFields.remove(0);
+ }
+ }
+
+ for (RequiredField field : requiredFields) {
+ int fieldIndex = field.getIndex();
+ newColumns.add(columnList_.get(fieldIndex - offset));
+ }
+ LOG.info("pushProjection After Projection: loadRowKey is " + loadRowKey_) ;
+ for (byte[] col : newColumns) {
+ LOG.info("pushProjection -- col: " + Bytes.toStringBinary(col));
+ }
+ columnList_ = newColumns;
+ return new RequiredFieldResponse(true);
+ }
- }
}
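For illustration, a sketch of driving the new load and store paths from PigServer, in the style of the tests further below. The table names (webcrawl, webcrawl_copy), the columns, and the key bound are assumptions, not part of this commit; a working setup would also need an HBase-aware cluster configuration.

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class HBaseStorageUsage {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.MAPREDUCE);
            // Load the row key plus two columns, skip rows below a minimum key,
            // and let scanners cache 500 rows per round trip.
            pig.registerQuery("raw = LOAD 'hbase://webcrawl' USING "
                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage("
                + "'site:url site:size', '-loadKey -gte 2010-01-01 -caching 500') "
                + "AS (key, url, size);");
            // Store into another table; the first tuple field becomes the row key
            // and the remaining fields map to the listed columns in order.
            pig.store("raw", "hbase://webcrawl_copy",
                "org.apache.pig.backend.hadoop.hbase.HBaseStorage('site:url site:size')");
        }
    }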
Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseTableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseTableInputFormat.java?rev=990936&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseTableInputFormat.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseTableInputFormat.java Mon Aug 30 20:47:32 2010
@@ -0,0 +1,197 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.hbase;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.ListIterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableSplit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+public class HBaseTableInputFormat extends TableInputFormat {
+ private static final Log LOG = LogFactory.getLog(HBaseTableInputFormat.class);
+
+ protected final byte[] gt_;
+ protected final byte[] gte_;
+ protected final byte[] lt_;
+ protected final byte[] lte_;
+
+ public HBaseTableInputFormat() {
+ this(-1, null, null, null, null);
+ }
+
+ protected HBaseTableInputFormat(long limit, byte[] gt, byte[] gte, byte[] lt, byte[] lte) {
+ super();
+ setTableRecordReader(new HBaseTableRecordReader(limit));
+ gt_ = gt;
+ gte_ = gte;
+ lt_ = lt;
+ lte_ = lte;
+ }
+
+ public static class HBaseTableIFBuilder {
+ protected byte[] gt_;
+ protected byte[] gte_;
+ protected byte[] lt_;
+ protected byte[] lte_;
+ protected long limit_;
+ protected Configuration conf_;
+
+ public HBaseTableIFBuilder withGt(byte[] gt) { gt_ = gt; return this; }
+ public HBaseTableIFBuilder withGte(byte[] gte) { gte_ = gte; return this; }
+ public HBaseTableIFBuilder withLt(byte[] lt) { lt_ = lt; return this; }
+ public HBaseTableIFBuilder withLte(byte[] lte) { lte_ = lte; return this; }
+ public HBaseTableIFBuilder withLimit(long limit) { limit_ = limit; return this; }
+ public HBaseTableIFBuilder withConf(Configuration conf) { conf_ = conf; return this; }
+
+ public HBaseTableInputFormat build() {
+ HBaseTableInputFormat inputFormat = new HBaseTableInputFormat(limit_, gt_, gte_, lt_, lte_);
+ if (conf_ != null) inputFormat.setConf(conf_);
+ return inputFormat;
+ }
+
+ }
+
+ @Override
+ public List<InputSplit> getSplits(org.apache.hadoop.mapreduce.JobContext context)
+ throws IOException {
+ List<InputSplit> splits = super.getSplits(context);
+ ListIterator<InputSplit> splitIter = splits.listIterator();
+ while (splitIter.hasNext()) {
+ TableSplit split = (TableSplit) splitIter.next();
+ byte[] startKey = split.getStartRow();
+ byte[] endKey = split.getEndRow();
+ // Skip if the region doesn't satisfy configured options.
+ if ((skipRegion(CompareOp.LESS, startKey, lt_)) ||
+ (skipRegion(CompareOp.GREATER, endKey, gt_)) ||
+ (skipRegion(CompareOp.GREATER, endKey, gte_)) ||
+ (skipRegion(CompareOp.LESS_OR_EQUAL, startKey, lte_)) ) {
+ splitIter.remove();
+ }
+ }
+ return splits;
+ }
+
+ private boolean skipRegion(CompareOp op, byte[] key, byte[] option ) {
+
+ if (key.length == 0 || option == null)
+ return false;
+
+ BinaryComparator comp = new BinaryComparator(option);
+ RowFilter rowFilter = new RowFilter(op, comp);
+ return rowFilter.filterRowKey(key, 0, key.length);
+ }
+
+ protected class HBaseTableRecordReader extends TableRecordReader {
+
+ private long recordsSeen = 0;
+ private final long limit_;
+ private byte[] startRow_;
+ private byte[] endRow_;
+ private transient byte[] currRow_;
+
+ private BigInteger bigStart_;
+ private BigInteger bigEnd_;
+ private BigDecimal bigRange_;
+ private transient float progressSoFar_ = 0;
+
+ public HBaseTableRecordReader(long limit) {
+ limit_ = limit;
+ }
+
+ @Override
+ public void setScan(Scan scan) {
+ super.setScan(scan);
+
+ startRow_ = scan.getStartRow();
+ endRow_ = scan.getStopRow();
+ byte[] startPadded;
+ byte[] endPadded;
+ if (startRow_.length < endRow_.length) {
+ startPadded = Bytes.padTail(startRow_, endRow_.length - startRow_.length);
+ endPadded = endRow_;
+ } else if (endRow_.length < startRow_.length) {
+ startPadded = startRow_;
+ endPadded = Bytes.padTail(endRow_, startRow_.length - endRow_.length);
+ } else {
+ startPadded = startRow_;
+ endPadded = endRow_;
+ }
+ currRow_ = startRow_;
+ byte [] prependHeader = {1, 0};
+ bigStart_ = new BigInteger(Bytes.add(prependHeader, startPadded));
+ bigEnd_ = new BigInteger(Bytes.add(prependHeader, endPadded));
+ bigRange_ = new BigDecimal(bigEnd_.subtract(bigStart_));
+ LOG.info("setScan with ranges: " + bigStart_ + " - " + bigEnd_ + " ( " + bigRange_ + ")");
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (limit_ > 0 && ++recordsSeen > limit_) {
+ return false;
+ }
+ boolean hasMore = super.nextKeyValue();
+ if (hasMore) {
+ currRow_ = getCurrentKey().get();
+ }
+ return hasMore;
+
+ }
+
+ @Override
+ public float getProgress() {
+ if (currRow_ == null || currRow_.length == 0 || endRow_.length == 0 || endRow_ == HConstants.LAST_ROW) {
+ return 0;
+ }
+ byte[] lastPadded = currRow_;
+ if (currRow_.length < endRow_.length) {
+ lastPadded = Bytes.padTail(currRow_, endRow_.length - currRow_.length);
+ }
+ if (currRow_.length < startRow_.length) {
+ lastPadded = Bytes.padTail(currRow_, startRow_.length - currRow_.length);
+ }
+ byte [] prependHeader = {1, 0};
+ BigInteger bigLastRow = new BigInteger(Bytes.add(prependHeader, lastPadded));
+ if (bigLastRow.compareTo(bigEnd_) > 0) {
+ return progressSoFar_;
+ }
+ BigDecimal processed = new BigDecimal(bigLastRow.subtract(bigStart_));
+ try {
+ BigDecimal progress = processed.setScale(3).divide(bigRange_, BigDecimal.ROUND_HALF_DOWN);
+ progressSoFar_ = progress.floatValue();
+ return progressSoFar_;
+ } catch (java.lang.ArithmeticException e) {
+ return 0;
+ }
+ }
+
+ }
+}
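The getProgress() math above treats row keys as unsigned big-endian integers: a fixed positive header {1, 0} is prepended before building BigInteger values, since a leading key byte of 0x80 or higher would otherwise produce a negative number. A self-contained sketch of the same arithmetic (class name and keys are hypothetical):

    import java.math.BigDecimal;
    import java.math.BigInteger;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ProgressSketch {
        // Prepending the positive header {1, 0} forces BigInteger to read the
        // key bytes as an unsigned magnitude and gives keys padded to the same
        // length equal weight.
        static BigInteger asUnsigned(byte[] key, int width) {
            byte[] padded = key.length < width
                ? Bytes.padTail(key, width - key.length) : key;
            return new BigInteger(Bytes.add(new byte[] {1, 0}, padded));
        }

        public static void main(String[] args) {
            byte[] start = Bytes.toBytes("aaa");
            byte[] end = Bytes.toBytes("zzz");
            byte[] curr = Bytes.toBytes("mmm");
            int width = Math.max(start.length, end.length);
            BigDecimal range = new BigDecimal(
                asUnsigned(end, width).subtract(asUnsigned(start, width)));
            BigDecimal done = new BigDecimal(
                asUnsigned(curr, width).subtract(asUnsigned(start, width)));
            // Same setScale/divide rounding as HBaseTableRecordReader.getProgress().
            System.out.println(done.setScale(3)
                .divide(range, BigDecimal.ROUND_HALF_DOWN).floatValue()); // ~0.48
        }
    }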
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java?rev=990936&r1=990935&r2=990936&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java Mon Aug 30 20:47:32 2010
@@ -28,7 +28,7 @@ import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.pig.LoadCaster;
+import org.apache.pig.LoadStoreCaster;
import org.apache.pig.PigWarning;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.data.BagFactory;
@@ -45,7 +45,7 @@ import org.apache.pig.impl.util.LogUtils
* and pig data types. It is intended to be extended by load and store
* functions (such as {@link PigStorage}).
*/
-public class Utf8StorageConverter implements LoadCaster {
+public class Utf8StorageConverter implements LoadStoreCaster {
protected BagFactory mBagFactory = BagFactory.getInstance();
protected TupleFactory mTupleFactory = TupleFactory.getInstance();
@@ -287,6 +287,7 @@ public class Utf8StorageConverter implem
return field;
}
+ @Override
public DataBag bytesToBag(byte[] b, ResourceFieldSchema schema) throws IOException {
if(b == null)
return null;
@@ -305,12 +306,14 @@ public class Utf8StorageConverter implem
return db;
}
+ @Override
public String bytesToCharArray(byte[] b) throws IOException {
if(b == null)
return null;
return new String(b, "UTF-8");
}
+ @Override
public Double bytesToDouble(byte[] b) {
if(b == null)
return null;
@@ -324,7 +327,8 @@ public class Utf8StorageConverter implem
return null;
}
}
-
+
+ @Override
public Float bytesToFloat(byte[] b) throws IOException {
if(b == null)
return null;
@@ -346,6 +350,12 @@ public class Utf8StorageConverter implem
}
}
+ /**
+ * Note: NOT part of the LoadCaster interface.
+ * @param b
+ * @return
+ * @throws IOException
+ */
public Boolean bytesToBoolean(byte[] b) throws IOException {
if(b == null)
return null;
@@ -353,6 +363,7 @@ public class Utf8StorageConverter implem
return Boolean.valueOf(s);
}
+ @Override
public Integer bytesToInteger(byte[] b) throws IOException {
if(b == null)
return null;
@@ -383,7 +394,8 @@ public class Utf8StorageConverter implem
}
}
}
-
+
+ @Override
public Long bytesToLong(byte[] b) throws IOException {
if (b == null)
return null;
@@ -421,6 +433,7 @@ public class Utf8StorageConverter implem
}
}
+ @Override
@SuppressWarnings("unchecked")
public Map<String, Object> bytesToMap(byte[] b) throws IOException {
if(b == null)
@@ -443,6 +456,7 @@ public class Utf8StorageConverter implem
return map;
}
+ @Override
public Tuple bytesToTuple(byte[] b, ResourceFieldSchema fieldSchema) throws IOException {
if(b == null)
return null;
@@ -464,39 +478,49 @@ public class Utf8StorageConverter implem
return t;
}
-
+ @Override
public byte[] toBytes(DataBag bag) throws IOException {
return bag.toString().getBytes();
}
+ @Override
public byte[] toBytes(String s) throws IOException {
return s.getBytes();
}
+ @Override
public byte[] toBytes(Double d) throws IOException {
return d.toString().getBytes();
}
+ @Override
public byte[] toBytes(Float f) throws IOException {
return f.toString().getBytes();
}
+ @Override
public byte[] toBytes(Integer i) throws IOException {
return i.toString().getBytes();
}
+ @Override
public byte[] toBytes(Long l) throws IOException {
return l.toString().getBytes();
}
+ @Override
public byte[] toBytes(Map<String, Object> m) throws IOException {
return DataType.mapToString(m).getBytes();
}
+ @Override
public byte[] toBytes(Tuple t) throws IOException {
return t.toString().getBytes();
}
-
+ @Override
+ public byte[] toBytes(DataByteArray a) throws IOException {
+ return a.get();
+ }
}
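Taken together with HBaseBinaryConverter, this gives two interchangeable LoadStoreCaster implementations with very different byte encodings. A small sketch (not from this commit) contrasting them:

    import org.apache.pig.backend.hadoop.hbase.HBaseBinaryConverter;
    import org.apache.pig.builtin.Utf8StorageConverter;

    public class CasterContrast {
        public static void main(String[] args) throws Exception {
            Utf8StorageConverter utf8 = new Utf8StorageConverter();
            HBaseBinaryConverter binary = new HBaseBinaryConverter();

            // Utf8StorageConverter serializes 42 as the two-byte string "42"...
            byte[] asText = utf8.toBytes(Integer.valueOf(42));
            System.out.println(asText.length + " -> " + utf8.bytesToInteger(asText)); // 2 -> 42

            // ...while HBaseBinaryConverter writes the four-byte big-endian form.
            byte[] asBinary = binary.toBytes(Integer.valueOf(42));
            System.out.println(asBinary.length + " -> " + binary.bytesToInteger(asBinary)); // 4 -> 42
        }
    }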
Modified: hadoop/pig/trunk/src/org/apache/pig/data/DataType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DataType.java?rev=990936&r1=990935&r2=990936&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DataType.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DataType.java Mon Aug 30 20:47:32 2010
@@ -457,6 +457,45 @@ public class DataType {
return 1;
}
}
+
+ public static byte[] toBytes(Object o) throws ExecException {
+ return toBytes(o, findType(o));
+ }
+
+ @SuppressWarnings("unchecked")
+ public static byte[] toBytes(Object o, byte type) throws ExecException {
+ switch (type) {
+ case BOOLEAN:
+ return ((Boolean) o).booleanValue() ? new byte[] {1} : new byte[] {0};
+ case BYTE:
+ return new byte[] {((Byte) o)};
+
+ case INTEGER:
+ case DOUBLE:
+ case FLOAT:
+ case LONG:
+ return ((Number) o).toString().getBytes();
+
+ case CHARARRAY:
+ return ((String) o).getBytes();
+ case MAP:
+ return mapToString((Map<String, Object>) o).getBytes();
+ case TUPLE:
+ return ((Tuple) o).toString().getBytes();
+ case BYTEARRAY:
+ return ((DataByteArray) o).get();
+ case BAG:
+ return ((DataBag) o).toString().getBytes();
+ case NULL:
+ return null;
+ default:
+ int errCode = 1071;
+ String msg = "Cannot convert a " + findTypeName(o) +
+ " to a ByteArray";
+ throw new ExecException(msg, errCode, PigException.INPUT);
+
+ }
+ }
/**
* Force a data object to an Integer, if possible. Any numeric type
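A short usage sketch (not part of the commit) for the new DataType.toBytes helper, which renders each Pig type to bytes via its string form except for bytearrays, which pass through:

    import org.apache.pig.data.DataByteArray;
    import org.apache.pig.data.DataType;

    public class ToBytesDemo {
        public static void main(String[] args) throws Exception {
            // Numbers and chararrays are rendered as their string form...
            System.out.println(new String(DataType.toBytes(Integer.valueOf(7)))); // 7
            System.out.println(new String(DataType.toBytes("pig")));              // pig
            // ...while a bytearray passes through unchanged.
            byte[] raw = DataType.toBytes(new DataByteArray(new byte[] {1, 2, 3}));
            System.out.println(raw.length); // 3
        }
    }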
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=990936&r1=990935&r2=990936&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java Mon Aug 30 20:47:32 2010
@@ -19,32 +19,35 @@ package org.apache.pig.impl.util;
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
-
+import org.apache.pig.FileInputLoadFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.parser.ParseException;
-import org.apache.pig.impl.logicalLayer.parser.QueryParser;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.LoadFunc;
import org.apache.pig.impl.io.InterStorage;
import org.apache.pig.impl.io.ReadToEndLoader;
import org.apache.pig.impl.io.TFileStorage;
-import org.apache.pig.FileInputLoadFunc;
-import org.apache.pig.PigException;
-import org.apache.pig.ResourceSchema;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.logicalLayer.parser.QueryParser;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import com.google.common.collect.Lists;
/**
* Class with utility static methods
*/
public class Utils {
-
+
/**
* This method is a helper for classes to implement {@link java.lang.Object#equals(java.lang.Object)}
* checks if two objects are equals - two levels of checks are
@@ -69,8 +72,8 @@ public class Utils {
}
return true;
}
-
-
+
+
/**
* This method is a helper for classes to implement {@link java.lang.Object#equals(java.lang.Object)}
* The method checks whether the two arguments are both null or both not null and
@@ -92,9 +95,9 @@ public class Utils {
return false;
}
}
-
+
public static ResourceSchema getSchema(LoadFunc wrappedLoadFunc, String location, boolean checkExistence, Job job)
- throws IOException {
+ throws IOException {
Configuration conf = job.getConfiguration();
if (checkExistence) {
Path path = new Path(location);
@@ -128,7 +131,7 @@ public class Utils {
}
return new ResourceSchema(s);
}
-
+
public static Schema getSchemaFromString(String schemaString) throws ParseException {
return Utils.getSchemaFromString(schemaString, DataType.BYTEARRAY);
}
@@ -140,7 +143,7 @@ public class Utils {
Schema.setSchemaDefaultType(schema, defaultType);
return schema;
}
-
+
public static String getTmpFileCompressorName(PigContext pigContext) {
if (pigContext == null)
return InterStorage.class.getName();
@@ -153,12 +156,12 @@ public class Utils {
} else
return InterStorage.class.getName();
}
-
+
public static FileInputLoadFunc getTmpFileStorageObject(Configuration conf) throws IOException {
boolean tmpFileCompression = conf.getBoolean("pig.tmpfilecompression", false);
return tmpFileCompression ? new TFileStorage() : new InterStorage();
}
-
+
public static boolean tmpFileCompression(PigContext pigContext) {
if (pigContext == null)
return false;
@@ -183,4 +186,24 @@ public class Utils {
}
return str.toString();
}
+
+ public static FuncSpec buildSimpleFuncSpec(String className, byte...types) {
+ List<Schema.FieldSchema> fieldSchemas = Lists.newArrayListWithExpectedSize(types.length);
+ for (byte type : types) {
+ fieldSchemas.add(new Schema.FieldSchema(null, type));
+ }
+ return new FuncSpec(className, new Schema(fieldSchemas));
+ }
+
+ /**
+ * Replace sequences of two backslashes ("\\") with one backslash ("\")
+ * (grunt requires backslashes to be escaped, but does not convert the resulting double
+ * backslash back into a single one, so we have to do it here)
+ * @param str
+ * @return
+ */
+ public static String slashisize(String str) {
+ return str.replace("\\\\", "\\");
+ }
+
}
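A quick sketch (hypothetical class name) of how slashisize() combines with Bytes.toBytesBinary, as the -gt/-lt option handling in HBaseStorage does:

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.pig.impl.util.Utils;

    public class SlashisizeDemo {
        public static void main(String[] args) {
            // As typed in grunt: -gt 'row\\x00'. The Java literal below holds the
            // same eight characters, with the backslash doubled once more for Java.
            String raw = "row\\\\x00";
            String unescaped = Utils.slashisize(raw);      // row\x00
            byte[] key = Bytes.toBytesBinary(unescaped);   // {'r','o','w', 0x00}
            System.out.println(Bytes.toStringBinary(key)); // prints row\x00
        }
    }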
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java?rev=990936&r1=990935&r2=990936&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java Mon Aug 30 20:47:32 2010
@@ -20,8 +20,6 @@ import java.io.File;
import java.io.IOException;
import java.util.Iterator;
-import junit.framework.TestCase;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@@ -34,248 +32,562 @@ import org.apache.hadoop.hbase.MiniZooKe
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.io.FileLocalizer;
import org.junit.After;
import org.junit.AfterClass;
-import org.junit.Before;
+import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-/** {@link org.apache.pig.backend.hadoop.hbase.HBaseStorage} Test Case **/
-@RunWith(JUnit4.class)
-public class TestHBaseStorage extends TestCase {
-
- private static final Log LOG =
- LogFactory.getLog(TestHBaseStorage.class);
-
- private static MiniCluster cluster = MiniCluster.buildCluster();
- private HBaseConfiguration conf;
- private MiniHBaseCluster hbaseCluster;
- private MiniZooKeeperCluster zooKeeperCluster;
-
- private PigServer pig;
-
- final static int NUM_REGIONSERVERS = 1;
-
- // Test Table Inforamtions
- private static final String TESTTABLE = "pigtable";
- private static final String COLUMNFAMILY = "pig:";
- private static final String TESTCOLUMN_A = "pig:col_a";
- private static final String TESTCOLUMN_B = "pig:col_b";
- private static final String TESTCOLUMN_C = "pig:col_c";
- private static final HColumnDescriptor family =
- new HColumnDescriptor(COLUMNFAMILY);
- private static final int TEST_ROW_COUNT = 100;
-
- @Before
- @Override
- public void setUp() throws Exception {
- super.setUp();
-
- conf = new HBaseConfiguration(ConfigurationUtil.
- toConfiguration(cluster.getProperties()));
- conf.set("fs.default.name", cluster.getFileSystem().getUri().toString());
- Path parentdir = cluster.getFileSystem().getHomeDirectory();
- conf.set(HConstants.HBASE_DIR, parentdir.toString());
-
- // Make the thread wake frequency a little slower so other threads
- // can run
- conf.setInt("hbase.server.thread.wakefrequency", 2000);
-
- // Make lease timeout longer, lease checks less frequent
- conf.setInt("hbase.master.lease.period", 10 * 1000);
-
- // Increase the amount of time between client retries
- conf.setLong("hbase.client.pause", 15 * 1000);
-
- try {
- hBaseClusterSetup();
- } catch (Exception e) {
- if(hbaseCluster != null) {
- hbaseCluster.shutdown();
- }
- throw e;
- }
-
- pig = new PigServer(ExecType.MAPREDUCE, ConfigurationUtil.toProperties(conf));
- }
-
- @AfterClass
- public static void oneTimeTearDown() throws Exception {
- cluster.shutDown();
- }
-
- /**
- * Actually start the MiniHBase instance.
- */
- protected void hBaseClusterSetup() throws Exception {
- zooKeeperCluster = new MiniZooKeeperCluster();
- int clientPort = this.zooKeeperCluster.startup(new File("build/test"));
- conf.set("hbase.zookeeper.property.clientPort",clientPort+"");
- // start the mini cluster
- hbaseCluster = new MiniHBaseCluster(conf, NUM_REGIONSERVERS);
- // opening the META table ensures that cluster is running
- while(true){
- try{
- new HTable(conf, HConstants.META_TABLE_NAME);
- break;
- }catch(IOException e){
- Thread.sleep(1000);
- }
-
- }
- }
-
- @Override
- @After
- public void tearDown() throws Exception {
- // clear the table
- deleteTable();
- super.tearDown();
- try {
- HConnectionManager.deleteConnectionInfo(conf, true);
- if (hbaseCluster != null) {
- try {
- hbaseCluster.shutdown();
- } catch (Exception e) {
- LOG.warn("Closing mini hbase cluster", e);
- }
- }
- if (zooKeeperCluster!=null){
- try{
- zooKeeperCluster.shutdown();
- } catch (IOException e){
- LOG.warn("Closing zookeeper cluster",e);
- }
- }
- } catch (Exception e) {
- LOG.error(e);
- }
- pig.shutdown();
- }
-
- /**
- * load from hbase test
- * @throws IOException
- * @throws ExecException
- */
- @Test
- public void testLoadFromHBase() throws IOException, ExecException {
- prepareTable();
-
- pig.registerQuery("a = load 'hbase://" + TESTTABLE + "' using " +
- "org.apache.pig.backend.hadoop.hbase.HBaseStorage('" + TESTCOLUMN_A +
- " " + TESTCOLUMN_B + " " + TESTCOLUMN_C + "') as (col_a, col_b:int, col_c);");
- Iterator<Tuple> it = pig.openIterator("a");
- int count = 0;
- LOG.info("LoadFromHBase Starting");
- while(it.hasNext()){
- Tuple t = it.next();
- LOG.info("LoadFromHBase "+ t);
- String col_a = ((DataByteArray)t.get(0)).toString();
- int col_b = (Integer)t.get(1);
- String col_c = ((DataByteArray)t.get(2)).toString();
-
- assertEquals(String.valueOf(count), col_a);
- assertEquals(count, col_b);
- assertEquals("TEXT" + count, col_c);
-
- count++;
- }
- assertEquals(TEST_ROW_COUNT, count);
- System.err.println("LoadFromHBase done");
- }
-
- /**
- * load from hbase test w/o hbase:// prefix
- * @throws IOException
- * @throws ExecException
- */
- @Test
- public void testBackwardsCompatibility() throws IOException, ExecException {
- prepareTable();
- pig.registerQuery("a = load '" + TESTTABLE + "' using " +
- "org.apache.pig.backend.hadoop.hbase.HBaseStorage('" + TESTCOLUMN_A +
- " " + TESTCOLUMN_B + " " + TESTCOLUMN_C + "') as (col_a, col_b:int, col_c);");
- Iterator<Tuple> it = pig.openIterator("a");
- int count = 0;
- LOG.info("LoadFromHBase Starting");
- while(it.hasNext()){
- Tuple t = it.next();
- LOG.info("LoadFromHBase "+ t);
- String col_a = ((DataByteArray)t.get(0)).toString();
- int col_b = (Integer)t.get(1);
- String col_c = ((DataByteArray)t.get(2)).toString();
-
- assertEquals(String.valueOf(count), col_a);
- assertEquals(count, col_b);
- assertEquals("TEXT" + count, col_c);
-
- count++;
- }
- assertEquals(TEST_ROW_COUNT, count);
- System.err.println("LoadFromHBase done");
- }
-
- /**
- * Prepare a table in hbase for testing.
- *
- * @throws IOException
- */
- private void prepareTable() throws IOException {
- // define the table schema
- HTableDescriptor tabledesc = new HTableDescriptor(TESTTABLE);
- tabledesc.addFamily(family);
-
- // create the table
- HBaseAdmin admin = new HBaseAdmin(conf);
- if(admin.tableExists(TESTTABLE)) {
- deleteTable();
- }
- admin.createTable(tabledesc);
-
- // put some data into table
- HTable table = new HTable(conf, TESTTABLE);
-
- BatchUpdate batchUpdate;
-
- for(int i = 0 ; i < TEST_ROW_COUNT ; i++) {
- String v = Integer.toString(i);
- batchUpdate = new BatchUpdate(Bytes.toBytes(
- "00".substring(v.length()) + v));
- batchUpdate.put(TESTCOLUMN_A, Bytes.toBytes(v));
- batchUpdate.put(TESTCOLUMN_B, Bytes.toBytes(v));
- batchUpdate.put(TESTCOLUMN_C, Bytes.toBytes("TEXT" + i));
- table.commit(batchUpdate);
- }
- }
-
- private void deleteTable() throws IOException {
- // delete the table
- HBaseAdmin admin = new HBaseAdmin(conf);
- if(admin.tableExists(TESTTABLE)) {
- admin.disableTable(TESTTABLE);
- while(admin.isTableEnabled(TESTTABLE)) {
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- // do nothing.
- }
- }
- admin.deleteTable(TESTTABLE);
- }
- }
+public class TestHBaseStorage {
+
+ private static final Log LOG = LogFactory.getLog(TestHBaseStorage.class);
+
+ private static MiniCluster cluster = MiniCluster.buildCluster();
+ private static HBaseConfiguration conf;
+ private static MiniHBaseCluster hbaseCluster;
+ private static MiniZooKeeperCluster zooKeeperCluster;
+
+ private static PigServer pig;
+
+ final static int NUM_REGIONSERVERS = 1;
+
+ enum DataFormat {
+ HBaseBinary, UTF8PlainText,
+ }
+
+ // Test Table constants
+ private static final String TESTTABLE_1 = "pigtable_1";
+ private static final String TESTTABLE_2 = "pigtable_2";
+ private static final byte[] COLUMNFAMILY = Bytes.toBytes("pig");
+ private static final String TESTCOLUMN_A = "pig:col_a";
+ private static final String TESTCOLUMN_B = "pig:col_b";
+ private static final String TESTCOLUMN_C = "pig:col_c";
+ private static final HColumnDescriptor family = new HColumnDescriptor(
+ COLUMNFAMILY);
+ private static final int TEST_ROW_COUNT = 100;
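+
+ // Fixture layout (written by prepareTable): TEST_ROW_COUNT rows with
+ // zero-padded keys "00".."99"; row i holds col_a = i, col_b = i + 0.0,
+ // and col_c = "Text_" + i.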
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ conf = new HBaseConfiguration();
+ conf.set("fs.default.name", cluster.getFileSystem().getUri().toString());
+ Path parentdir = cluster.getFileSystem().getHomeDirectory();
+ conf.set(HConstants.HBASE_DIR, parentdir.toString());
+
+ FSUtils.setVersion(cluster.getFileSystem(), parentdir);
+ conf.set(HConstants.REGIONSERVER_PORT, "0");
+ // disable the UI, which otherwise clashes when running more than one RegionServer
+ conf.set("hbase.regionserver.info.port", "-1");
+
+ // Make lease timeout longer, lease checks less frequent
+ conf.setInt("hbase.master.lease.period", 10 * 1000);
+
+ // Increase the amount of time between client retries
+ conf.setLong("hbase.client.pause", 15 * 1000);
+
+ try {
+ hBaseClusterSetup();
+ } catch (Exception e) {
+ if (hbaseCluster != null) {
+ hbaseCluster.shutdown();
+ }
+ throw e;
+ }
+
+ pig = new PigServer(ExecType.MAPREDUCE,
+ ConfigurationUtil.toProperties(conf));
+ }
+
+ @AfterClass
+ public static void oneTimeTearDown() throws Exception {
+ try {
+ HConnectionManager.deleteConnectionInfo(conf, true);
+ if (hbaseCluster != null) {
+ try {
+ hbaseCluster.shutdown();
+ } catch (Exception e) {
+ LOG.warn("Closing mini hbase cluster", e);
+ }
+ }
+ if (zooKeeperCluster != null) {
+ try {
+ zooKeeperCluster.shutdown();
+ } catch (IOException e) {
+ LOG.warn("Closing zookeeper cluster", e);
+ }
+ }
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+ cluster.shutDown();
+ }
+
+ /**
+ * Start the mini ZooKeeper and HBase clusters, then wait until HBase is
+ * ready to serve requests.
+ */
+ protected static void hBaseClusterSetup() throws Exception {
+ zooKeeperCluster = new MiniZooKeeperCluster();
+ int clientPort = zooKeeperCluster.startup(new File("build/test"));
+ conf.set("hbase.zookeeper.property.clientPort", clientPort + "");
+ // start the mini cluster
+ hbaseCluster = new MiniHBaseCluster(conf, NUM_REGIONSERVERS);
+ // opening the META table ensures that the cluster is up and running
+ while (true) {
+ try {
+ new HTable(conf, HConstants.META_TABLE_NAME);
+ break;
+ } catch (IOException e) {
+ Thread.sleep(1000);
+ }
+
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ // clean up the test tables
+ deleteTable(TESTTABLE_1);
+ deleteTable(TESTTABLE_2);
+ pig.shutdown();
+ }
+
+ /**
+ * Test loading from HBase.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testLoadFromHBase() throws IOException {
+ prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+ pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using "
+ + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ + TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C
+ + "') as (col_a, col_b, col_c);");
+ Iterator<Tuple> it = pig.openIterator("a");
+ int count = 0;
+ LOG.info("LoadFromHBase Starting");
+ while (it.hasNext()) {
+ Tuple t = it.next();
+ LOG.info("LoadFromHBase " + t);
+ String col_a = ((DataByteArray) t.get(0)).toString();
+ String col_b = ((DataByteArray) t.get(1)).toString();
+ String col_c = ((DataByteArray) t.get(2)).toString();
+
+ Assert.assertEquals(count, Integer.parseInt(col_a));
+ Assert.assertEquals(count + 0.0, Double.parseDouble(col_b), 1e-6);
+ Assert.assertEquals("Text_" + count, col_c);
+ count++;
+ }
+ Assert.assertEquals(TEST_ROW_COUNT, count);
+ LOG.info("LoadFromHBase done");
+ }
+
+ /**
+ * Test loading from HBase without the hbase:// prefix, for backwards
+ * compatibility with the old table-name-only syntax.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testBackwardsCompatibility() throws IOException {
+ prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+ pig.registerQuery("a = load '" + TESTTABLE_1 + "' using "
+ + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ + TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C
+ + "') as (col_a, col_b, col_c);");
+ Iterator<Tuple> it = pig.openIterator("a");
+ int count = 0;
+ LOG.info("LoadFromHBase Starting");
+ while (it.hasNext()) {
+ Tuple t = it.next();
+ LOG.info("LoadFromHBase " + t);
+ String col_a = ((DataByteArray) t.get(0)).toString();
+ String col_b = ((DataByteArray) t.get(1)).toString();
+ String col_c = ((DataByteArray) t.get(2)).toString();
+
+ Assert.assertEquals(count, Integer.parseInt(col_a));
+ Assert.assertEquals(count + 0.0, Double.parseDouble(col_b), 1e-6);
+ Assert.assertEquals("Text_" + count, col_c);
+ count++;
+ }
+ Assert.assertEquals(TEST_ROW_COUNT, count);
+ LOG.info("LoadFromHBase done");
+ }
+
+ /**
+ * Test loading from HBase with -loadKey, which prepends the row key as
+ * the first column.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testLoadFromHBaseWithRowKey() throws IOException {
+ prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+ pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using "
+ + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ + TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C
+ + "','-loadKey') as (rowKey,col_a, col_b, col_c);");
+ Iterator<Tuple> it = pig.openIterator("a");
+ int count = 0;
+ LOG.info("LoadFromHBase Starting");
+ while (it.hasNext()) {
+ Tuple t = it.next();
+ LOG.info("LoadFromHBase " + t);
+ String rowKey = ((DataByteArray) t.get(0)).toString();
+ String col_a = ((DataByteArray) t.get(1)).toString();
+ String col_b = ((DataByteArray) t.get(2)).toString();
+ String col_c = ((DataByteArray) t.get(3)).toString();
+
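+ // expect the zero-padded two-digit row key written by prepareTable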
+ Assert.assertEquals("00".substring((count + "").length()) + count,
+ rowKey);
+ Assert.assertEquals(count, Integer.parseInt(col_a));
+ Assert.assertEquals(count + 0.0, Double.parseDouble(col_b), 1e-6);
+ Assert.assertEquals("Text_" + count, col_c);
+
+ count++;
+ }
+ Assert.assertEquals(TEST_ROW_COUNT, count);
+ LOG.info("LoadFromHBase done");
+ }
+
+ /**
+ * Test loading from HBase with the -gte and -lte parameters
+ * (01 <= key <= 98).
+ */
+ @Test
+ public void testLoadWithParameters_1() throws IOException {
+ prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+
+ pig.registerQuery("a = load 'hbase://"
+ + TESTTABLE_1
+ + "' using "
+ + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ + TESTCOLUMN_A
+ + " "
+ + TESTCOLUMN_B
+ + " "
+ + TESTCOLUMN_C
+ + "','-loadKey -gte 01 -lte 98') as (rowKey,col_a, col_b, col_c);");
+ Iterator<Tuple> it = pig.openIterator("a");
+ int count = 0;
+ int next = 1;
+ LOG.info("LoadFromHBase Starting");
+ while (it.hasNext()) {
+ Tuple t = it.next();
+ LOG.info("LoadFromHBase " + t);
+ String rowKey = ((DataByteArray) t.get(0)).toString();
+ String col_a = ((DataByteArray) t.get(1)).toString();
+ String col_b = ((DataByteArray) t.get(2)).toString();
+ String col_c = ((DataByteArray) t.get(3)).toString();
+
+ Assert.assertEquals("00".substring((next + "").length()) + next,
+ rowKey);
+ Assert.assertEquals(next, Integer.parseInt(col_a));
+ Assert.assertEquals(next + 0.0, Double.parseDouble(col_b), 1e-6);
+ Assert.assertEquals("Text_" + next, col_c);
+
+ count++;
+ next++;
+ }
+ Assert.assertEquals(TEST_ROW_COUNT - 2, count);
+ LOG.info("LoadFromHBase done");
+ }
+
+ /**
+ * Test loading from HBase with the -gt and -lt parameters
+ * (00 < key < 99).
+ */
+ @Test
+ public void testLoadWithParameters_2() throws IOException {
+ prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+
+ pig.registerQuery("a = load 'hbase://"
+ + TESTTABLE_1
+ + "' using "
+ + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ + TESTCOLUMN_A
+ + " "
+ + TESTCOLUMN_B
+ + " "
+ + TESTCOLUMN_C
+ + "','-loadKey -gt 00 -lt 99') as (rowKey,col_a, col_b, col_c);");
+ Iterator<Tuple> it = pig.openIterator("a");
+ int count = 0;
+ int next = 1;
+ LOG.info("LoadFromHBase Starting");
+ while (it.hasNext()) {
+ Tuple t = it.next();
+ LOG.info("LoadFromHBase " + t);
+ String rowKey = ((DataByteArray) t.get(0)).toString();
+ String col_a = ((DataByteArray) t.get(1)).toString();
+ String col_b = ((DataByteArray) t.get(2)).toString();
+ String col_c = ((DataByteArray) t.get(3)).toString();
+
+ Assert.assertEquals("00".substring((next + "").length()) + next,
+ rowKey);
+ Assert.assertEquals(next, Integer.parseInt(col_a));
+ Assert.assertEquals(next + 0.0, Double.parseDouble(col_b), 1e-6);
+ Assert.assertEquals("Text_" + next, col_c);
+
+ count++;
+ next++;
+ }
+ Assert.assertEquals(TEST_ROW_COUNT - 2, count);
+ LOG.info("LoadFromHBase done");
+ }
+
+ /**
+ * Test loading from HBase with the -limit parameter.
+ */
+ @Test
+ public void testLoadWithParameters_3() throws IOException {
+ prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+ pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using "
+ + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ + TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C
+ + "','-loadKey -limit 10') as (rowKey,col_a, col_b, col_c);");
+ Iterator<Tuple> it = pig.openIterator("a");
+ int count = 0;
+ LOG.info("LoadFromHBase Starting");
+ while (it.hasNext()) {
+ Tuple t = it.next();
+ LOG.info("LoadFromHBase " + t);
+ String rowKey = ((DataByteArray) t.get(0)).toString();
+ String col_a = ((DataByteArray) t.get(1)).toString();
+ String col_b = ((DataByteArray) t.get(2)).toString();
+ String col_c = ((DataByteArray) t.get(3)).toString();
+
+ Assert.assertEquals("00".substring((count + "").length()) + count,
+ rowKey);
+ Assert.assertEquals(count, Integer.parseInt(col_a));
+ Assert.assertEquals(count + 0.0, Double.parseDouble(col_b), 1e-6);
+ Assert.assertEquals("Text_" + count, col_c);
+
+ count++;
+ }
+ // '-limit' applies per region; this table has a single region,
+ // so exactly 10 rows are returned
+ Assert.assertEquals(10, count);
+ LOG.info("LoadFromHBase done");
+ }
+
+ /**
+ * Test loading from HBase using the HBaseBinaryConverter caster.
+ */
+ @Test
+ public void testHBaseBinaryConverter() throws IOException {
+ prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
+
+ pig.registerQuery("a = load 'hbase://"
+ + TESTTABLE_1
+ + "' using "
+ + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ + TESTCOLUMN_A
+ + " "
+ + TESTCOLUMN_B
+ + " "
+ + TESTCOLUMN_C
+ + "','-loadKey -caster HBaseBinaryConverter') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);");
+ Iterator<Tuple> it = pig.openIterator("a");
+ int index = 0;
+ LOG.info("LoadFromHBase Starting");
+ while (it.hasNext()) {
+ Tuple t = it.next();
+ LOG.info("LoadFromHBase " + t);
+ String rowKey = (String) t.get(0);
+ int col_a = (Integer) t.get(1);
+ double col_b = (Double) t.get(2);
+ String col_c = (String) t.get(3);
+
+ Assert.assertEquals("00".substring((index + "").length()) + index,
+ rowKey);
+ Assert.assertEquals(index, col_a);
+ Assert.assertEquals(index + 0.0, col_b, 1e-6);
+ Assert.assertEquals("Text_" + index, col_c);
+ index++;
+ }
+ // make sure all rows were actually read
+ Assert.assertEquals(TEST_ROW_COUNT, index);
+ LOG.info("LoadFromHBase done");
+ }
+
+ /**
+ * Load from 'TESTTABLE_1' using the HBaseBinaryConverter caster, and
+ * store into 'TESTTABLE_2' using the same caster.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testStoreToHBase_1() throws IOException {
+ prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
+ prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary);
+
+ pig.registerQuery("a = load 'hbase://"
+ + TESTTABLE_1
+ + "' using "
+ + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ + TESTCOLUMN_A
+ + " "
+ + TESTCOLUMN_B
+ + " "
+ + TESTCOLUMN_C
+ + "','-loadKey -caster HBaseBinaryConverter') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);");
+ pig.store("a", TESTTABLE_2,
+ "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
+ + TESTCOLUMN_C + "','-caster HBaseBinaryConverter')");
+
+ HTable table = new HTable(conf, TESTTABLE_2);
+ ResultScanner scanner = table.getScanner(new Scan());
+ Iterator<Result> iter = scanner.iterator();
+ int i = 0;
+ for (i = 0; iter.hasNext(); ++i) {
+ Result result = iter.next();
+ String v = i + "";
+ String rowKey = Bytes.toString(result.getRow());
+ int col_a = Bytes
+ .toInt(result.getValue(Bytes.toBytes(TESTCOLUMN_A)));
+ double col_b = Bytes.toDouble(result.getValue(Bytes
+ .toBytes(TESTCOLUMN_B)));
+ String col_c = Bytes.toString(result.getValue(Bytes
+ .toBytes(TESTCOLUMN_C)));
+
+ Assert.assertEquals("00".substring(v.length()) + v, rowKey);
+ Assert.assertEquals(i, col_a);
+ Assert.assertEquals(i + 0.0, col_b, 1e-6);
+ Assert.assertEquals("Text_" + i, col_c);
+ }
+ Assert.assertEquals(TEST_ROW_COUNT, i);
+ }
+
+ /**
+ * Load from 'TESTTABLE_1' using the HBaseBinaryConverter caster, and
+ * store into 'TESTTABLE_2' using the default UTF-8 plain-text converter.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testStoreToHBase_2() throws IOException {
+ prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
+ prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary);
+
+ pig.registerQuery("a = load 'hbase://"
+ + TESTTABLE_1
+ + "' using "
+ + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ + TESTCOLUMN_A
+ + " "
+ + TESTCOLUMN_B
+ + " "
+ + TESTCOLUMN_C
+ + "','-loadKey -caster HBaseBinaryConverter') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);");
+ pig.store("a", TESTTABLE_2,
+ "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
+ + TESTCOLUMN_C + "')");
+
+ HTable table = new HTable(conf, TESTTABLE_2);
+ ResultScanner scanner = table.getScanner(new Scan());
+ Iterator<Result> iter = scanner.iterator();
+ int i = 0;
+ for (i = 0; iter.hasNext(); ++i) {
+ Result result = iter.next();
+ String v = i + "";
+ String rowKey = new String(result.getRow());
+ int col_a = Integer.parseInt(new String(result.getValue(Bytes
+ .toBytes(TESTCOLUMN_A))));
+ double col_b = Double.parseDouble(new String(result.getValue(Bytes
+ .toBytes(TESTCOLUMN_B))));
+ String col_c = new String(result.getValue(Bytes
+ .toBytes(TESTCOLUMN_C)));
+
+ Assert.assertEquals("00".substring(v.length()) + v, rowKey);
+ Assert.assertEquals(i, col_a);
+ Assert.assertEquals(i + 0.0, col_b, 1e-6);
+ Assert.assertEquals("Text_" + i, col_c);
+ }
+ Assert.assertEquals(TEST_ROW_COUNT, i);
+ }
+
+ /**
+ * Prepare a table in HBase for testing: create it, dropping any existing
+ * copy, and populate it with TEST_ROW_COUNT rows when initData is set.
+ */
+ private void prepareTable(String tableName, boolean initData,
+ DataFormat format) throws IOException {
+ // define the table schema
+ HTableDescriptor tabledesc = new HTableDescriptor(tableName);
+ tabledesc.addFamily(family);
+
+ // create the table
+ HBaseAdmin admin = new HBaseAdmin(conf);
+ deleteTable(tableName);
+ admin.createTable(tabledesc);
+
+ if (initData) {
+ // put some data into the table, in increasing row-key order
+ HTable table = new HTable(conf, tableName);
+
+ for (int i = 0; i < TEST_ROW_COUNT; i++) {
+ String v = i + "";
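+ // left-pad the row key to two digits so lexicographic order
+ // matches numeric order for 0..99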
+ if (format == DataFormat.HBaseBinary) {
+ // row key: string type
+ Put put = new Put(Bytes.toBytes("00".substring(v.length())
+ + v));
+ // col_a: int type
+ put.add(COLUMNFAMILY, Bytes.toBytes("col_a"),
+ Bytes.toBytes(i));
+ // col_b: double type
+ put.add(COLUMNFAMILY, Bytes.toBytes("col_b"),
+ Bytes.toBytes(i + 0.0));
+ // col_c: string type
+ put.add(COLUMNFAMILY, Bytes.toBytes("col_c"),
+ Bytes.toBytes("Text_" + i));
+ table.put(put);
+ } else {
+ // row key: string type
+ Put put = new Put(
+ ("00".substring(v.length()) + v).getBytes());
+ // col_a: int type
+ put.add(COLUMNFAMILY, Bytes.toBytes("col_a"),
+ (i + "").getBytes()); // int
+ // col_b: double type
+ put.add(COLUMNFAMILY, Bytes.toBytes("col_b"),
+ ((i + 0.0) + "").getBytes());
+ // col_c: string type
+ put.add(COLUMNFAMILY, Bytes.toBytes("col_c"),
+ ("Text_" + i).getBytes());
+ table.put(put);
+ }
+ }
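+ // flush any buffered puts before the tests read the table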
+ table.flushCommits();
+ }
+ }
+
+ /**
+ * Delete the given table if it exists, disabling it first and waiting
+ * for the disable to take effect.
+ *
+ * @param tableName
+ * @throws IOException
+ */
+ private void deleteTable(String tableName) throws IOException {
+ // delete the table
+ HBaseAdmin admin = new HBaseAdmin(conf);
+ if (admin.tableExists(tableName)) {
+ admin.disableTable(tableName);
+ while (admin.isTableEnabled(tableName)) {
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ // do nothing.
+ }
+ }
+ admin.deleteTable(tableName);
+ }
+ }
}
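
For reference, the new options exercised by these tests read as follows in
Pig Latin. This is a usage sketch distilled from the queries the tests
register; the table name, column names, and schema are the test fixtures,
not part of the API.

  -- load three columns plus the row key, restricted to 01 <= key <= 98
  a = load 'hbase://pigtable_1'
      using org.apache.pig.backend.hadoop.hbase.HBaseStorage(
          'pig:col_a pig:col_b pig:col_c', '-loadKey -gte 01 -lte 98')
      as (rowKey, col_a, col_b, col_c);

  -- store back through the new StoreFunc; the first field supplies the
  -- row key, the remaining fields map to the listed columns
  store a into 'pigtable_2'
      using org.apache.pig.backend.hadoop.hbase.HBaseStorage(
          'pig:col_a pig:col_b pig:col_c', '-caster HBaseBinaryConverter');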