You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2011/07/16 17:16:13 UTC

svn commit: r1147434 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java test/org/apache/pig/test/TestHBaseStorageParams.java

Author: dvryaboy
Date: Sat Jul 16 15:16:13 2011
New Revision: 1147434

URL: http://svn.apache.org/viewvc?rev=1147434&view=rev
Log:
PIG-1946: HBaseStorage constructor syntax is error prone (billgraham via dvryaboy)

Added:
    pig/trunk/test/org/apache/pig/test/TestHBaseStorageParams.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1147434&r1=1147433&r2=1147434&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sat Jul 16 15:16:13 2011
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-1946: HBaseStorage constructor syntax is error prone (billgraham via dvryaboy)
+
 PIG-2001: DefaultTuple(List) constructor is inefficient, causes List.size()
  System.arraycopy() calls (though they are 0 byte copies), 
  DefaultTuple(int) constructor is a bit misleading wrt time 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1147434&r1=1147433&r2=1147434&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Sat Jul 16 15:16:13 2011
@@ -21,6 +21,8 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -142,6 +144,8 @@ public class HBaseStorage extends LoadFu
     private final static Options validOptions_ = new Options();
     private final static CommandLineParser parser_ = new GnuParser();
     private boolean loadRowKey_;
+    private String delimiter_;
+    private boolean ignoreWhitespace_;
     private final long limit_;
     private final int caching_;
     private final boolean noWAL_;
@@ -166,6 +170,8 @@ public class HBaseStorage extends LoadFu
         validOptions_.addOption("lte", true, "Records must be less than or equal to this value");
         validOptions_.addOption("caching", true, "Number of rows scanners should cache");
         validOptions_.addOption("limit", true, "Per-region limit");
+        validOptions_.addOption("delim", true, "Column delimiter");
+        validOptions_.addOption("ignoreWhitespace", true, "Ignore spaces when parsing columns");
         validOptions_.addOption("caster", true, "Caster to use for converting values. A class name, " +
                 "HBaseBinaryConverter, or Utf8StorageConverter. For storage, casters must implement LoadStoreCaster.");
         validOptions_.addOption("noWAL", false, "Sets the write ahead to false for faster loading. To be used with extreme caution since this could result in data loss (see http://hbase.apache.org/book.html#perf.hbase.client.putwal).");
@@ -176,17 +182,17 @@ public class HBaseStorage extends LoadFu
      * provided columns.
      * 
      * @param columnList
-     *        columnlist that is a presented string delimited by space. To
-     *        retreive all columns in a column family <code>Foo</code>, specify
-     *        a column as either <code>Foo:</code> or <code>Foo:*</code>. To fetch
-     *        only columns in the CF that start with <I>bar</I>, specify
+     *        column list, given as a string delimited by spaces and/or
+     *        commas. To retrieve all columns in a column family <code>Foo</code>,
+     *        specify a column as either <code>Foo:</code> or <code>Foo:*</code>.
+     *        To fetch only columns in the CF that start with <I>bar</I>, specify
      *        <code>Foo:bar*</code>. The resulting tuple will always be the size
      *        of the number of tokens in <code>columnList</code>. Items in the
      *        tuple will be scalar values when a full column descriptor is
      *        specified, or a map of column descriptors to values when a column
      *        family is specified.
      *
-     * @throws ParseException when unale to parse arguments
+     * @throws ParseException when unable to parse arguments
      * @throws IOException 
      */
     public HBaseStorage(String columnList) throws ParseException, IOException {
@@ -203,6 +209,8 @@ public class HBaseStorage extends LoadFu
      * <li>-gte=minKeyVal
      * <li>-lte=maxKeyVal
      * <li>-limit=numRowsPerRegion max number of rows to retrieve per region
+     * <li>-delim=char delimiter to use when parsing column names (default is a comma; whitespace also separates columns unless -ignoreWhitespace=false)
+     * <li>-ignoreWhitespace=(true|false) ignore spaces when parsing column names (default true)
      * <li>-caching=numRows  number of rows to cache (faster scans, more memory).
      * <li>-noWAL=(true|false) Sets the write ahead to false for faster loading.
      * To be used with extreme caution, since this could result in data loss
@@ -213,21 +221,32 @@ public class HBaseStorage extends LoadFu
      */
     public HBaseStorage(String columnList, String optString) throws ParseException, IOException {
         populateValidOptions();
-        String[] colNames = columnList.split(" ");
         String[] optsArr = optString.split(" ");
         try {
             configuredOptions_ = parser_.parse(validOptions_, optsArr);
         } catch (ParseException e) {
             HelpFormatter formatter = new HelpFormatter();
-            formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-columnPrefix] [-caching] [-caster] [-noWAL] [-limit]", validOptions_ );
+            formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-columnPrefix] [-caching] [-caster] [-noWAL] [-limit] [-delim] [-ignoreWhitespace]", validOptions_ );
             throw e;
         }
 
-        loadRowKey_ = configuredOptions_.hasOption("loadKey");  
-        for (String colName : colNames) {
-            columnInfo_.add(new ColumnInfo(colName));
+        loadRowKey_ = configuredOptions_.hasOption("loadKey");
+
+        delimiter_ = ",";
+        if (configuredOptions_.getOptionValue("delim") != null) {
+          delimiter_ = configuredOptions_.getOptionValue("delim");
+        }
+
+        ignoreWhitespace_ = true;
+        if (configuredOptions_.hasOption("ignoreWhitespace")) {
+          String value = configuredOptions_.getOptionValue("ignoreWhitespace");
+          if (!"true".equalsIgnoreCase(value)) {
+            ignoreWhitespace_ = false;
+          }
         }
 
+        columnInfo_ = parseColumnList(columnList, delimiter_, ignoreWhitespace_);
+
         m_conf = HBaseConfiguration.create();
         String defaultCaster = UDFContext.getUDFContext().getClientSystemProps().getProperty(CASTER_PROPERTY, STRING_CASTER);
         String casterOption = configuredOptions_.getOptionValue("caster", defaultCaster);
@@ -239,7 +258,7 @@ public class HBaseStorage extends LoadFu
             try {
               caster_ = (LoadCaster) PigContext.instantiateFuncFromSpec(casterOption);
             } catch (ClassCastException e) {
-                LOG.error("Congifured caster does not implement LoadCaster interface.");
+                LOG.error("Configured caster does not implement LoadCaster interface.");
                 throw new IOException(e);
             } catch (RuntimeException e) {
                 LOG.error("Configured caster class not found.", e);
@@ -254,6 +273,44 @@ public class HBaseStorage extends LoadFu
         initScan();	    
     }
 
+    /**
+     * Splits a column list into one ColumnInfo per column token. The
+     * delimiter is matched literally, not as a regex. When ignoreWhitespace
+     * is set, whitespace also separates columns and empty tokens are dropped.
+     *
+     * @param columnList the raw column list, e.g. "cf:a, cf:b cf:c"
+     * @param delimiter literal column separator; null/empty means none
+     * @param ignoreWhitespace whether whitespace also separates columns
+     * @return the parsed columns, in input order
+     */
+    private List<ColumnInfo> parseColumnList(String columnList,
+                                             String delimiter,
+                                             boolean ignoreWhitespace) {
+        List<ColumnInfo> columnInfo = new ArrayList<ColumnInfo>();
+
+        // String.split takes a regex, so quote the user-supplied delimiter;
+        // unquoted, "-delim |" would split between every character.
+        String[] colNames = (delimiter == null || delimiter.length() == 0)
+            ? new String[] { columnList }
+            : columnList.split(java.util.regex.Pattern.quote(delimiter));
+        if (ignoreWhitespace) {
+            List<String> columns = new ArrayList<String>();
+            for (String colName : colNames) {
+                // Any whitespace run (spaces, tabs, newlines) is a separator.
+                for (String subColName : colName.split("\\s+")) {
+                    if (subColName.length() > 0) columns.add(subColName);
+                }
+            }
+            colNames = columns.toArray(new String[columns.size()]);
+        }
+
+        for (String colName : colNames) {
+            columnInfo.add(new ColumnInfo(colName));
+        }
+
+        return columnInfo;
+    }
+
     private void initScan() {
         scan = new Scan();
         // Set filters, if any.
@@ -282,13 +339,13 @@ public class HBaseStorage extends LoadFu
                 // all column family filters roll up to one parent OR filter
                 if (allColumnFilters == null) {
                     allColumnFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
-        }
+                }
 
                 if (LOG.isInfoEnabled()) {
                     LOG.info("Adding family:prefix filters with values " +
                         Bytes.toString(colInfo.getColumnFamily()) + COLON +
                         Bytes.toString(colInfo.getColumnPrefix()));
-    }
+                }
 
                 // each column family filter consists of a FamilyFilter AND a PrefixFilter
                 FilterList thisColumnFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL);
@@ -323,6 +380,17 @@ public class HBaseStorage extends LoadFu
         scan.setFilter(scanFilter);
     }
 
+    /**
+     * Exposes the columns parsed from the constructor's columnList argument.
+     * This is public only so unit tests can inspect it; if the test tree
+     * mirrored the main package layout it could be package-private.
+     * The field itself is returned, not a copy.
+     * @return the parsed column list
+     */
+    public List<ColumnInfo> getColumnInfoList() {
+        return this.columnInfo_;
+    }
+
     @Override
     public Tuple getNext() throws IOException {
         try {
@@ -749,7 +817,7 @@ public class HBaseStorage extends LoadFu
      * Map being added to the tuple, while the last results in a scalar. The 3rd
      * form results in a prefix-filtered Map.
      */
-    private class ColumnInfo {
+    public class ColumnInfo {
 
         final String originalColumnName;  // always set
         final byte[] columnFamily; // always set

Added: pig/trunk/test/org/apache/pig/test/TestHBaseStorageParams.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestHBaseStorageParams.java?rev=1147434&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestHBaseStorageParams.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestHBaseStorageParams.java Sat Jul 16 15:16:13 2011
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.test;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.pig.backend.hadoop.hbase.HBaseStorage;
+import org.apache.pig.impl.util.UDFContext;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+/** Tests for how HBaseStorage parses its columnList and column options. */
+public class TestHBaseStorageParams {
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+      // HBaseStorage's constructor reads UDFContext client system properties.
+      UDFContext.getUDFContext().setClientSystemProps(new Properties());
+    }
+
+    @Test
+    public void testColumnParsingWithSpaces1() throws IOException, ParseException {
+      HBaseStorage storage = new HBaseStorage("foo:a foo:b foo:c");
+      doColumnParseTest(storage, "foo:a", "foo:b", "foo:c");
+    }
+
+    @Test
+    public void testColumnParsingWithSpaces2() throws IOException, ParseException {
+      HBaseStorage storage = new HBaseStorage("foo:a foo:b  foo:c");
+      doColumnParseTest(storage, "foo:a", "foo:b", "foo:c");
+    }
+
+    @Test
+    public void testColumnParsingWithCommas1() throws IOException, ParseException {
+      HBaseStorage storage = new HBaseStorage("foo:a,foo:b,foo:c");
+      doColumnParseTest(storage, "foo:a", "foo:b", "foo:c");
+    }
+
+    @Test
+    public void testColumnParsingWithCommas2() throws IOException, ParseException {
+      HBaseStorage storage = new HBaseStorage("foo:a,  foo:b , foo:c");
+      doColumnParseTest(storage, "foo:a", "foo:b", "foo:c");
+    }
+
+    @Test
+    public void testColumnParsingWithDelim() throws IOException, ParseException {
+      HBaseStorage storage = new HBaseStorage("foo:a%foo:b % foo:c,d", "-delim %");
+      doColumnParseTest(storage, "foo:a", "foo:b", "foo:c,d");
+    }
+
+    @Test
+    public void testColumnParsingWithDelimWithSpaces()
+      throws IOException, ParseException {
+
+      HBaseStorage storage =
+        new HBaseStorage("foo:a%foo:b % foo:c,d", "-delim % -ignoreWhitespace false");
+      doColumnParseTest(storage, "foo:a", "foo:b ", " foo:c,d");
+    }
+
+    private void doColumnParseTest(HBaseStorage storage, String... names)
+        throws IOException {
+      // Read by index instead of remove(0): the getter returns the storage's
+      // live list, so draining it would mutate the object under test.
+      List<HBaseStorage.ColumnInfo> columns = storage.getColumnInfoList();
+      Assert.assertEquals("Wrong column count", names.length, columns.size());
+
+      for (int i = 0; i < names.length; i++) {
+        String[] cfAndName = names[i].split(":");
+        HBaseStorage.ColumnInfo col = columns.get(i);
+        Assert.assertEquals("Wrong CF",
+          cfAndName[0], new String(col.getColumnFamily(), "UTF-8"));
+        Assert.assertEquals("Wrong column descriptor",
+          cfAndName[1], new String(col.getColumnName(), "UTF-8"));
+        Assert.assertNull("Column prefix should be null", col.getColumnPrefix());
+      }
+    }
+}