You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by jc...@apache.org on 2012/03/28 01:34:00 UTC

svn commit: r1306058 - in /pig/branches/branch-0.9: CHANGES.txt src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java

Author: jcoveney
Date: Tue Mar 27 23:34:00 2012
New Revision: 1306058

URL: http://svn.apache.org/viewvc?rev=1306058&view=rev
Log:
[0.9] PIG-2619: HBaseStorage constructs a Scan with cacheBlocks = false

Modified:
    pig/branches/branch-0.9/CHANGES.txt
    pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java

Modified: pig/branches/branch-0.9/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/CHANGES.txt?rev=1306058&r1=1306057&r2=1306058&view=diff
==============================================================================
--- pig/branches/branch-0.9/CHANGES.txt (original)
+++ pig/branches/branch-0.9/CHANGES.txt Tue Mar 27 23:34:00 2012
@@ -20,6 +20,10 @@ Pig Change Log
 
 Release 0.9.3 - Unreleased
 
+IMPROVEMENTS
+
+PIG-2619: HBaseStorage constructs a Scan with cacheBlocks = false
+
 BUG FIXES
 
 PIG-2540: [piggybank] AvroStorage can't read schema on amazon s3 in elastic mapreduce (rjurney via jcoveney)

Modified: pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1306058&r1=1306057&r2=1306058&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Tue Mar 27 23:34:00 2012
@@ -5,9 +5,9 @@
  * licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -117,10 +117,10 @@ import com.google.common.collect.Lists;
  * map to a column family name. In the above examples, the <code>friends</code>
  * column family data from <code>SampleTable</code> will be written to a
  * <code>buddies</code> column family in the <code>SampleTableCopy</code> table.
- * 
+ *
  */
 public class HBaseStorage extends LoadFunc implements StoreFuncInterface, LoadPushDown, OrderedLoadFunc {
-    
+
     private static final Log LOG = LogFactory.getLog(HBaseStorage.class);
 
     private final static String STRING_CASTER = "UTF8StorageConverter";
@@ -128,7 +128,7 @@ public class HBaseStorage extends LoadFu
     private final static String CASTER_PROPERTY = "pig.hbase.caster";
     private final static String ASTERISK = "*";
     private final static String COLON = ":";
-    
+
     private List<ColumnInfo> columnInfo_ = Lists.newArrayList();
     private HTable m_table;
     private Configuration m_conf;
@@ -155,11 +155,11 @@ public class HBaseStorage extends LoadFu
     private ResourceSchema schema_;
     private RequiredFieldList requiredFieldList;
 
-    private static void populateValidOptions() { 
+    private static void populateValidOptions() {
         validOptions_.addOption("loadKey", false, "Load Key");
         validOptions_.addOption("gt", true, "Records must be greater than this value " +
                 "(binary, double-slash-escaped)");
-        validOptions_.addOption("lt", true, "Records must be less than this value (binary, double-slash-escaped)");   
+        validOptions_.addOption("lt", true, "Records must be less than this value (binary, double-slash-escaped)");
         validOptions_.addOption("gte", true, "Records must be greater than or equal to this value");
         validOptions_.addOption("lte", true, "Records must be less than or equal to this value");
         validOptions_.addOption("caching", true, "Number of rows scanners should cache");
@@ -171,7 +171,7 @@ public class HBaseStorage extends LoadFu
     /**
      * Constructor. Construct a HBase Table LoadFunc and StoreFunc to load or store the cells of the
      * provided columns.
-     * 
+     *
      * @param columnList
      *        columnlist that is a presented string delimited by space. To
 *        retrieve all columns in a column family <code>Foo</code>, specify
@@ -184,26 +184,26 @@ public class HBaseStorage extends LoadFu
      *        family is specified.
      *
 *        family is specified.
-     * @throws IOException 
+     * @throws IOException
      */
     public HBaseStorage(String columnList) throws ParseException, IOException {
         this(columnList,"");
     }
 
     /**
-     * Constructor. Construct a HBase Table LoadFunc and StoreFunc to load or store. 
+     * Constructor. Construct a HBase Table LoadFunc and StoreFunc to load or store.
      * @param columnList
      * @param optString Loader options. Known options:<ul>
      * <li>-loadKey=(true|false)  Load the row key as the first column
      * <li>-gt=minKeyVal
-     * <li>-lt=maxKeyVal 
+     * <li>-lt=maxKeyVal
      * <li>-gte=minKeyVal
      * <li>-lte=maxKeyVal
      * <li>-limit=numRowsPerRegion max number of rows to retrieve per region
      * <li>-caching=numRows  number of rows to cache (faster scans, more memory).
      * </ul>
-     * @throws ParseException 
-     * @throws IOException 
+     * @throws ParseException
+     * @throws IOException
      */
     public HBaseStorage(String columnList, String optString) throws ParseException, IOException {
         populateValidOptions();
@@ -217,7 +217,7 @@ public class HBaseStorage extends LoadFu
             throw e;
         }
 
-        loadRowKey_ = configuredOptions_.hasOption("loadKey");  
+        loadRowKey_ = configuredOptions_.hasOption("loadKey");
         for (String colName : colNames) {
             columnInfo_.add(new ColumnInfo(colName));
         }
@@ -244,11 +244,15 @@ public class HBaseStorage extends LoadFu
 
         caching_ = Integer.valueOf(configuredOptions_.getOptionValue("caching", "100"));
         limit_ = Long.valueOf(configuredOptions_.getOptionValue("limit", "-1"));
-        initScan();	    
+        initScan();
     }
 
     private void initScan() {
         scan = new Scan();
+
+        // Map-reduce jobs should not run with cacheBlocks
+        scan.setCacheBlocks(false);
+
         // Set filters, if any.
         if (configuredOptions_.hasOption("gt")) {
             gt_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("gt")));
@@ -397,7 +401,7 @@ public class HBaseStorage extends LoadFu
     }
 
     @Override
-    public InputFormat getInputFormat() {      
+    public InputFormat getInputFormat() {
         TableInputFormat inputFormat = new HBaseTableIFBuilder()
         .withLimit(limit_)
         .withGt(gt_)
@@ -426,7 +430,7 @@ public class HBaseStorage extends LoadFu
         HBaseConfiguration.addHbaseResources(m_conf);
 
         // Make sure the HBase, ZooKeeper, and Guava jars get shipped.
-        TableMapReduceUtil.addDependencyJars(job.getConfiguration(), 
+        TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
             org.apache.hadoop.hbase.client.HTable.class,
             com.google.common.collect.Lists.class,
             org.apache.zookeeper.ZooKeeper.class);
@@ -501,24 +505,24 @@ public class HBaseStorage extends LoadFu
      }
 
     /**
-     * Set up the caster to use for reading values out of, and writing to, HBase. 
+     * Set up the caster to use for reading values out of, and writing to, HBase.
      */
     @Override
     public LoadCaster getLoadCaster() throws IOException {
         return caster_;
     }
-    
+
     /*
      * StoreFunc Methods
      * @see org.apache.pig.StoreFuncInterface#getOutputFormat()
      */
-    
+
     @Override
     public OutputFormat getOutputFormat() throws IOException {
         if (outputFormat == null) {
             this.outputFormat = new TableOutputFormat();
             HBaseConfiguration.addHbaseResources(m_conf);
-            this.outputFormat.setConf(m_conf);            
+            this.outputFormat.setConf(m_conf);
         }
         return outputFormat;
     }
@@ -545,10 +549,10 @@ public class HBaseStorage extends LoadFu
     @Override
     public void putNext(Tuple t) throws IOException {
         ResourceFieldSchema[] fieldSchemas = (schema_ == null) ? null : schema_.getFields();
-        Put put=new Put(objToBytes(t.get(0), 
+        Put put=new Put(objToBytes(t.get(0),
                 (fieldSchemas == null) ? DataType.findType(t.get(0)) : fieldSchemas[0].getType()));
         long ts=System.currentTimeMillis();
-        
+
         if (LOG.isDebugEnabled()) {
             for (ColumnInfo columnInfo : columnInfo_) {
                 LOG.debug("putNext -- col: " + columnInfo);
@@ -587,7 +591,7 @@ public class HBaseStorage extends LoadFu
             throw new IOException(e);
         }
     }
-    
+
     @SuppressWarnings("unchecked")
     private byte[] objToBytes(Object o, byte type) throws IOException {
         LoadStoreCaster caster = (LoadStoreCaster) caster_;
@@ -600,11 +604,11 @@ public class HBaseStorage extends LoadFu
         case DataType.FLOAT: return caster.toBytes((Float) o);
         case DataType.INTEGER: return caster.toBytes((Integer) o);
         case DataType.LONG: return caster.toBytes((Long) o);
-        
-        // The type conversion here is unchecked. 
+
+        // The type conversion here is unchecked.
         // Relying on DataType.findType to do the right thing.
         case DataType.MAP: return caster.toBytes((Map<String, Object>) o);
-        
+
         case DataType.NULL: return null;
         case DataType.TUPLE: return caster.toBytes((Tuple) o);
         case DataType.ERROR: throw new IOException("Unable to determine type of " + o.getClass());
@@ -646,7 +650,7 @@ public class HBaseStorage extends LoadFu
     /*
      * LoadPushDown Methods.
      */
-    
+
     @Override
     public List<OperatorSet> getFeatures() {
         return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
@@ -698,7 +702,7 @@ public class HBaseStorage extends LoadFu
                 loadRowKey_ = false;
             projOffset = 0;
             }
-        
+
         for (int i = projOffset; i < requiredFields.size(); i++) {
             int fieldIndex = requiredFields.get(i).getIndex();
             newColumns.add(columnInfo_.get(fieldIndex - colOffset));