Posted to hcatalog-commits@incubator.apache.org by to...@apache.org on 2011/11/02 23:05:40 UTC
svn commit: r1196854 - in /incubator/hcatalog/trunk: ./
storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/
storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/
Author: toffer
Date: Wed Nov 2 23:05:39 2011
New Revision: 1196854
URL: http://svn.apache.org/viewvc?rev=1196854&view=rev
Log:
HCATALOG-143 Projection pushdown for HBaseInputStorageDriver (avandana via toffer)
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ResultConverter.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1196854&r1=1196853&r2=1196854&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Wed Nov 2 23:05:39 2011
@@ -23,6 +23,8 @@ Trunk (unreleased changes)
INCOMPATIBLE CHANGES
NEW FEATURES
+ HCAT-143. Projection pushdown for HBaseInputStorageDriver (avandana via toffer)
+
HCAT-121. TextStorageOutputDriver for Pig (daijyc via hashutosh)
HCAT-129. HBase Storage Driver Test doesn't use unique test dir for warehouse (toffer via khorgath)
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java?rev=1196854&r1=1196853&r2=1196854&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java Wed Nov 2 23:05:39 2011
@@ -52,49 +52,45 @@ public class HBaseInputStorageDriver ext
private HCatSchema outputColSchema;
private HCatSchema dataSchema;
private Configuration jobConf;
-
+ private String scanColumns;
+
/*
* @param JobContext
- *
+ *
* @param hcatProperties
- *
+ *
* @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver
* #initialize(org.apache.hadoop.mapreduce.JobContext, java.util.Properties)
*/
@Override
- public void initialize(JobContext context, Properties hcatProperties) {
+ public void initialize(JobContext context, Properties hcatProperties) throws IOException {
jobConf = context.getConfiguration();
- try {
- String jobString = context.getConfiguration().get(
- HCatConstants.HCAT_KEY_JOB_INFO);
- if (jobString == null) {
- throw new IOException(
- "InputJobInfo information not found in JobContext. "
- + "HCatInputFormat.setInput() not called?");
- }
- InputJobInfo jobInfo = (InputJobInfo) HCatUtil
- .deserialize(jobString);
- tableInfo = jobInfo.getTableInfo();
- dataSchema = tableInfo.getDataColumns();
- List<FieldSchema> fields = HCatUtil
- .getFieldSchemaList(outputColSchema.getFields());
- hcatProperties.setProperty(Constants.LIST_COLUMNS,
- MetaStoreUtils.getColumnNamesFromFieldSchema(fields));
- hcatProperties.setProperty(Constants.LIST_COLUMN_TYPES,
- MetaStoreUtils.getColumnTypesFromFieldSchema(fields));
- converter = new HBaseSerDeResultConverter(dataSchema,
- outputColSchema, hcatProperties);
- } catch (Exception e) {
- e.printStackTrace();
+ String jobString = jobConf.get(HCatConstants.HCAT_KEY_JOB_INFO);
+ if (jobString == null) {
+ throw new IOException(
+ "InputJobInfo information not found in JobContext. "
+ + "HCatInputFormat.setInput() not called?");
}
-
+ InputJobInfo jobInfo = (InputJobInfo) HCatUtil.deserialize(jobString);
+ tableInfo = jobInfo.getTableInfo();
+ dataSchema = tableInfo.getDataColumns();
+ List<FieldSchema> fields = HCatUtil.getFieldSchemaList(dataSchema
+ .getFields());
+ hcatProperties.setProperty(Constants.LIST_COLUMNS,
+ MetaStoreUtils.getColumnNamesFromFieldSchema(fields));
+ hcatProperties.setProperty(Constants.LIST_COLUMN_TYPES,
+ MetaStoreUtils.getColumnTypesFromFieldSchema(fields));
+ converter = new HBaseSerDeResultConverter(dataSchema, outputColSchema,
+ hcatProperties);
+ scanColumns = converter.getHBaseScanColumns();
+
}
-
+
/*
* @param hcatProperties
- *
+ *
* @return InputFormat
- *
+ *
* @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver
* #getInputFormat(java.util.Properties)
*/
@@ -103,22 +99,23 @@ public class HBaseInputStorageDriver ext
Properties hcatProperties) {
HBaseInputFormat tableInputFormat = new HBaseInputFormat();
jobConf.set(TableInputFormat.INPUT_TABLE, tableInfo.getTableName());
+ jobConf.set(TableInputFormat.SCAN_COLUMNS, scanColumns);
tableInputFormat.setConf(jobConf);
// TODO: Make the caching configurable by the user
tableInputFormat.getScan().setCaching(200);
tableInputFormat.getScan().setCacheBlocks(false);
return tableInputFormat;
}
-
+
/*
* @param baseKey
- *
+ *
* @param baseValue
- *
+ *
* @return HCatRecord
- *
+ *
* @throws IOException
- *
+ *
* @see
* org.apache.hcatalog.mapreduce.HCatInputStorageDriver#convertToHCatRecord
* (org.apache.hadoop.io.WritableComparable, org.apache.hadoop.io.Writable)
@@ -128,14 +125,14 @@ public class HBaseInputStorageDriver ext
Writable baseValue) throws IOException {
return this.converter.convert((Result) baseValue);
}
-
+
/*
* @param jobContext
- *
+ *
* @param howlSchema
- *
+ *
* @throws IOException
- *
+ *
* @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver#
* setOutputSchema(org.apache.hadoop.mapreduce.JobContext,
* org.apache.hcatalog.data.schema.HCatSchema)
@@ -143,16 +140,16 @@ public class HBaseInputStorageDriver ext
@Override
public void setOutputSchema(JobContext jobContext, HCatSchema howlSchema)
throws IOException {
- outputColSchema = howlSchema;
+ this.outputColSchema = howlSchema;
}
-
+
/*
* @param jobContext
- *
+ *
* @param partitionValues
- *
+ *
* @throws IOException
- *
+ *
* @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver
* #setPartitionValues(org.apache.hadoop.mapreduce.JobContext,
* java.util.Map)
@@ -161,14 +158,14 @@ public class HBaseInputStorageDriver ext
public void setPartitionValues(JobContext jobContext,
Map<String, String> partitionValues) throws IOException {
}
-
+
/*
* @param jobContext
- *
+ *
* @param hcatSchema
- *
+ *
* @throws IOException
- *
+ *
* @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver
* #setOriginalSchema(org.apache.hadoop.mapreduce.JobContext,
* org.apache.hcatalog.data.schema.HCatSchema)
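
The substance of the change above is in getInputFormat(): the driver now writes the converter's projected column list into the job configuration under TableInputFormat.SCAN_COLUMNS, so the region servers only return the families and qualifiers the query actually asked for. A minimal sketch of what a SCAN_COLUMNS value means for the underlying scan, using hypothetical family/qualifier names and plain HBase Scan calls (addColumn/addFamily), which is what the space-delimited string amounts to:

    // Sketch only: what a SCAN_COLUMNS value of
    // "my_family:my_qualifier1 my_family2: " means for the underlying scan.
    // Names are illustrative, not from this commit.
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ScanProjectionSketch {
        public static Scan projectedScan() {
            Scan scan = new Scan();
            // "family:qualifier" restricts the scan to a single column ...
            scan.addColumn(Bytes.toBytes("my_family"), Bytes.toBytes("my_qualifier1"));
            // ... while "family:" (no qualifier) pulls in the whole family.
            scan.addFamily(Bytes.toBytes("my_family2"));
            scan.setCaching(200);        // mirrors the hard-coded caching above
            scan.setCacheBlocks(false);
            return scan;
        }
    }
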
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java?rev=1196854&r1=1196853&r2=1196854&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java Wed Nov 2 23:05:39 2011
@@ -63,6 +63,7 @@ class HBaseSerDeResultConverter implemen
private HCatSchema outputSchema;
private StructObjectInspector hCatRecordOI;
private StructObjectInspector lazyHBaseRowOI;
+ private String hbaseColumnMapping;
private final Long outputVersion;
/**
@@ -75,8 +76,8 @@ class HBaseSerDeResultConverter implemen
HCatSchema outputSchema,
Properties hcatProperties) throws IOException {
- hcatProperties.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
- hcatProperties.getProperty(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY));
+ hbaseColumnMapping = hcatProperties.getProperty(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY);
+ hcatProperties.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,hbaseColumnMapping);
if(hcatProperties.containsKey(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY))
outputVersion = Long.parseLong(hcatProperties.getProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY));
@@ -264,4 +265,44 @@ class HBaseSerDeResultConverter implemen
throw new IOException("Unknown field schema type");
}
}
+
+ public String getHBaseScanColumns() throws IOException {
+ StringBuilder sb = new StringBuilder();
+ if(hbaseColumnMapping == null){
+ throw new IOException("HBase column mapping found to be null.");
+ }
+
+ List<String> outputFieldNames = this.outputSchema.getFieldNames();
+ List<Integer> outputColumnMapping = new ArrayList<Integer>();
+ for(String fieldName: outputFieldNames){
+ int position = schema.getPosition(fieldName);
+ outputColumnMapping.add(position);
+ }
+
+ try {
+ List<String> columnFamilies = new ArrayList<String>();
+ List<String> columnQualifiers = new ArrayList<String>();
+ HBaseSerDe.parseColumnMapping(hbaseColumnMapping, columnFamilies, null, columnQualifiers, null);
+ for(int i = 0; i < outputColumnMapping.size(); i++){
+ int cfIndex = outputColumnMapping.get(i);
+ String cf = columnFamilies.get(cfIndex);
+ // We skip the key column.
+ if (cf.equals(HBaseSerDe.HBASE_KEY_COL) == false) {
+ String qualifier = columnQualifiers.get(i);
+ sb.append(cf);
+ sb.append(":");
+ if (qualifier != null) {
+ sb.append(qualifier);
+ }
+ sb.append(" ");
+ }
+ }
+
+ } catch (SerDeException e) {
+
+ throw new IOException(e);
+ }
+
+ return sb.toString();
+ }
}
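
The new getHBaseScanColumns() method walks the projected output schema, looks up each field's position in the hbase.columns.mapping, and emits a space-terminated "family:qualifier" entry per column ("family:" alone for whole-family mappings), skipping the :key mapping since the row key is not a scanned column. A stand-alone sketch of the same string building, with hypothetical parallel lists standing in for what HBaseSerDe.parseColumnMapping() would produce:

    // Illustration of the scan-column string format; the parallel lists stand in
    // for what HBaseSerDe.parseColumnMapping() would produce for
    // ":key,my_family:my_qualifier1,my_family:my_qualifier2,my_family2:".
    import java.util.Arrays;
    import java.util.List;

    public class ScanColumnStringSketch {
        public static void main(String[] args) {
            List<String> families   = Arrays.asList(":key", "my_family", "my_family", "my_family2");
            List<String> qualifiers = Arrays.asList(null, "my_qualifier1", "my_qualifier2", null);
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < families.size(); i++) {
                if (families.get(i).equals(":key")) {
                    continue;                       // row key mapping is skipped
                }
                sb.append(families.get(i)).append(":");
                if (qualifiers.get(i) != null) {
                    sb.append(qualifiers.get(i));   // named column within the family
                }
                sb.append(" ");                     // entries are space-delimited
            }
            // prints: "my_family:my_qualifier1 my_family:my_qualifier2 my_family2: "
            System.out.println(sb);
        }
    }
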
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ResultConverter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ResultConverter.java?rev=1196854&r1=1196853&r2=1196854&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ResultConverter.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ResultConverter.java Wed Nov 2 23:05:39 2011
@@ -47,4 +47,11 @@ interface ResultConverter {
*/
HCatRecord convert(Result result) throws IOException;
+ /**
+ * Returns the hbase columns that are required for the scan.
+ * @return String containing hbase columns delimited by space.
+ * @throws IOException
+ */
+ String getHBaseScanColumns() throws IOException;
+
}
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java?rev=1196854&r1=1196853&r2=1196854&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java Wed Nov 2 23:05:39 2011
@@ -31,7 +31,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.hbase.HBaseSerDe;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -59,12 +58,11 @@ import org.apache.hcatalog.mapreduce.Inp
import org.junit.Test;
public class TestHBaseInputStorageDriver extends SkeletonHBaseTest {
-
+
private final byte[] FAMILY = Bytes.toBytes("testFamily");
private final byte[] QUALIFIER1 = Bytes.toBytes("testQualifier1");
private final byte[] QUALIFIER2 = Bytes.toBytes("testQualifier2");
- private final String tableName = "mytesttable";
-
+
List<Put> generatePuts(int num) {
List<Put> myPuts = new ArrayList<Put>();
for (int i = 0; i < num; i++) {
@@ -77,16 +75,16 @@ public class TestHBaseInputStorageDriver
}
return myPuts;
}
-
+
private void registerHBaseTable(String tableName) throws Exception {
-
+
String databaseName = MetaStoreUtils.DEFAULT_DATABASE_NAME;
HiveMetaStoreClient client = getCluster().getHiveMetaStoreClient();
try {
client.dropTable(databaseName, tableName);
} catch (Exception e) {
} // can fail with NoSuchObjectException
-
+
Table tbl = new Table();
tbl.setDbName(databaseName);
tbl.setTableName(tableName);
@@ -101,7 +99,7 @@ public class TestHBaseInputStorageDriver
tableParams.put(Constants.SERIALIZATION_FORMAT, "9");
tableParams.put(Constants.SERIALIZATION_NULL_FORMAT, "NULL");
tbl.setParameters(tableParams);
-
+
StorageDescriptor sd = new StorageDescriptor();
sd.setCols(HCatUtil.getFieldSchemaList(getSchema().getFields()));
sd.setBucketCols(new ArrayList<String>(3));
@@ -113,21 +111,21 @@ public class TestHBaseInputStorageDriver
sd.getSerdeInfo().setSerializationLib(HBaseSerDe.class.getName());
sd.setInputFormat(HBaseInputFormat.class.getName());
sd.setOutputFormat("NotRequired");
-
+
tbl.setSd(sd);
client.createTable(tbl);
-
+
}
-
- public void populateTable() throws IOException {
+
+ public void populateTable(String tableName) throws IOException {
List<Put> myPuts = generatePuts(10);
HTable table = new HTable(getHbaseConf(), Bytes.toBytes(tableName));
table.put(myPuts);
}
-
+
@Test
public void TestHBaseTableReadMR() throws Exception {
-
+ String tableName = "testtableone";
Configuration conf = new Configuration();
// include hbase config in conf file
for (Map.Entry<String, String> el : getHbaseConf()) {
@@ -135,14 +133,14 @@ public class TestHBaseInputStorageDriver
conf.set(el.getKey(), el.getValue());
}
}
-
+
conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
HCatUtil.serialize(getHiveConf().getAllProperties()));
-
+
// create Hbase table using admin
createTable(tableName, new String[] { "testFamily" });
registerHBaseTable(tableName);
- populateTable();
+ populateTable(tableName);
// output settings
Path outputDir = new Path(getTestDir(), "mapred/testHbaseTableMRRead");
FileSystem fs = getFileSystem();
@@ -153,14 +151,10 @@ public class TestHBaseInputStorageDriver
Job job = new Job(conf, "hbase-mr-read-test");
job.setJarByClass(this.getClass());
job.setMapperClass(MapReadHTable.class);
-
- job.getConfiguration().set(TableInputFormat.INPUT_TABLE, tableName);
-
job.setInputFormatClass(HCatInputFormat.class);
InputJobInfo inputJobInfo = InputJobInfo.create(
MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null, null,
null);
- HCatInputFormat.setOutputSchema(job, getSchema());
HCatInputFormat.setInput(job, inputJobInfo);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, outputDir);
@@ -172,13 +166,60 @@ public class TestHBaseInputStorageDriver
assertTrue(job.waitForCompletion(true));
assertTrue(MapReadHTable.error == false);
}
-
- public static class MapReadHTable
+
+ @Test
+ public void TestHBaseTableProjectionReadMR() throws Exception {
+
+ String tableName = "testtabletwo";
+ Configuration conf = new Configuration();
+ // include hbase config in conf file
+ for (Map.Entry<String, String> el : getHbaseConf()) {
+ if (el.getKey().startsWith("hbase.")) {
+ conf.set(el.getKey(), el.getValue());
+ }
+ }
+
+ conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
+ HCatUtil.serialize(getHiveConf().getAllProperties()));
+
+ // create Hbase table using admin
+ createTable(tableName, new String[] { "testFamily" });
+ registerHBaseTable(tableName);
+ populateTable(tableName);
+ // output settings
+ Path outputDir = new Path(getTestDir(), "mapred/testHBaseTableProjectionReadMR");
+ FileSystem fs = getFileSystem();
+ if (fs.exists(outputDir)) {
+ fs.delete(outputDir, true);
+ }
+ // create job
+ Job job = new Job(conf, "hbase-column-projection");
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(MapReadProjHTable.class);
+ job.setInputFormatClass(HCatInputFormat.class);
+ InputJobInfo inputJobInfo = InputJobInfo.create(
+ MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null, null,
+ null);
+ HCatInputFormat.setOutputSchema(job, getProjectionSchema());
+ HCatInputFormat.setInput(job, inputJobInfo);
+ job.setOutputFormatClass(TextOutputFormat.class);
+ TextOutputFormat.setOutputPath(job, outputDir);
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(Text.class);
+ job.setNumReduceTasks(0);
+ assertTrue(job.waitForCompletion(true));
+ assertTrue(MapReadHTable.error == false);
+ }
+
+
+ static class MapReadHTable
extends
Mapper<ImmutableBytesWritable, HCatRecord, WritableComparable, Text> {
-
+
static boolean error = false;
-
+
@Override
public void map(ImmutableBytesWritable key, HCatRecord value,
Context context) throws IOException, InterruptedException {
@@ -186,15 +227,34 @@ public class TestHBaseInputStorageDriver
&& (value.get(0).toString()).startsWith("testRow")
&& (value.get(1).toString()).startsWith("testQualifier1")
&& (value.get(2).toString()).startsWith("testQualifier2");
-
+
if (correctValues == false) {
error = true;
}
}
}
-
+
+ static class MapReadProjHTable
+ extends
+ Mapper<ImmutableBytesWritable, HCatRecord, WritableComparable, Text> {
+
+ static boolean error = false;
+
+ @Override
+ public void map(ImmutableBytesWritable key, HCatRecord value,
+ Context context) throws IOException, InterruptedException {
+ boolean correctValues = (value.size() == 2)
+ && (value.get(0).toString()).startsWith("testRow")
+ && (value.get(1).toString()).startsWith("testQualifier1");
+
+ if (correctValues == false) {
+ error = true;
+ }
+ }
+ }
+
private HCatSchema getSchema() throws HCatException {
-
+
HCatSchema schema = new HCatSchema(new ArrayList<HCatFieldSchema>());
schema.append(new HCatFieldSchema("key", HCatFieldSchema.Type.STRING,
""));
@@ -204,4 +264,14 @@ public class TestHBaseInputStorageDriver
HCatFieldSchema.Type.STRING, ""));
return schema;
}
+
+ private HCatSchema getProjectionSchema() throws HCatException {
+
+ HCatSchema schema = new HCatSchema(new ArrayList<HCatFieldSchema>());
+ schema.append(new HCatFieldSchema("key", HCatFieldSchema.Type.STRING,
+ ""));
+ schema.append(new HCatFieldSchema("testqualifier1",
+ HCatFieldSchema.Type.STRING, ""));
+ return schema;
+ }
}
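
The new TestHBaseTableProjectionReadMR test above doubles as the client-side recipe for projection pushdown: the only difference from a full-table read is the narrower schema passed to HCatInputFormat.setOutputSchema() before setInput(). A condensed sketch of that usage, with hypothetical job and table names (database name is the metastore default):

    // Condensed from the projection test; only the two projected fields are requested,
    // so the storage driver scans just the matching HBase column.
    import java.util.ArrayList;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hcatalog.data.schema.HCatFieldSchema;
    import org.apache.hcatalog.data.schema.HCatSchema;
    import org.apache.hcatalog.mapreduce.HCatInputFormat;
    import org.apache.hcatalog.mapreduce.InputJobInfo;

    public class ProjectionReadSketch {
        public static void configure(Configuration conf) throws Exception {
            Job job = new Job(conf, "hbase-column-projection");
            job.setInputFormatClass(HCatInputFormat.class);

            // Request only the key and one qualifier; the storage driver turns this
            // into an HBase scan restricted to the corresponding column.
            HCatSchema projection = new HCatSchema(new ArrayList<HCatFieldSchema>());
            projection.append(new HCatFieldSchema("key", HCatFieldSchema.Type.STRING, ""));
            projection.append(new HCatFieldSchema("testqualifier1", HCatFieldSchema.Type.STRING, ""));

            HCatInputFormat.setOutputSchema(job, projection);
            HCatInputFormat.setInput(job, InputJobInfo.create(
                    "default", "testtabletwo", null, null, null));
            // mapper, output format, and reducer count are set exactly as in the test above
        }
    }
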
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java?rev=1196854&r1=1196853&r2=1196854&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java Wed Nov 2 23:05:39 2011
@@ -39,6 +39,7 @@ import java.util.Map;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
/**
* Test HBaseSerdeResultConverter by manually creating records to convert to and from HBase objects
@@ -175,4 +176,19 @@ public class TestHBaseSerDeResultConvert
put.get(Bytes.toBytes("my_family2"),
Bytes.toBytes("two")).get(0).getTimestamp());
}
+
+ @Test
+ public void testScanColumns() throws IOException{
+ HCatSchema schema = createHCatSchema();
+ HBaseSerDeResultConverter converter = new HBaseSerDeResultConverter(schema,
+ null,
+ createProperties());
+
+ String result = converter.getHBaseScanColumns();
+ String scanColumns = "my_family:my_qualifier1 my_family:my_qualifier2 my_family2: ";
+
+ assertTrue(scanColumns.equals(result));
+
+
+ }
}