You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2011/07/16 17:16:13 UTC
svn commit: r1147434 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
test/org/apache/pig/test/TestHBaseStorageParams.java
Author: dvryaboy
Date: Sat Jul 16 15:16:13 2011
New Revision: 1147434
URL: http://svn.apache.org/viewvc?rev=1147434&view=rev
Log:
PIG-1946: HBaseStorage constructor syntax is error prone (billgraham via dvryaboy)
Added:
pig/trunk/test/org/apache/pig/test/TestHBaseStorageParams.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1147434&r1=1147433&r2=1147434&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sat Jul 16 15:16:13 2011
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-1946: HBaseStorage constructor syntax is error prone (billgraham via dvryaboy)
+
PIG-2001: DefaultTuple(List) constructor is inefficient, causes List.size()
System.arraycopy() calls (though they are 0 byte copies),
DefaultTuple(int) constructor is a bit misleading wrt time
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1147434&r1=1147433&r2=1147434&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Sat Jul 16 15:16:13 2011
@@ -21,6 +21,8 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -142,6 +144,8 @@ public class HBaseStorage extends LoadFu
private final static Options validOptions_ = new Options();
private final static CommandLineParser parser_ = new GnuParser();
private boolean loadRowKey_;
+ private String delimiter_;
+ private boolean ignoreWhitespace_;
private final long limit_;
private final int caching_;
private final boolean noWAL_;
@@ -166,6 +170,8 @@ public class HBaseStorage extends LoadFu
validOptions_.addOption("lte", true, "Records must be less than or equal to this value");
validOptions_.addOption("caching", true, "Number of rows scanners should cache");
validOptions_.addOption("limit", true, "Per-region limit");
+ validOptions_.addOption("delim", true, "Column delimiter");
+ validOptions_.addOption("ignoreWhitespace", true, "Ignore spaces when parsing columns");
validOptions_.addOption("caster", true, "Caster to use for converting values. A class name, " +
"HBaseBinaryConverter, or Utf8StorageConverter. For storage, casters must implement LoadStoreCaster.");
validOptions_.addOption("noWAL", false, "Sets the write ahead to false for faster loading. To be used with extreme caution since this could result in data loss (see http://hbase.apache.org/book.html#perf.hbase.client.putwal).");
@@ -176,17 +182,17 @@ public class HBaseStorage extends LoadFu
* provided columns.
*
* @param columnList
- * columnlist that is a presented string delimited by space. To
- * retreive all columns in a column family <code>Foo</code>, specify
- * a column as either <code>Foo:</code> or <code>Foo:*</code>. To fetch
- * only columns in the CF that start with <I>bar</I>, specify
+ * a string of column names separated by spaces and/or
+ * commas. To retrieve all columns in a column family <code>Foo</code>,
+ * specify a column as either <code>Foo:</code> or <code>Foo:*</code>.
+ * To fetch only columns in the CF that start with <I>bar</I>, specify
* <code>Foo:bar*</code>. The resulting tuple will always be the size
* of the number of tokens in <code>columnList</code>. Items in the
* tuple will be scalar values when a full column descriptor is
* specified, or a map of column descriptors to values when a column
* family is specified.
*
- * @throws ParseException when unale to parse arguments
+ * @throws ParseException when unable to parse arguments
* @throws IOException
*/
public HBaseStorage(String columnList) throws ParseException, IOException {
@@ -203,6 +209,8 @@ public class HBaseStorage extends LoadFu
* <li>-gte=minKeyVal
* <li>-lte=maxKeyVal
* <li>-limit=numRowsPerRegion max number of rows to retrieve per region
+ * <li>-delim=char delimiter to use when parsing column names (default is space or comma)
+ * <li>-ignoreWhitespace=(true|false) ignore spaces when parsing column names (default true)
* <li>-caching=numRows number of rows to cache (faster scans, more memory).
* <li>-noWAL=(true|false) Sets the write ahead to false for faster loading.
* To be used with extreme caution, since this could result in data loss
@@ -213,21 +221,32 @@ public class HBaseStorage extends LoadFu
*/
public HBaseStorage(String columnList, String optString) throws ParseException, IOException {
populateValidOptions();
- String[] colNames = columnList.split(" ");
String[] optsArr = optString.split(" ");
try {
configuredOptions_ = parser_.parse(validOptions_, optsArr);
} catch (ParseException e) {
HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-columnPrefix] [-caching] [-caster] [-noWAL] [-limit]", validOptions_ );
+ formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-columnPrefix] [-caching] [-caster] [-noWAL] [-limit] [-delim] [-ignoreWhitespace]", validOptions_ );
throw e;
}
- loadRowKey_ = configuredOptions_.hasOption("loadKey");
- for (String colName : colNames) {
- columnInfo_.add(new ColumnInfo(colName));
+ loadRowKey_ = configuredOptions_.hasOption("loadKey");
+
+ delimiter_ = ",";
+ if (configuredOptions_.getOptionValue("delim") != null) {
+ delimiter_ = configuredOptions_.getOptionValue("delim");
+ }
+
+ ignoreWhitespace_ = true;
+ if (configuredOptions_.hasOption("ignoreWhitespace")) {
+ String value = configuredOptions_.getOptionValue("ignoreWhitespace");
+ if (!"true".equalsIgnoreCase(value)) {
+ ignoreWhitespace_ = false;
+ }
}
+ columnInfo_ = parseColumnList(columnList, delimiter_, ignoreWhitespace_);
+
m_conf = HBaseConfiguration.create();
String defaultCaster = UDFContext.getUDFContext().getClientSystemProps().getProperty(CASTER_PROPERTY, STRING_CASTER);
String casterOption = configuredOptions_.getOptionValue("caster", defaultCaster);
@@ -239,7 +258,7 @@ public class HBaseStorage extends LoadFu
try {
caster_ = (LoadCaster) PigContext.instantiateFuncFromSpec(casterOption);
} catch (ClassCastException e) {
- LOG.error("Congifured caster does not implement LoadCaster interface.");
+ LOG.error("Configured caster does not implement LoadCaster interface.");
throw new IOException(e);
} catch (RuntimeException e) {
LOG.error("Configured caster class not found.", e);
@@ -254,6 +273,44 @@ public class HBaseStorage extends LoadFu
initScan();
}
+    /**
+     * Splits a column specification into {@link ColumnInfo} descriptors.
+     *
+     * @param columnList
+     *            the raw column specification passed to the constructor
+     * @param delimiter
+     *            separator between column names; matched as literal text,
+     *            not as a regular expression, so values such as {@code |} or
+     *            {@code .} work as expected. An empty delimiter disables
+     *            delimiter splitting.
+     * @param ignoreWhitespace
+     *            when true, any run of whitespace also separates column names
+     *            and surrounding whitespace is dropped; when false, whitespace
+     *            is kept as part of the column names
+     * @return one ColumnInfo per non-empty column name, in input order
+     */
+    private List<ColumnInfo> parseColumnList(String columnList,
+                                             String delimiter,
+                                             boolean ignoreWhitespace) {
+        List<ColumnInfo> columnInfo = new ArrayList<ColumnInfo>();
+
+        // String.split() takes a regex; quote the user-supplied delimiter so
+        // regex metacharacters (e.g. "|" or ".") are matched literally.
+        String[] colNames = (delimiter == null || delimiter.length() == 0)
+                ? new String[] { columnList }
+                : columnList.split(java.util.regex.Pattern.quote(delimiter));
+
+        for (String colName : colNames) {
+            // Default behavior allows any mix of whitespace and the delimiter
+            // (a comma by default) between names. When whitespace is not
+            // ignored it stays part of the column name.
+            String[] parts = ignoreWhitespace
+                    ? colName.trim().split("\\s+")
+                    : new String[] { colName };
+            for (String part : parts) {
+                // Empty tokens come from adjacent, leading or trailing
+                // separators and never name a real column.
+                if (part.length() > 0) {
+                    columnInfo.add(new ColumnInfo(part));
+                }
+            }
+        }
+
+        return columnInfo;
+    }
+
private void initScan() {
scan = new Scan();
// Set filters, if any.
@@ -282,13 +339,13 @@ public class HBaseStorage extends LoadFu
// all column family filters roll up to one parent OR filter
if (allColumnFilters == null) {
allColumnFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
- }
+ }
if (LOG.isInfoEnabled()) {
LOG.info("Adding family:prefix filters with values " +
Bytes.toString(colInfo.getColumnFamily()) + COLON +
Bytes.toString(colInfo.getColumnPrefix()));
- }
+ }
// each column family filter consists of a FamilyFilter AND a PrefixFilter
FilterList thisColumnFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL);
@@ -323,6 +380,17 @@ public class HBaseStorage extends LoadFu
scan.setFilter(scanFilter);
}
+    /**
+     * Exposes the parsed column descriptors so that external code, such as
+     * unit tests, can inspect them. If the tests mirrored this class's
+     * package, this accessor could be package-private instead of public.
+     *
+     * @return the live list of ColumnInfo objects held by this instance
+     *         (not a copy)
+     */
+    public List<ColumnInfo> getColumnInfoList() {
+        List<ColumnInfo> parsedColumns = this.columnInfo_;
+        return parsedColumns;
+    }
+
@Override
public Tuple getNext() throws IOException {
try {
@@ -749,7 +817,7 @@ public class HBaseStorage extends LoadFu
* Map being added to the tuple, while the last results in a scalar. The 3rd
* form results in a prefix-filtered Map.
*/
- private class ColumnInfo {
+ public class ColumnInfo {
final String originalColumnName; // always set
final byte[] columnFamily; // always set
Added: pig/trunk/test/org/apache/pig/test/TestHBaseStorageParams.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestHBaseStorageParams.java?rev=1147434&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestHBaseStorageParams.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestHBaseStorageParams.java Sat Jul 16 15:16:13 2011
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.test;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.hbase.HBaseStorage;
+import org.apache.pig.impl.util.UDFContext;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Properties;
+
+public class TestHBaseStorageParams {
+
+    private static final Log LOG = LogFactory.getLog(TestHBaseStorageParams.class);
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        // HBaseStorage reads the client system properties from the UDFContext
+        // when choosing its default caster, so they must be initialized.
+        UDFContext.getUDFContext().setClientSystemProps(new Properties());
+    }
+
+    @Test
+    public void testColumnParsingWithSpaces1() throws IOException, ParseException {
+        HBaseStorage storage = new HBaseStorage("foo:a foo:b foo:c");
+        doColumnParseTest(storage, "foo:a", "foo:b", "foo:c");
+    }
+
+    @Test
+    public void testColumnParsingWithSpaces2() throws IOException, ParseException {
+        // Repeated and surrounding spaces must not produce extra columns.
+        HBaseStorage storage = new HBaseStorage("  foo:a  foo:b   foo:c  ");
+        doColumnParseTest(storage, "foo:a", "foo:b", "foo:c");
+    }
+
+    @Test
+    public void testColumnParsingWithCommas1() throws IOException, ParseException {
+        HBaseStorage storage = new HBaseStorage("foo:a,foo:b,foo:c");
+        doColumnParseTest(storage, "foo:a", "foo:b", "foo:c");
+    }
+
+    @Test
+    public void testColumnParsingWithCommas2() throws IOException, ParseException {
+        HBaseStorage storage = new HBaseStorage("foo:a, foo:b , foo:c");
+        doColumnParseTest(storage, "foo:a", "foo:b", "foo:c");
+    }
+
+    @Test
+    public void testColumnParsingWithDelim() throws IOException, ParseException {
+        HBaseStorage storage = new HBaseStorage("foo:a%foo:b % foo:c,d", "-delim %");
+        doColumnParseTest(storage, "foo:a", "foo:b", "foo:c,d");
+    }
+
+    @Test
+    public void testColumnParsingWithDelimWithSpaces()
+            throws IOException, ParseException {
+
+        HBaseStorage storage =
+                new HBaseStorage("foo:a%foo:b % foo:c,d", "-delim % -ignoreWhitespace false");
+        doColumnParseTest(storage, "foo:a", "foo:b ", " foo:c,d");
+    }
+
+    /**
+     * Asserts that {@code storage} parsed exactly the given columns, in order.
+     * Each expected name has the form {@code family:qualifier}; none of them
+     * is expected to carry a column prefix.
+     */
+    private void doColumnParseTest(HBaseStorage storage, String... names)
+            throws IOException {
+        // Read by index rather than removing entries, so the storage's
+        // internal column list is left untouched.
+        java.util.List<HBaseStorage.ColumnInfo> columns = storage.getColumnInfoList();
+        Assert.assertEquals("Wrong column count", names.length, columns.size());
+
+        for (int i = 0; i < names.length; i++) {
+            String[] cfAndName = names[i].split(":", 2);
+            HBaseStorage.ColumnInfo col = columns.get(i);
+
+            // Decode explicitly as UTF-8 instead of the platform default charset.
+            Assert.assertEquals("Wrong CF",
+                    cfAndName[0], new String(col.getColumnFamily(), "UTF-8"));
+            Assert.assertEquals("Wrong column descriptor",
+                    cfAndName[1], new String(col.getColumnName(), "UTF-8"));
+            Assert.assertNull("Column prefix should be null", col.getColumnPrefix());
+        }
+    }
+}