You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by jc...@apache.org on 2012/03/28 01:34:00 UTC
svn commit: r1306058 - in /pig/branches/branch-0.9: CHANGES.txt
src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
Author: jcoveney
Date: Tue Mar 27 23:34:00 2012
New Revision: 1306058
URL: http://svn.apache.org/viewvc?rev=1306058&view=rev
Log:
[0.9] PIG-2619: HBaseStorage constructs a Scan with cacheBlocks = false
Modified:
pig/branches/branch-0.9/CHANGES.txt
pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
Modified: pig/branches/branch-0.9/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/CHANGES.txt?rev=1306058&r1=1306057&r2=1306058&view=diff
==============================================================================
--- pig/branches/branch-0.9/CHANGES.txt (original)
+++ pig/branches/branch-0.9/CHANGES.txt Tue Mar 27 23:34:00 2012
@@ -20,6 +20,10 @@ Pig Change Log
Release 0.9.3 - Unreleased
+IMPROVEMENTS
+
+PIG-2619: HBaseStorage constructs a Scan with cacheBlocks = false
+
BUG FIXES
PIG-2540: [piggybank] AvroStorage can't read schema on amazon s3 in elastic mapreduce (rjurney via jcoveney)
Modified: pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1306058&r1=1306057&r2=1306058&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Tue Mar 27 23:34:00 2012
@@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -117,10 +117,10 @@ import com.google.common.collect.Lists;
* map to a column family name. In the above examples, the <code>friends</code>
* column family data from <code>SampleTable</code> will be written to a
* <code>buddies</code> column family in the <code>SampleTableCopy</code> table.
- *
+ *
*/
public class HBaseStorage extends LoadFunc implements StoreFuncInterface, LoadPushDown, OrderedLoadFunc {
-
+
private static final Log LOG = LogFactory.getLog(HBaseStorage.class);
private final static String STRING_CASTER = "UTF8StorageConverter";
@@ -128,7 +128,7 @@ public class HBaseStorage extends LoadFu
private final static String CASTER_PROPERTY = "pig.hbase.caster";
private final static String ASTERISK = "*";
private final static String COLON = ":";
-
+
private List<ColumnInfo> columnInfo_ = Lists.newArrayList();
private HTable m_table;
private Configuration m_conf;
@@ -155,11 +155,11 @@ public class HBaseStorage extends LoadFu
private ResourceSchema schema_;
private RequiredFieldList requiredFieldList;
- private static void populateValidOptions() {
+ private static void populateValidOptions() {
validOptions_.addOption("loadKey", false, "Load Key");
validOptions_.addOption("gt", true, "Records must be greater than this value " +
"(binary, double-slash-escaped)");
- validOptions_.addOption("lt", true, "Records must be less than this value (binary, double-slash-escaped)");
+ validOptions_.addOption("lt", true, "Records must be less than this value (binary, double-slash-escaped)");
validOptions_.addOption("gte", true, "Records must be greater than or equal to this value");
validOptions_.addOption("lte", true, "Records must be less than or equal to this value");
validOptions_.addOption("caching", true, "Number of rows scanners should cache");
@@ -171,7 +171,7 @@ public class HBaseStorage extends LoadFu
/**
* Constructor. Construct a HBase Table LoadFunc and StoreFunc to load or store the cells of the
* provided columns.
- *
+ *
* @param columnList
* columnlist that is a presented string delimited by space. To
* retrieve all columns in a column family <code>Foo</code>, specify
@@ -184,26 +184,26 @@ public class HBaseStorage extends LoadFu
* family is specified.
*
* @throws ParseException when unable to parse arguments
- * @throws IOException
+ * @throws IOException
*/
public HBaseStorage(String columnList) throws ParseException, IOException {
this(columnList,"");
}
/**
- * Constructor. Construct a HBase Table LoadFunc and StoreFunc to load or store.
+ * Constructor. Construct a HBase Table LoadFunc and StoreFunc to load or store.
* @param columnList
* @param optString Loader options. Known options:<ul>
* <li>-loadKey=(true|false) Load the row key as the first column
* <li>-gt=minKeyVal
- * <li>-lt=maxKeyVal
+ * <li>-lt=maxKeyVal
* <li>-gte=minKeyVal
* <li>-lte=maxKeyVal
* <li>-limit=numRowsPerRegion max number of rows to retrieve per region
* <li>-caching=numRows number of rows to cache (faster scans, more memory).
* </ul>
- * @throws ParseException
- * @throws IOException
+ * @throws ParseException
+ * @throws IOException
*/
public HBaseStorage(String columnList, String optString) throws ParseException, IOException {
populateValidOptions();
@@ -217,7 +217,7 @@ public class HBaseStorage extends LoadFu
throw e;
}
- loadRowKey_ = configuredOptions_.hasOption("loadKey");
+ loadRowKey_ = configuredOptions_.hasOption("loadKey");
for (String colName : colNames) {
columnInfo_.add(new ColumnInfo(colName));
}
@@ -244,11 +244,15 @@ public class HBaseStorage extends LoadFu
caching_ = Integer.valueOf(configuredOptions_.getOptionValue("caching", "100"));
limit_ = Long.valueOf(configuredOptions_.getOptionValue("limit", "-1"));
- initScan();
+ initScan();
}
private void initScan() {
scan = new Scan();
+
+ // Map-reduce jobs should not run with cacheBlocks
+ scan.setCacheBlocks(false);
+
// Set filters, if any.
if (configuredOptions_.hasOption("gt")) {
gt_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("gt")));
@@ -397,7 +401,7 @@ public class HBaseStorage extends LoadFu
}
@Override
- public InputFormat getInputFormat() {
+ public InputFormat getInputFormat() {
TableInputFormat inputFormat = new HBaseTableIFBuilder()
.withLimit(limit_)
.withGt(gt_)
@@ -426,7 +430,7 @@ public class HBaseStorage extends LoadFu
HBaseConfiguration.addHbaseResources(m_conf);
// Make sure the HBase, ZooKeeper, and Guava jars get shipped.
- TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
+ TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
org.apache.hadoop.hbase.client.HTable.class,
com.google.common.collect.Lists.class,
org.apache.zookeeper.ZooKeeper.class);
@@ -501,24 +505,24 @@ public class HBaseStorage extends LoadFu
}
/**
- * Set up the caster to use for reading values out of, and writing to, HBase.
+ * Set up the caster to use for reading values out of, and writing to, HBase.
*/
@Override
public LoadCaster getLoadCaster() throws IOException {
return caster_;
}
-
+
/*
* StoreFunc Methods
* @see org.apache.pig.StoreFuncInterface#getOutputFormat()
*/
-
+
@Override
public OutputFormat getOutputFormat() throws IOException {
if (outputFormat == null) {
this.outputFormat = new TableOutputFormat();
HBaseConfiguration.addHbaseResources(m_conf);
- this.outputFormat.setConf(m_conf);
+ this.outputFormat.setConf(m_conf);
}
return outputFormat;
}
@@ -545,10 +549,10 @@ public class HBaseStorage extends LoadFu
@Override
public void putNext(Tuple t) throws IOException {
ResourceFieldSchema[] fieldSchemas = (schema_ == null) ? null : schema_.getFields();
- Put put=new Put(objToBytes(t.get(0),
+ Put put=new Put(objToBytes(t.get(0),
(fieldSchemas == null) ? DataType.findType(t.get(0)) : fieldSchemas[0].getType()));
long ts=System.currentTimeMillis();
-
+
if (LOG.isDebugEnabled()) {
for (ColumnInfo columnInfo : columnInfo_) {
LOG.debug("putNext -- col: " + columnInfo);
@@ -587,7 +591,7 @@ public class HBaseStorage extends LoadFu
throw new IOException(e);
}
}
-
+
@SuppressWarnings("unchecked")
private byte[] objToBytes(Object o, byte type) throws IOException {
LoadStoreCaster caster = (LoadStoreCaster) caster_;
@@ -600,11 +604,11 @@ public class HBaseStorage extends LoadFu
case DataType.FLOAT: return caster.toBytes((Float) o);
case DataType.INTEGER: return caster.toBytes((Integer) o);
case DataType.LONG: return caster.toBytes((Long) o);
-
- // The type conversion here is unchecked.
+
+ // The type conversion here is unchecked.
// Relying on DataType.findType to do the right thing.
case DataType.MAP: return caster.toBytes((Map<String, Object>) o);
-
+
case DataType.NULL: return null;
case DataType.TUPLE: return caster.toBytes((Tuple) o);
case DataType.ERROR: throw new IOException("Unable to determine type of " + o.getClass());
@@ -646,7 +650,7 @@ public class HBaseStorage extends LoadFu
/*
* LoadPushDown Methods.
*/
-
+
@Override
public List<OperatorSet> getFeatures() {
return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
@@ -698,7 +702,7 @@ public class HBaseStorage extends LoadFu
loadRowKey_ = false;
projOffset = 0;
}
-
+
for (int i = projOffset; i < requiredFields.size(); i++) {
int fieldIndex = requiredFields.get(i).getIndex();
newColumns.add(columnInfo_.get(fieldIndex - colOffset));