Posted to commits@hive.apache.org by ha...@apache.org on 2013/09/12 03:21:29 UTC
svn commit: r1522098 [5/30] - in /hive/branches/vectorization: ./
beeline/src/test/org/apache/hive/beeline/src/test/ bin/ bin/ext/
common/src/java/org/apache/hadoop/hive/common/
common/src/java/org/apache/hadoop/hive/conf/ conf/ contrib/src/java/org/ap...
Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java Thu Sep 12 01:21:10 2013
@@ -54,19 +54,19 @@ import org.apache.hadoop.mapred.InputFor
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.hbase.HBaseBulkOutputFormat.HBaseBulkOutputCommitter;
import org.apache.hcatalog.hbase.HBaseDirectOutputFormat.HBaseDirectOutputCommitter;
import org.apache.hcatalog.hbase.snapshot.RevisionManager;
import org.apache.hcatalog.hbase.snapshot.RevisionManagerConfiguration;
import org.apache.hcatalog.hbase.snapshot.Transaction;
-import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.mapreduce.HCatTableInfo;
-import org.apache.hcatalog.mapreduce.InputJobInfo;
-import org.apache.hcatalog.mapreduce.OutputJobInfo;
-import org.apache.hcatalog.mapreduce.HCatStorageHandler;
+import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hive.hcatalog.mapreduce.HCatTableInfo;
+import org.apache.hive.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hive.hcatalog.mapreduce.HCatStorageHandler;
import org.apache.thrift.TBase;
import org.apache.zookeeper.ZooKeeper;
@@ -80,531 +80,531 @@ import com.google.common.util.concurrent
*/
public class HBaseHCatStorageHandler extends HCatStorageHandler implements HiveMetaHook, Configurable {
- public final static String DEFAULT_PREFIX = "default.";
- private final static String PROPERTY_INT_OUTPUT_LOCATION = "hcat.hbase.mapreduce.intermediateOutputLocation";
+ public final static String DEFAULT_PREFIX = "default.";
+ private final static String PROPERTY_INT_OUTPUT_LOCATION = "hcat.hbase.mapreduce.intermediateOutputLocation";
- private Configuration hbaseConf;
- private Configuration jobConf;
- private HBaseAdmin admin;
-
- @Override
- public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
- // Populate jobProperties with input table name, table columns, RM snapshot,
- // hbase-default.xml and hbase-site.xml
- Map<String, String> tableJobProperties = tableDesc.getJobProperties();
- String jobString = tableJobProperties.get(HCatConstants.HCAT_KEY_JOB_INFO);
- try {
- InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(jobString);
- HCatTableInfo tableInfo = inputJobInfo.getTableInfo();
- String qualifiedTableName = HBaseHCatStorageHandler.getFullyQualifiedHBaseTableName(tableInfo);
- jobProperties.put(TableInputFormat.INPUT_TABLE, qualifiedTableName);
-
- Configuration jobConf = getJobConf();
- addResources(jobConf, jobProperties);
- JobConf copyOfConf = new JobConf(jobConf);
- HBaseConfiguration.addHbaseResources(copyOfConf);
- //Getting hbase delegation token in getInputSplits does not work with PIG. So need to
- //do it here
- if (jobConf instanceof JobConf) { //Should be the case
- HBaseUtil.addHBaseDelegationToken(copyOfConf);
- ((JobConf) jobConf).getCredentials().addAll(copyOfConf.getCredentials());
- }
-
- String outputSchema = jobConf.get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
- jobProperties.put(TableInputFormat.SCAN_COLUMNS, getScanColumns(tableInfo, outputSchema));
-
- String serSnapshot = (String) inputJobInfo.getProperties().get(
- HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY);
- if (serSnapshot == null) {
- HCatTableSnapshot snapshot =
- HBaseRevisionManagerUtil.createSnapshot(
- RevisionManagerConfiguration.create(copyOfConf),
- qualifiedTableName, tableInfo);
- jobProperties.put(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY,
- HCatUtil.serialize(snapshot));
- }
-
- //This adds it directly to the jobConf. Setting in jobProperties does not get propagated
- //to JobConf as of now as the jobProperties is maintained per partition
- //TODO: Remove when HCAT-308 is fixed
- addOutputDependencyJars(jobConf);
- jobProperties.put("tmpjars", jobConf.get("tmpjars"));
+ private Configuration hbaseConf;
+ private Configuration jobConf;
+ private HBaseAdmin admin;
+
+ @Override
+ public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+ // Populate jobProperties with input table name, table columns, RM snapshot,
+ // hbase-default.xml and hbase-site.xml
+ Map<String, String> tableJobProperties = tableDesc.getJobProperties();
+ String jobString = tableJobProperties.get(HCatConstants.HCAT_KEY_JOB_INFO);
+ try {
+ InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(jobString);
+ HCatTableInfo tableInfo = inputJobInfo.getTableInfo();
+ String qualifiedTableName = HBaseHCatStorageHandler.getFullyQualifiedHBaseTableName(tableInfo);
+ jobProperties.put(TableInputFormat.INPUT_TABLE, qualifiedTableName);
+
+ Configuration jobConf = getJobConf();
+ addResources(jobConf, jobProperties);
+ JobConf copyOfConf = new JobConf(jobConf);
+ HBaseConfiguration.addHbaseResources(copyOfConf);
+ //Getting hbase delegation token in getInputSplits does not work with PIG. So need to
+ //do it here
+ if (jobConf instanceof JobConf) { //Should be the case
+ HBaseUtil.addHBaseDelegationToken(copyOfConf);
+ ((JobConf) jobConf).getCredentials().addAll(copyOfConf.getCredentials());
+ }
+
+ String outputSchema = jobConf.get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
+ jobProperties.put(TableInputFormat.SCAN_COLUMNS, getScanColumns(tableInfo, outputSchema));
+
+ String serSnapshot = (String) inputJobInfo.getProperties().get(
+ HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY);
+ if (serSnapshot == null) {
+ HCatTableSnapshot snapshot =
+ HBaseRevisionManagerUtil.createSnapshot(
+ RevisionManagerConfiguration.create(copyOfConf),
+ qualifiedTableName, tableInfo);
+ jobProperties.put(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY,
+ HCatUtil.serialize(snapshot));
+ }
+
+ //This adds it directly to the jobConf. Setting in jobProperties does not get propagated
+ //to JobConf as of now as the jobProperties is maintained per partition
+ //TODO: Remove when HCAT-308 is fixed
+ addOutputDependencyJars(jobConf);
+ jobProperties.put("tmpjars", jobConf.get("tmpjars"));
+
+ } catch (IOException e) {
+ throw new IllegalStateException("Error while configuring job properties", e);
+ }
+ }
+
+ @Override
+ public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+ // Populate jobProperties with output table name, hbase-default.xml, hbase-site.xml, OutputJobInfo
+ // Populate RM transaction in OutputJobInfo
+ // In case of bulk mode, populate intermediate output location
+ Map<String, String> tableJobProperties = tableDesc.getJobProperties();
+ String jobString = tableJobProperties.get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+ try {
+ OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(jobString);
+ HCatTableInfo tableInfo = outputJobInfo.getTableInfo();
+ String qualifiedTableName = HBaseHCatStorageHandler.getFullyQualifiedHBaseTableName(tableInfo);
+ jobProperties.put(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, qualifiedTableName);
+ jobProperties.put(TableOutputFormat.OUTPUT_TABLE, qualifiedTableName);
+
+ Configuration jobConf = getJobConf();
+ addResources(jobConf, jobProperties);
+
+ Configuration copyOfConf = new Configuration(jobConf);
+ HBaseConfiguration.addHbaseResources(copyOfConf);
+
+ String txnString = outputJobInfo.getProperties().getProperty(
+ HBaseConstants.PROPERTY_WRITE_TXN_KEY);
+ Transaction txn = null;
+ if (txnString == null) {
+ txn = HBaseRevisionManagerUtil.beginWriteTransaction(qualifiedTableName, tableInfo,
+ RevisionManagerConfiguration.create(copyOfConf));
+ String serializedTxn = HCatUtil.serialize(txn);
+ outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
+ serializedTxn);
+ } else {
+ txn = (Transaction) HCatUtil.deserialize(txnString);
+ }
+ if (isBulkMode(outputJobInfo)) {
+ String tableLocation = tableInfo.getTableLocation();
+ String location = new Path(tableLocation, "REVISION_" + txn.getRevisionNumber())
+ .toString();
+ outputJobInfo.getProperties().setProperty(PROPERTY_INT_OUTPUT_LOCATION, location);
+ // We are writing out an intermediate sequenceFile hence
+ // location is not passed in OutputJobInfo.getLocation()
+ // TODO replace this with a mapreduce constant when available
+ jobProperties.put("mapred.output.dir", location);
+ jobProperties.put("mapred.output.committer.class", HBaseBulkOutputCommitter.class.getName());
+ } else {
+ jobProperties.put("mapred.output.committer.class", HBaseDirectOutputCommitter.class.getName());
+ }
+
+ jobProperties.put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
+ addOutputDependencyJars(jobConf);
+ jobProperties.put("tmpjars", jobConf.get("tmpjars"));
+
+ } catch (IOException e) {
+ throw new IllegalStateException("Error while configuring job properties", e);
+ }
+ }
+
+ /*
+ * @return instance of HiveAuthorizationProvider
+ *
+ * @throws HiveException
+ *
+ * @see org.apache.hive.hcatalog.storagehandler.HCatStorageHandler#
+ * getAuthorizationProvider()
+ */
+ @Override
+ public HiveAuthorizationProvider getAuthorizationProvider()
+ throws HiveException {
+
+ HBaseAuthorizationProvider hbaseAuth = new HBaseAuthorizationProvider();
+ hbaseAuth.init(getConf());
+ return hbaseAuth;
+ }
+
+ /*
+ * @param table
+ *
+ * @throws MetaException
+ *
+ * @see org.apache.hive.hcatalog.storagehandler.HCatStorageHandler
+ * #commitCreateTable(org.apache.hadoop.hive.metastore.api.Table)
+ */
+ @Override
+ public void commitCreateTable(Table table) throws MetaException {
+ }
+
+ /*
+ * @param instance of table
+ *
+ * @param deleteData
+ *
+ * @throws MetaException
+ *
+ * @see org.apache.hive.hcatalog.storagehandler.HCatStorageHandler
+ * #commitDropTable(org.apache.hadoop.hive.metastore.api.Table, boolean)
+ */
+ @Override
+ public void commitDropTable(Table tbl, boolean deleteData)
+ throws MetaException {
+ checkDeleteTable(tbl);
+
+ }
+
+ /*
+ * @param instance of table
+ *
+ * @throws MetaException
+ *
+ * @see org.apache.hive.hcatalog.storagehandler.HCatStorageHandler
+ * #preCreateTable(org.apache.hadoop.hive.metastore.api.Table)
+ */
+ @Override
+ public void preCreateTable(Table tbl) throws MetaException {
+ boolean isExternal = MetaStoreUtils.isExternalTable(tbl);
+
+ hbaseConf = getConf();
+
+ if (tbl.getSd().getLocation() != null) {
+ throw new MetaException("LOCATION may not be specified for HBase.");
+ }
+
+ try {
+ String tableName = getFullyQualifiedHBaseTableName(tbl);
+ String hbaseColumnsMapping = tbl.getParameters().get(
+ HBaseSerDe.HBASE_COLUMNS_MAPPING);
+
+ if (hbaseColumnsMapping == null) {
+ throw new MetaException(
+ "No hbase.columns.mapping defined in table"
+ + " properties.");
+ }
+
+ List<String> hbaseColumnFamilies = new ArrayList<String>();
+ List<String> hbaseColumnQualifiers = new ArrayList<String>();
+ List<byte[]> hbaseColumnFamiliesBytes = new ArrayList<byte[]>();
+ int iKey = HBaseUtil.parseColumnMapping(hbaseColumnsMapping,
+ hbaseColumnFamilies, hbaseColumnFamiliesBytes,
+ hbaseColumnQualifiers, null);
+
+ HTableDescriptor tableDesc;
+ Set<String> uniqueColumnFamilies = new HashSet<String>();
+ if (!getHBaseAdmin().tableExists(tableName)) {
+ // if it is not an external table then create one
+ if (!isExternal) {
+ // Create the column descriptors
+ tableDesc = new HTableDescriptor(tableName);
+ uniqueColumnFamilies.addAll(hbaseColumnFamilies);
+ uniqueColumnFamilies.remove(hbaseColumnFamilies.get(iKey));
+
+ for (String columnFamily : uniqueColumnFamilies) {
+ HColumnDescriptor familyDesc = new HColumnDescriptor(Bytes
+ .toBytes(columnFamily));
+ familyDesc.setMaxVersions(Integer.MAX_VALUE);
+ tableDesc.addFamily(familyDesc);
+ }
- } catch (IOException e) {
- throw new IllegalStateException("Error while configuring job properties", e);
- }
- }
-
- @Override
- public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
- // Populate jobProperties with output table name, hbase-default.xml, hbase-site.xml, OutputJobInfo
- // Populate RM transaction in OutputJobInfo
- // In case of bulk mode, populate intermediate output location
- Map<String, String> tableJobProperties = tableDesc.getJobProperties();
- String jobString = tableJobProperties.get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
- try {
- OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(jobString);
- HCatTableInfo tableInfo = outputJobInfo.getTableInfo();
- String qualifiedTableName = HBaseHCatStorageHandler.getFullyQualifiedHBaseTableName(tableInfo);
- jobProperties.put(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, qualifiedTableName);
- jobProperties.put(TableOutputFormat.OUTPUT_TABLE, qualifiedTableName);
-
- Configuration jobConf = getJobConf();
- addResources(jobConf, jobProperties);
-
- Configuration copyOfConf = new Configuration(jobConf);
- HBaseConfiguration.addHbaseResources(copyOfConf);
-
- String txnString = outputJobInfo.getProperties().getProperty(
- HBaseConstants.PROPERTY_WRITE_TXN_KEY);
- Transaction txn = null;
- if (txnString == null) {
- txn = HBaseRevisionManagerUtil.beginWriteTransaction(qualifiedTableName, tableInfo,
- RevisionManagerConfiguration.create(copyOfConf));
- String serializedTxn = HCatUtil.serialize(txn);
- outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
- serializedTxn);
- } else {
- txn = (Transaction) HCatUtil.deserialize(txnString);
- }
- if (isBulkMode(outputJobInfo)) {
- String tableLocation = tableInfo.getTableLocation();
- String location = new Path(tableLocation, "REVISION_" + txn.getRevisionNumber())
- .toString();
- outputJobInfo.getProperties().setProperty(PROPERTY_INT_OUTPUT_LOCATION, location);
- // We are writing out an intermediate sequenceFile hence
- // location is not passed in OutputJobInfo.getLocation()
- // TODO replace this with a mapreduce constant when available
- jobProperties.put("mapred.output.dir", location);
- jobProperties.put("mapred.output.committer.class", HBaseBulkOutputCommitter.class.getName());
- } else {
- jobProperties.put("mapred.output.committer.class", HBaseDirectOutputCommitter.class.getName());
- }
-
- jobProperties.put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
- addOutputDependencyJars(jobConf);
- jobProperties.put("tmpjars", jobConf.get("tmpjars"));
-
- } catch (IOException e) {
- throw new IllegalStateException("Error while configuring job properties", e);
- }
- }
-
- /*
- * @return instance of HiveAuthorizationProvider
- *
- * @throws HiveException
- *
- * @see org.apache.hcatalog.storagehandler.HCatStorageHandler#
- * getAuthorizationProvider()
- */
- @Override
- public HiveAuthorizationProvider getAuthorizationProvider()
- throws HiveException {
-
- HBaseAuthorizationProvider hbaseAuth = new HBaseAuthorizationProvider();
- hbaseAuth.init(getConf());
- return hbaseAuth;
- }
-
- /*
- * @param table
- *
- * @throws MetaException
- *
- * @see org.apache.hcatalog.storagehandler.HCatStorageHandler
- * #commitCreateTable(org.apache.hadoop.hive.metastore.api.Table)
- */
- @Override
- public void commitCreateTable(Table table) throws MetaException {
- }
-
- /*
- * @param instance of table
- *
- * @param deleteData
- *
- * @throws MetaException
- *
- * @see org.apache.hcatalog.storagehandler.HCatStorageHandler
- * #commitDropTable(org.apache.hadoop.hive.metastore.api.Table, boolean)
- */
- @Override
- public void commitDropTable(Table tbl, boolean deleteData)
- throws MetaException {
- checkDeleteTable(tbl);
-
- }
-
- /*
- * @param instance of table
- *
- * @throws MetaException
- *
- * @see org.apache.hcatalog.storagehandler.HCatStorageHandler
- * #preCreateTable(org.apache.hadoop.hive.metastore.api.Table)
- */
- @Override
- public void preCreateTable(Table tbl) throws MetaException {
- boolean isExternal = MetaStoreUtils.isExternalTable(tbl);
-
- hbaseConf = getConf();
-
- if (tbl.getSd().getLocation() != null) {
- throw new MetaException("LOCATION may not be specified for HBase.");
- }
-
- try {
- String tableName = getFullyQualifiedHBaseTableName(tbl);
- String hbaseColumnsMapping = tbl.getParameters().get(
- HBaseSerDe.HBASE_COLUMNS_MAPPING);
-
- if (hbaseColumnsMapping == null) {
- throw new MetaException(
- "No hbase.columns.mapping defined in table"
- + " properties.");
- }
-
- List<String> hbaseColumnFamilies = new ArrayList<String>();
- List<String> hbaseColumnQualifiers = new ArrayList<String>();
- List<byte[]> hbaseColumnFamiliesBytes = new ArrayList<byte[]>();
- int iKey = HBaseUtil.parseColumnMapping(hbaseColumnsMapping,
- hbaseColumnFamilies, hbaseColumnFamiliesBytes,
- hbaseColumnQualifiers, null);
-
- HTableDescriptor tableDesc;
- Set<String> uniqueColumnFamilies = new HashSet<String>();
- if (!getHBaseAdmin().tableExists(tableName)) {
- // if it is not an external table then create one
- if (!isExternal) {
- // Create the column descriptors
- tableDesc = new HTableDescriptor(tableName);
- uniqueColumnFamilies.addAll(hbaseColumnFamilies);
- uniqueColumnFamilies.remove(hbaseColumnFamilies.get(iKey));
-
- for (String columnFamily : uniqueColumnFamilies) {
- HColumnDescriptor familyDesc = new HColumnDescriptor(Bytes
- .toBytes(columnFamily));
- familyDesc.setMaxVersions(Integer.MAX_VALUE);
- tableDesc.addFamily(familyDesc);
- }
-
- getHBaseAdmin().createTable(tableDesc);
- } else {
- // an external table
- throw new MetaException("HBase table " + tableName
- + " doesn't exist while the table is "
- + "declared as an external table.");
- }
-
- } else {
- if (!isExternal) {
- throw new MetaException("Table " + tableName
- + " already exists within HBase."
- + " Use CREATE EXTERNAL TABLE instead to"
- + " register it in HCatalog.");
- }
- // make sure the schema mapping is right
- tableDesc = getHBaseAdmin().getTableDescriptor(
- Bytes.toBytes(tableName));
-
- for (int i = 0; i < hbaseColumnFamilies.size(); i++) {
- if (i == iKey) {
- continue;
- }
-
- if (!tableDesc.hasFamily(hbaseColumnFamiliesBytes.get(i))) {
- throw new MetaException("Column Family "
- + hbaseColumnFamilies.get(i)
- + " is not defined in hbase table " + tableName);
- }
- }
- }
-
- // ensure the table is online
- new HTable(hbaseConf, tableDesc.getName());
-
- //Set up table in revision manager.
- RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(hbaseConf);
- rm.createTable(tableName, new ArrayList<String>(uniqueColumnFamilies));
-
- } catch (MasterNotRunningException mnre) {
- throw new MetaException(StringUtils.stringifyException(mnre));
- } catch (IOException ie) {
- throw new MetaException(StringUtils.stringifyException(ie));
- } catch (IllegalArgumentException iae) {
- throw new MetaException(StringUtils.stringifyException(iae));
- }
-
- }
-
- /*
- * @param table
- *
- * @throws MetaException
- *
- * @see org.apache.hcatalog.storagehandler.HCatStorageHandler
- * #preDropTable(org.apache.hadoop.hive.metastore.api.Table)
- */
- @Override
- public void preDropTable(Table table) throws MetaException {
- }
-
- /*
- * @param table
- *
- * @throws MetaException
- *
- * @see org.apache.hcatalog.storagehandler.HCatStorageHandler
- * #rollbackCreateTable(org.apache.hadoop.hive.metastore.api.Table)
- */
- @Override
- public void rollbackCreateTable(Table table) throws MetaException {
- checkDeleteTable(table);
- }
-
- /*
- * @param table
- *
- * @throws MetaException
- *
- * @see org.apache.hcatalog.storagehandler.HCatStorageHandler
- * #rollbackDropTable(org.apache.hadoop.hive.metastore.api.Table)
- */
- @Override
- public void rollbackDropTable(Table table) throws MetaException {
- }
-
- /*
- * @return instance of HiveMetaHook
- *
- * @see org.apache.hcatalog.storagehandler.HCatStorageHandler#getMetaHook()
- */
- @Override
- public HiveMetaHook getMetaHook() {
- return this;
- }
-
- private HBaseAdmin getHBaseAdmin() throws MetaException {
- try {
- if (admin == null) {
- admin = new HBaseAdmin(this.getConf());
- }
- return admin;
- } catch (MasterNotRunningException mnre) {
- throw new MetaException(StringUtils.stringifyException(mnre));
- } catch (ZooKeeperConnectionException zkce) {
- throw new MetaException(StringUtils.stringifyException(zkce));
- }
- }
-
- private String getFullyQualifiedHBaseTableName(Table tbl) {
- String tableName = tbl.getParameters().get(HBaseSerDe.HBASE_TABLE_NAME);
- if (tableName == null) {
- tableName = tbl.getSd().getSerdeInfo().getParameters()
- .get(HBaseSerDe.HBASE_TABLE_NAME);
- }
- if (tableName == null) {
- if (tbl.getDbName().equals(MetaStoreUtils.DEFAULT_DATABASE_NAME)) {
- tableName = tbl.getTableName();
- } else {
- tableName = tbl.getDbName() + "." + tbl.getTableName();
- }
- tableName = tableName.toLowerCase();
- }
- return tableName;
- }
-
- static String getFullyQualifiedHBaseTableName(HCatTableInfo tableInfo) {
- String qualifiedName = tableInfo.getStorerInfo().getProperties()
- .getProperty(HBaseSerDe.HBASE_TABLE_NAME);
- if (qualifiedName == null) {
- String databaseName = tableInfo.getDatabaseName();
- String tableName = tableInfo.getTableName();
- if ((databaseName == null)
- || (databaseName.equals(MetaStoreUtils.DEFAULT_DATABASE_NAME))) {
- qualifiedName = tableName;
- } else {
- qualifiedName = databaseName + "." + tableName;
- }
- qualifiedName = qualifiedName.toLowerCase();
- }
- return qualifiedName;
- }
-
- @Override
- public Class<? extends InputFormat> getInputFormatClass() {
- return HBaseInputFormat.class;
- }
-
- @Override
- public Class<? extends OutputFormat> getOutputFormatClass() {
- return HBaseBaseOutputFormat.class;
- }
-
- /*
- * @return subclass of SerDe
- *
- * @throws UnsupportedOperationException
- *
- * @see
- * org.apache.hcatalog.storagehandler.HCatStorageHandler#getSerDeClass()
- */
- @Override
- public Class<? extends SerDe> getSerDeClass()
- throws UnsupportedOperationException {
- return HBaseSerDe.class;
- }
-
- public Configuration getJobConf() {
- return jobConf;
- }
-
- @Override
- public Configuration getConf() {
-
- if (hbaseConf == null) {
- hbaseConf = HBaseConfiguration.create();
- }
- return hbaseConf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- //setConf is called both during DDL operations and mapred read/write jobs.
- //Creating a copy of conf for DDL and adding hbase-default and hbase-site.xml to it.
- //For jobs, maintaining a reference instead of cloning as we need to
- // 1) add hbase delegation token to the Credentials.
- // 2) set tmpjars on it. Putting in jobProperties does not get propagated to JobConf
- // in case of InputFormat as they are maintained per partition.
- //Not adding hbase-default.xml and hbase-site.xml to jobConf as it will override any
- //hbase properties set in the JobConf by the user. In configureInputJobProperties and
- //configureOutputJobProperties, we take care of adding the default properties
- //that are not already present. TODO: Change to a copy for jobs after HCAT-308 is fixed.
- jobConf = conf;
- hbaseConf = RevisionManagerConfiguration.create(HBaseConfiguration.create(conf));
- }
-
- private void checkDeleteTable(Table table) throws MetaException {
- boolean isExternal = MetaStoreUtils.isExternalTable(table);
- String tableName = getFullyQualifiedHBaseTableName(table);
- RevisionManager rm = null;
- try {
- if (!isExternal && getHBaseAdmin().tableExists(tableName)) {
- // we have created an HBase table, so we delete it to roll back;
- if (getHBaseAdmin().isTableEnabled(tableName)) {
- getHBaseAdmin().disableTable(tableName);
- }
- getHBaseAdmin().deleteTable(tableName);
-
- //Drop table in revision manager.
- rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(hbaseConf);
- rm.dropTable(tableName);
- }
- } catch (IOException ie) {
- throw new MetaException(StringUtils.stringifyException(ie));
- } finally {
- HBaseRevisionManagerUtil.closeRevisionManagerQuietly(rm);
- }
- }
-
- /**
- * Helper method for users to add the required dependency jars to distributed cache.
- * @param conf
- * @throws IOException
- */
- private void addOutputDependencyJars(Configuration conf) throws IOException {
- TableMapReduceUtil.addDependencyJars(conf,
- //ZK
- ZooKeeper.class,
- //HBase
- HTable.class,
- //Hive
- HiveException.class,
- //HCatalog jar
- HCatOutputFormat.class,
- //hcat hbase storage handler jar
- HBaseHCatStorageHandler.class,
- //hive hbase storage handler jar
- HBaseSerDe.class,
- //hive jar
- Table.class,
- //libthrift jar
- TBase.class,
- //hbase jar
- Bytes.class,
- //thrift-fb303 .jar
- FacebookBase.class,
- //guava jar
- ThreadFactoryBuilder.class);
- }
-
- /**
- * Utility method to add hbase-default.xml and hbase-site.xml properties to a new map
- * if they are not already present in the jobConf.
- * @param jobConf Job configuration
- * @param newJobProperties Map to which new properties should be added
- */
- private void addResources(Configuration jobConf,
- Map<String, String> newJobProperties) {
- Configuration conf = new Configuration(false);
- HBaseConfiguration.addHbaseResources(conf);
- RevisionManagerConfiguration.addResources(conf);
- for (Entry<String, String> entry : conf) {
- if (jobConf.get(entry.getKey()) == null)
- newJobProperties.put(entry.getKey(), entry.getValue());
- }
- }
-
- public static boolean isBulkMode(OutputJobInfo outputJobInfo) {
- //Default is false
- String bulkMode = outputJobInfo.getTableInfo().getStorerInfo().getProperties()
- .getProperty(HBaseConstants.PROPERTY_BULK_OUTPUT_MODE_KEY,
- "false");
- return "true".equals(bulkMode);
- }
-
- private String getScanColumns(HCatTableInfo tableInfo, String outputColSchema) throws IOException {
- StringBuilder builder = new StringBuilder();
- String hbaseColumnMapping = tableInfo.getStorerInfo().getProperties()
- .getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING);
- if (outputColSchema == null) {
- String[] splits = hbaseColumnMapping.split("[,]");
- for (int i = 0; i < splits.length; i++) {
- if (!splits[i].equals(HBaseSerDe.HBASE_KEY_COL))
- builder.append(splits[i]).append(" ");
- }
+ getHBaseAdmin().createTable(tableDesc);
} else {
- HCatSchema outputSchema = (HCatSchema) HCatUtil.deserialize(outputColSchema);
- HCatSchema tableSchema = tableInfo.getDataColumns();
- List<String> outputFieldNames = outputSchema.getFieldNames();
- List<Integer> outputColumnMapping = new ArrayList<Integer>();
- for (String fieldName : outputFieldNames) {
- int position = tableSchema.getPosition(fieldName);
- outputColumnMapping.add(position);
- }
- List<String> columnFamilies = new ArrayList<String>();
- List<String> columnQualifiers = new ArrayList<String>();
- HBaseUtil.parseColumnMapping(hbaseColumnMapping, columnFamilies, null,
- columnQualifiers, null);
- for (int i = 0; i < outputColumnMapping.size(); i++) {
- int cfIndex = outputColumnMapping.get(i);
- String cf = columnFamilies.get(cfIndex);
- // We skip the key column.
- if (cf.equals(HBaseSerDe.HBASE_KEY_COL) == false) {
- String qualifier = columnQualifiers.get(i);
- builder.append(cf);
- builder.append(":");
- if (qualifier != null) {
- builder.append(qualifier);
- }
- builder.append(" ");
- }
- }
- }
- //Remove the extra space delimiter
- builder.deleteCharAt(builder.length() - 1);
- return builder.toString();
- }
+ // an external table
+ throw new MetaException("HBase table " + tableName
+ + " doesn't exist while the table is "
+ + "declared as an external table.");
+ }
+
+ } else {
+ if (!isExternal) {
+ throw new MetaException("Table " + tableName
+ + " already exists within HBase."
+ + " Use CREATE EXTERNAL TABLE instead to"
+ + " register it in HCatalog.");
+ }
+ // make sure the schema mapping is right
+ tableDesc = getHBaseAdmin().getTableDescriptor(
+ Bytes.toBytes(tableName));
+
+ for (int i = 0; i < hbaseColumnFamilies.size(); i++) {
+ if (i == iKey) {
+ continue;
+ }
+
+ if (!tableDesc.hasFamily(hbaseColumnFamiliesBytes.get(i))) {
+ throw new MetaException("Column Family "
+ + hbaseColumnFamilies.get(i)
+ + " is not defined in hbase table " + tableName);
+ }
+ }
+ }
+
+ // ensure the table is online
+ new HTable(hbaseConf, tableDesc.getName());
+
+ //Set up table in revision manager.
+ RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(hbaseConf);
+ rm.createTable(tableName, new ArrayList<String>(uniqueColumnFamilies));
+
+ } catch (MasterNotRunningException mnre) {
+ throw new MetaException(StringUtils.stringifyException(mnre));
+ } catch (IOException ie) {
+ throw new MetaException(StringUtils.stringifyException(ie));
+ } catch (IllegalArgumentException iae) {
+ throw new MetaException(StringUtils.stringifyException(iae));
+ }
+
+ }
+
+ /*
+ * @param table
+ *
+ * @throws MetaException
+ *
+ * @see org.apache.hive.hcatalog.storagehandler.HCatStorageHandler
+ * #preDropTable(org.apache.hadoop.hive.metastore.api.Table)
+ */
+ @Override
+ public void preDropTable(Table table) throws MetaException {
+ }
+
+ /*
+ * @param table
+ *
+ * @throws MetaException
+ *
+ * @see org.apache.hive.hcatalog.storagehandler.HCatStorageHandler
+ * #rollbackCreateTable(org.apache.hadoop.hive.metastore.api.Table)
+ */
+ @Override
+ public void rollbackCreateTable(Table table) throws MetaException {
+ checkDeleteTable(table);
+ }
+
+ /*
+ * @param table
+ *
+ * @throws MetaException
+ *
+ * @see org.apache.hive.hcatalog.storagehandler.HCatStorageHandler
+ * #rollbackDropTable(org.apache.hadoop.hive.metastore.api.Table)
+ */
+ @Override
+ public void rollbackDropTable(Table table) throws MetaException {
+ }
+
+ /*
+ * @return instance of HiveMetaHook
+ *
+ * @see org.apache.hive.hcatalog.storagehandler.HCatStorageHandler#getMetaHook()
+ */
+ @Override
+ public HiveMetaHook getMetaHook() {
+ return this;
+ }
+
+ private HBaseAdmin getHBaseAdmin() throws MetaException {
+ try {
+ if (admin == null) {
+ admin = new HBaseAdmin(this.getConf());
+ }
+ return admin;
+ } catch (MasterNotRunningException mnre) {
+ throw new MetaException(StringUtils.stringifyException(mnre));
+ } catch (ZooKeeperConnectionException zkce) {
+ throw new MetaException(StringUtils.stringifyException(zkce));
+ }
+ }
+
+ private String getFullyQualifiedHBaseTableName(Table tbl) {
+ String tableName = tbl.getParameters().get(HBaseSerDe.HBASE_TABLE_NAME);
+ if (tableName == null) {
+ tableName = tbl.getSd().getSerdeInfo().getParameters()
+ .get(HBaseSerDe.HBASE_TABLE_NAME);
+ }
+ if (tableName == null) {
+ if (tbl.getDbName().equals(MetaStoreUtils.DEFAULT_DATABASE_NAME)) {
+ tableName = tbl.getTableName();
+ } else {
+ tableName = tbl.getDbName() + "." + tbl.getTableName();
+ }
+ tableName = tableName.toLowerCase();
+ }
+ return tableName;
+ }
+
+ static String getFullyQualifiedHBaseTableName(HCatTableInfo tableInfo) {
+ String qualifiedName = tableInfo.getStorerInfo().getProperties()
+ .getProperty(HBaseSerDe.HBASE_TABLE_NAME);
+ if (qualifiedName == null) {
+ String databaseName = tableInfo.getDatabaseName();
+ String tableName = tableInfo.getTableName();
+ if ((databaseName == null)
+ || (databaseName.equals(MetaStoreUtils.DEFAULT_DATABASE_NAME))) {
+ qualifiedName = tableName;
+ } else {
+ qualifiedName = databaseName + "." + tableName;
+ }
+ qualifiedName = qualifiedName.toLowerCase();
+ }
+ return qualifiedName;
+ }
+
+ @Override
+ public Class<? extends InputFormat> getInputFormatClass() {
+ return HBaseInputFormat.class;
+ }
+
+ @Override
+ public Class<? extends OutputFormat> getOutputFormatClass() {
+ return HBaseBaseOutputFormat.class;
+ }
+
+ /*
+ * @return subclass of SerDe
+ *
+ * @throws UnsupportedOperationException
+ *
+ * @see
+ * org.apache.hive.hcatalog.storagehandler.HCatStorageHandler#getSerDeClass()
+ */
+ @Override
+ public Class<? extends SerDe> getSerDeClass()
+ throws UnsupportedOperationException {
+ return HBaseSerDe.class;
+ }
+
+ public Configuration getJobConf() {
+ return jobConf;
+ }
+
+ @Override
+ public Configuration getConf() {
+
+ if (hbaseConf == null) {
+ hbaseConf = HBaseConfiguration.create();
+ }
+ return hbaseConf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ //setConf is called both during DDL operations and mapred read/write jobs.
+ //Creating a copy of conf for DDL and adding hbase-default and hbase-site.xml to it.
+ //For jobs, maintaining a reference instead of cloning as we need to
+ // 1) add hbase delegation token to the Credentials.
+ // 2) set tmpjars on it. Putting in jobProperties does not get propagated to JobConf
+ // in case of InputFormat as they are maintained per partition.
+ //Not adding hbase-default.xml and hbase-site.xml to jobConf as it will override any
+ //hbase properties set in the JobConf by the user. In configureInputJobProperties and
+ //configureOutputJobProperties, we take care of adding the default properties
+ //that are not already present. TODO: Change to a copy for jobs after HCAT-308 is fixed.
+ jobConf = conf;
+ hbaseConf = RevisionManagerConfiguration.create(HBaseConfiguration.create(conf));
+ }
+
+ private void checkDeleteTable(Table table) throws MetaException {
+ boolean isExternal = MetaStoreUtils.isExternalTable(table);
+ String tableName = getFullyQualifiedHBaseTableName(table);
+ RevisionManager rm = null;
+ try {
+ if (!isExternal && getHBaseAdmin().tableExists(tableName)) {
+ // we have created an HBase table, so we delete it to roll back;
+ if (getHBaseAdmin().isTableEnabled(tableName)) {
+ getHBaseAdmin().disableTable(tableName);
+ }
+ getHBaseAdmin().deleteTable(tableName);
+
+ //Drop table in revision manager.
+ rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(hbaseConf);
+ rm.dropTable(tableName);
+ }
+ } catch (IOException ie) {
+ throw new MetaException(StringUtils.stringifyException(ie));
+ } finally {
+ HBaseRevisionManagerUtil.closeRevisionManagerQuietly(rm);
+ }
+ }
+
+ /**
+ * Helper method for users to add the required dependency jars to distributed cache.
+ * @param conf
+ * @throws IOException
+ */
+ private void addOutputDependencyJars(Configuration conf) throws IOException {
+ TableMapReduceUtil.addDependencyJars(conf,
+ //ZK
+ ZooKeeper.class,
+ //HBase
+ HTable.class,
+ //Hive
+ HiveException.class,
+ //HCatalog jar
+ HCatOutputFormat.class,
+ //hcat hbase storage handler jar
+ HBaseHCatStorageHandler.class,
+ //hive hbase storage handler jar
+ HBaseSerDe.class,
+ //hive jar
+ Table.class,
+ //libthrift jar
+ TBase.class,
+ //hbase jar
+ Bytes.class,
+ //thrift-fb303 .jar
+ FacebookBase.class,
+ //guava jar
+ ThreadFactoryBuilder.class);
+ }
+
+ /**
+ * Utility method to add hbase-default.xml and hbase-site.xml properties to a new map
+ * if they are not already present in the jobConf.
+ * @param jobConf Job configuration
+ * @param newJobProperties Map to which new properties should be added
+ */
+ private void addResources(Configuration jobConf,
+ Map<String, String> newJobProperties) {
+ Configuration conf = new Configuration(false);
+ HBaseConfiguration.addHbaseResources(conf);
+ RevisionManagerConfiguration.addResources(conf);
+ for (Entry<String, String> entry : conf) {
+ if (jobConf.get(entry.getKey()) == null)
+ newJobProperties.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ public static boolean isBulkMode(OutputJobInfo outputJobInfo) {
+ //Default is false
+ String bulkMode = outputJobInfo.getTableInfo().getStorerInfo().getProperties()
+ .getProperty(HBaseConstants.PROPERTY_BULK_OUTPUT_MODE_KEY,
+ "false");
+ return "true".equals(bulkMode);
+ }
+
+ private String getScanColumns(HCatTableInfo tableInfo, String outputColSchema) throws IOException {
+ StringBuilder builder = new StringBuilder();
+ String hbaseColumnMapping = tableInfo.getStorerInfo().getProperties()
+ .getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+ if (outputColSchema == null) {
+ String[] splits = hbaseColumnMapping.split("[,]");
+ for (int i = 0; i < splits.length; i++) {
+ if (!splits[i].equals(HBaseSerDe.HBASE_KEY_COL))
+ builder.append(splits[i]).append(" ");
+ }
+ } else {
+ HCatSchema outputSchema = (HCatSchema) HCatUtil.deserialize(outputColSchema);
+ HCatSchema tableSchema = tableInfo.getDataColumns();
+ List<String> outputFieldNames = outputSchema.getFieldNames();
+ List<Integer> outputColumnMapping = new ArrayList<Integer>();
+ for (String fieldName : outputFieldNames) {
+ int position = tableSchema.getPosition(fieldName);
+ outputColumnMapping.add(position);
+ }
+ List<String> columnFamilies = new ArrayList<String>();
+ List<String> columnQualifiers = new ArrayList<String>();
+ HBaseUtil.parseColumnMapping(hbaseColumnMapping, columnFamilies, null,
+ columnQualifiers, null);
+ for (int i = 0; i < outputColumnMapping.size(); i++) {
+ int cfIndex = outputColumnMapping.get(i);
+ String cf = columnFamilies.get(cfIndex);
+ // We skip the key column.
+ if (cf.equals(HBaseSerDe.HBASE_KEY_COL) == false) {
+ String qualifier = columnQualifiers.get(i);
+ builder.append(cf);
+ builder.append(":");
+ if (qualifier != null) {
+ builder.append(qualifier);
+ }
+ builder.append(" ");
+ }
+ }
+ }
+ //Remove the extra space delimiter
+ builder.deleteCharAt(builder.length() - 1);
+ return builder.toString();
+ }
}
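The substance of the hunk above is twofold: the body of HBaseHCatStorageHandler is re-indented from four spaces to two, and the HCatalog framework classes it imports (HCatConstants, HCatUtil, HCatSchema, HCatOutputFormat, HCatStorageHandler and the mapreduce job-info types) move from org.apache.hcatalog to org.apache.hive.hcatalog, while the HBase-specific classes stay under org.apache.hcatalog.hbase. A minimal, hypothetical sketch of what the same rename looks like in downstream code that reads the serialized InputJobInfo back out of the job properties (the class JobInfoDump is illustrative only and not part of this commit):

import java.io.IOException;
import java.util.Map;

// After this commit the HCatalog framework types live under org.apache.hive.hcatalog;
// before it they lived under org.apache.hcatalog.
import org.apache.hive.hcatalog.common.HCatConstants;
import org.apache.hive.hcatalog.common.HCatUtil;
import org.apache.hive.hcatalog.mapreduce.InputJobInfo;

public class JobInfoDump {
  // Mirrors the first steps of configureInputJobProperties above: pull the serialized
  // InputJobInfo out of the job properties and recover the HCat table it refers to.
  public static String describe(Map<String, String> jobProperties) throws IOException {
    String serialized = jobProperties.get(HCatConstants.HCAT_KEY_JOB_INFO);
    InputJobInfo info = (InputJobInfo) HCatUtil.deserialize(serialized);
    return info.getTableInfo().getDatabaseName() + "." + info.getTableInfo().getTableName();
  }
}
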
Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java Thu Sep 12 01:21:10 2013
@@ -28,99 +28,99 @@ import org.apache.hadoop.hbase.client.Sc
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableSplit;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
-import org.apache.hadoop.mapred.HCatMapRedUtil;
+import org.apache.hive.hcatalog.mapreduce.HCatMapRedUtil;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.mapreduce.InputJobInfo;
/**
* This class HBaseInputFormat is a wrapper class of TableInputFormat in HBase.
*/
class HBaseInputFormat implements InputFormat<ImmutableBytesWritable, Result> {
- private final TableInputFormat inputFormat;
+ private final TableInputFormat inputFormat;
- public HBaseInputFormat() {
- inputFormat = new TableInputFormat();
- }
-
- /*
- * @param instance of InputSplit
- *
- * @param instance of TaskAttemptContext
- *
- * @return RecordReader
- *
- * @throws IOException
- *
- * @throws InterruptedException
- *
- * @see
- * org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache
- * .hadoop.mapreduce.InputSplit,
- * org.apache.hadoop.mapreduce.TaskAttemptContext)
- */
- @Override
- public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
- InputSplit split, JobConf job, Reporter reporter)
- throws IOException {
- String jobString = job.get(HCatConstants.HCAT_KEY_JOB_INFO);
- InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(jobString);
-
- String tableName = job.get(TableInputFormat.INPUT_TABLE);
- TableSplit tSplit = (TableSplit) split;
- HbaseSnapshotRecordReader recordReader = new HbaseSnapshotRecordReader(inputJobInfo, job);
- inputFormat.setConf(job);
- Scan inputScan = inputFormat.getScan();
- // TODO: Make the caching configurable by the user
- inputScan.setCaching(200);
- inputScan.setCacheBlocks(false);
- Scan sc = new Scan(inputScan);
- sc.setStartRow(tSplit.getStartRow());
- sc.setStopRow(tSplit.getEndRow());
- recordReader.setScan(sc);
- recordReader.setHTable(new HTable(job, tableName));
- recordReader.init();
- return recordReader;
- }
-
- /*
- * @param jobContext
- *
- * @return List of InputSplit
- *
- * @throws IOException
- *
- * @throws InterruptedException
- *
- * @see
- * org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce
- * .JobContext)
- */
- @Override
- public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits)
- throws IOException {
- inputFormat.setConf(job);
- return convertSplits(inputFormat.getSplits(HCatMapRedUtil.createJobContext(job, null,
- Reporter.NULL)));
- }
-
- private InputSplit[] convertSplits(List<org.apache.hadoop.mapreduce.InputSplit> splits) {
- InputSplit[] converted = new InputSplit[splits.size()];
- for (int i = 0; i < splits.size(); i++) {
- org.apache.hadoop.hbase.mapreduce.TableSplit tableSplit =
- (org.apache.hadoop.hbase.mapreduce.TableSplit) splits.get(i);
- TableSplit newTableSplit = new TableSplit(tableSplit.getTableName(),
- tableSplit.getStartRow(),
- tableSplit.getEndRow(), tableSplit.getRegionLocation());
- converted[i] = newTableSplit;
- }
- return converted;
+ public HBaseInputFormat() {
+ inputFormat = new TableInputFormat();
+ }
+
+ /*
+ * @param instance of InputSplit
+ *
+ * @param instance of TaskAttemptContext
+ *
+ * @return RecordReader
+ *
+ * @throws IOException
+ *
+ * @throws InterruptedException
+ *
+ * @see
+ * org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache
+ * .hadoop.mapreduce.InputSplit,
+ * org.apache.hadoop.mapreduce.TaskAttemptContext)
+ */
+ @Override
+ public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
+ InputSplit split, JobConf job, Reporter reporter)
+ throws IOException {
+ String jobString = job.get(HCatConstants.HCAT_KEY_JOB_INFO);
+ InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(jobString);
+
+ String tableName = job.get(TableInputFormat.INPUT_TABLE);
+ TableSplit tSplit = (TableSplit) split;
+ HbaseSnapshotRecordReader recordReader = new HbaseSnapshotRecordReader(inputJobInfo, job);
+ inputFormat.setConf(job);
+ Scan inputScan = inputFormat.getScan();
+ // TODO: Make the caching configurable by the user
+ inputScan.setCaching(200);
+ inputScan.setCacheBlocks(false);
+ Scan sc = new Scan(inputScan);
+ sc.setStartRow(tSplit.getStartRow());
+ sc.setStopRow(tSplit.getEndRow());
+ recordReader.setScan(sc);
+ recordReader.setHTable(new HTable(job, tableName));
+ recordReader.init();
+ return recordReader;
+ }
+
+ /*
+ * @param jobContext
+ *
+ * @return List of InputSplit
+ *
+ * @throws IOException
+ *
+ * @throws InterruptedException
+ *
+ * @see
+ * org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce
+ * .JobContext)
+ */
+ @Override
+ public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits)
+ throws IOException {
+ inputFormat.setConf(job);
+ return convertSplits(inputFormat.getSplits(HCatMapRedUtil.createJobContext(job, null,
+ Reporter.NULL)));
+ }
+
+ private InputSplit[] convertSplits(List<org.apache.hadoop.mapreduce.InputSplit> splits) {
+ InputSplit[] converted = new InputSplit[splits.size()];
+ for (int i = 0; i < splits.size(); i++) {
+ org.apache.hadoop.hbase.mapreduce.TableSplit tableSplit =
+ (org.apache.hadoop.hbase.mapreduce.TableSplit) splits.get(i);
+ TableSplit newTableSplit = new TableSplit(tableSplit.getTableName(),
+ tableSplit.getStartRow(),
+ tableSplit.getEndRow(), tableSplit.getRegionLocation());
+ converted[i] = newTableSplit;
}
+ return converted;
+ }
}
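HBaseInputFormat above bridges the old org.apache.hadoop.mapred API to HBase's new-API TableInputFormat, which is why getSplits must re-wrap each mapreduce TableSplit as a mapred TableSplit. A standalone sketch of just that conversion step, assuming the same HBase TableSplit constructor the diff relies on (the helper class name SplitConverter is illustrative):

import java.util.List;

import org.apache.hadoop.hbase.mapred.TableSplit;
import org.apache.hadoop.mapred.InputSplit;

class SplitConverter {
  // Copies table name, row range and region location from the new-API split into the
  // old-API split type so legacy mapred consumers (Hive/HCatalog readers) can use it.
  static InputSplit[] toMapredSplits(List<org.apache.hadoop.mapreduce.InputSplit> splits) {
    InputSplit[] converted = new InputSplit[splits.size()];
    for (int i = 0; i < splits.size(); i++) {
      org.apache.hadoop.hbase.mapreduce.TableSplit s =
          (org.apache.hadoop.hbase.mapreduce.TableSplit) splits.get(i);
      converted[i] = new TableSplit(s.getTableName(), s.getStartRow(), s.getEndRow(),
          s.getRegionLocation());
    }
    return converted;
  }
}
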
Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java Thu Sep 12 01:21:10 2013
@@ -29,18 +29,18 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.hbase.HBaseSerDe;
-import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.data.schema.HCatFieldSchema;
-import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.hbase.snapshot.RevisionManager;
import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory;
import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
import org.apache.hcatalog.hbase.snapshot.Transaction;
-import org.apache.hcatalog.mapreduce.HCatTableInfo;
-import org.apache.hcatalog.mapreduce.InputJobInfo;
-import org.apache.hcatalog.mapreduce.OutputJobInfo;
-import org.apache.hcatalog.mapreduce.StorerInfo;
+import org.apache.hive.hcatalog.mapreduce.HCatTableInfo;
+import org.apache.hive.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hive.hcatalog.mapreduce.StorerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,207 +51,207 @@ import org.slf4j.LoggerFactory;
*/
class HBaseRevisionManagerUtil {
- private final static Logger LOG = LoggerFactory.getLogger(HBaseRevisionManagerUtil.class);
+ private final static Logger LOG = LoggerFactory.getLogger(HBaseRevisionManagerUtil.class);
- private HBaseRevisionManagerUtil() {
- }
-
- /**
- * Creates the latest snapshot of the table.
- *
- * @param jobConf The job configuration.
- * @param hbaseTableName The fully qualified name of the HBase table.
- * @param tableInfo HCat table information
- * @return An instance of HCatTableSnapshot
- * @throws IOException Signals that an I/O exception has occurred.
- */
- static HCatTableSnapshot createSnapshot(Configuration jobConf,
- String hbaseTableName, HCatTableInfo tableInfo) throws IOException {
-
- RevisionManager rm = null;
- TableSnapshot snpt;
- try {
- rm = getOpenedRevisionManager(jobConf);
- snpt = rm.createSnapshot(hbaseTableName);
- } finally {
- closeRevisionManagerQuietly(rm);
- }
-
- HCatTableSnapshot hcatSnapshot = HBaseRevisionManagerUtil.convertSnapshot(snpt, tableInfo);
- return hcatSnapshot;
- }
-
- /**
- * Creates the snapshot using the revision specified by the user.
- *
- * @param jobConf The job configuration.
- * @param tableName The fully qualified name of the table whose snapshot is being taken.
- * @param revision The revision number to use for the snapshot.
- * @return An instance of HCatTableSnapshot.
- * @throws IOException Signals that an I/O exception has occurred.
- */
- static HCatTableSnapshot createSnapshot(Configuration jobConf,
- String tableName, long revision)
- throws IOException {
-
- TableSnapshot snpt;
- RevisionManager rm = null;
- try {
- rm = getOpenedRevisionManager(jobConf);
- snpt = rm.createSnapshot(tableName, revision);
- } finally {
- closeRevisionManagerQuietly(rm);
- }
-
- String inputJobString = jobConf.get(HCatConstants.HCAT_KEY_JOB_INFO);
- if (inputJobString == null) {
- throw new IOException(
- "InputJobInfo information not found in JobContext. "
- + "HCatInputFormat.setInput() not called?");
- }
- InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(inputJobString);
- HCatTableSnapshot hcatSnapshot = HBaseRevisionManagerUtil
- .convertSnapshot(snpt, inputInfo.getTableInfo());
-
- return hcatSnapshot;
- }
-
- /**
- * Gets an instance of revision manager which is opened.
- *
- * @param jobConf The job configuration.
- * @return RevisionManager An instance of revision manager.
- * @throws IOException
- */
- static RevisionManager getOpenedRevisionManager(Configuration jobConf) throws IOException {
- return RevisionManagerFactory.getOpenedRevisionManager(jobConf);
- }
-
- static void closeRevisionManagerQuietly(RevisionManager rm) {
- if (rm != null) {
- try {
- rm.close();
- } catch (IOException e) {
- LOG.warn("Error while trying to close revision manager", e);
- }
- }
- }
-
-
- static HCatTableSnapshot convertSnapshot(TableSnapshot hbaseSnapshot,
- HCatTableInfo hcatTableInfo) throws IOException {
-
- HCatSchema hcatTableSchema = hcatTableInfo.getDataColumns();
- Map<String, String> hcatHbaseColMap = getHCatHBaseColumnMapping(hcatTableInfo);
- HashMap<String, Long> revisionMap = new HashMap<String, Long>();
-
- for (HCatFieldSchema fSchema : hcatTableSchema.getFields()) {
- if (hcatHbaseColMap.containsKey(fSchema.getName())) {
- String colFamily = hcatHbaseColMap.get(fSchema.getName());
- long revisionID = hbaseSnapshot.getRevision(colFamily);
- revisionMap.put(fSchema.getName(), revisionID);
- }
- }
-
- HCatTableSnapshot hcatSnapshot = new HCatTableSnapshot(
- hcatTableInfo.getDatabaseName(), hcatTableInfo.getTableName(), revisionMap, hbaseSnapshot.getLatestRevision());
- return hcatSnapshot;
- }
-
- static TableSnapshot convertSnapshot(HCatTableSnapshot hcatSnapshot,
- HCatTableInfo hcatTableInfo) throws IOException {
-
- HCatSchema hcatTableSchema = hcatTableInfo.getDataColumns();
- Map<String, Long> revisionMap = new HashMap<String, Long>();
- Map<String, String> hcatHbaseColMap = getHCatHBaseColumnMapping(hcatTableInfo);
- for (HCatFieldSchema fSchema : hcatTableSchema.getFields()) {
- String colFamily = hcatHbaseColMap.get(fSchema.getName());
- if (hcatSnapshot.containsColumn(fSchema.getName())) {
- long revision = hcatSnapshot.getRevision(fSchema.getName());
- revisionMap.put(colFamily, revision);
- }
- }
-
- String fullyQualifiedName = hcatSnapshot.getDatabaseName() + "."
- + hcatSnapshot.getTableName();
- return new TableSnapshot(fullyQualifiedName, revisionMap, hcatSnapshot.getLatestRevision());
-
- }
-
- /**
- * Begins a transaction in the revision manager for the given table.
- * @param qualifiedTableName Name of the table
- * @param tableInfo HCat Table information
- * @param jobConf Job Configuration
- * @return The new transaction in revision manager
- * @throws IOException
- */
- static Transaction beginWriteTransaction(String qualifiedTableName,
- HCatTableInfo tableInfo, Configuration jobConf) throws IOException {
- Transaction txn;
- RevisionManager rm = null;
- try {
- rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(jobConf);
- String hBaseColumns = tableInfo.getStorerInfo().getProperties()
- .getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING);
- String[] splits = hBaseColumns.split("[,:]");
- Set<String> families = new HashSet<String>();
- for (int i = 0; i < splits.length; i += 2) {
- if (!splits[i].isEmpty())
- families.add(splits[i]);
- }
- txn = rm.beginWriteTransaction(qualifiedTableName, new ArrayList<String>(families));
- } finally {
- HBaseRevisionManagerUtil.closeRevisionManagerQuietly(rm);
- }
- return txn;
- }
-
- static Transaction getWriteTransaction(Configuration conf) throws IOException {
- OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
- return (Transaction) HCatUtil.deserialize(outputJobInfo.getProperties()
- .getProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY));
- }
-
- static void setWriteTransaction(Configuration conf, Transaction txn) throws IOException {
- OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
- outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY, HCatUtil.serialize(txn));
- conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
- }
-
- /**
- * Get the Revision number that will be assigned to this job's output data
- * @param conf configuration of the job
- * @return the revision number used
- * @throws IOException
- */
- static long getOutputRevision(Configuration conf) throws IOException {
- return getWriteTransaction(conf).getRevisionNumber();
- }
-
- private static Map<String, String> getHCatHBaseColumnMapping(HCatTableInfo hcatTableInfo)
- throws IOException {
-
- HCatSchema hcatTableSchema = hcatTableInfo.getDataColumns();
- StorerInfo storeInfo = hcatTableInfo.getStorerInfo();
- String hbaseColumnMapping = storeInfo.getProperties().getProperty(
- HBaseSerDe.HBASE_COLUMNS_MAPPING);
-
- Map<String, String> hcatHbaseColMap = new HashMap<String, String>();
- List<String> columnFamilies = new ArrayList<String>();
- List<String> columnQualifiers = new ArrayList<String>();
- HBaseUtil.parseColumnMapping(hbaseColumnMapping, columnFamilies,
- null, columnQualifiers, null);
-
- for (HCatFieldSchema column : hcatTableSchema.getFields()) {
- int fieldPos = hcatTableSchema.getPosition(column.getName());
- String colFamily = columnFamilies.get(fieldPos);
- if (colFamily.equals(HBaseSerDe.HBASE_KEY_COL) == false) {
- hcatHbaseColMap.put(column.getName(), colFamily);
- }
- }
+ private HBaseRevisionManagerUtil() {
+ }
- return hcatHbaseColMap;
+ /**
+ * Creates the latest snapshot of the table.
+ *
+ * @param jobConf The job configuration.
+ * @param hbaseTableName The fully qualified name of the HBase table.
+ * @param tableInfo HCat table information
+ * @return An instance of HCatTableSnapshot
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ static HCatTableSnapshot createSnapshot(Configuration jobConf,
+ String hbaseTableName, HCatTableInfo tableInfo) throws IOException {
+
+ RevisionManager rm = null;
+ TableSnapshot snpt;
+ try {
+ rm = getOpenedRevisionManager(jobConf);
+ snpt = rm.createSnapshot(hbaseTableName);
+ } finally {
+ closeRevisionManagerQuietly(rm);
+ }
+
+ HCatTableSnapshot hcatSnapshot = HBaseRevisionManagerUtil.convertSnapshot(snpt, tableInfo);
+ return hcatSnapshot;
+ }
+
+ /**
+ * Creates the snapshot using the revision specified by the user.
+ *
+ * @param jobConf The job configuration.
+ * @param tableName The fully qualified name of the table whose snapshot is being taken.
+ * @param revision The revision number to use for the snapshot.
+ * @return An instance of HCatTableSnapshot.
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ static HCatTableSnapshot createSnapshot(Configuration jobConf,
+ String tableName, long revision)
+ throws IOException {
+
+ TableSnapshot snpt;
+ RevisionManager rm = null;
+ try {
+ rm = getOpenedRevisionManager(jobConf);
+ snpt = rm.createSnapshot(tableName, revision);
+ } finally {
+ closeRevisionManagerQuietly(rm);
+ }
+
+ String inputJobString = jobConf.get(HCatConstants.HCAT_KEY_JOB_INFO);
+ if (inputJobString == null) {
+ throw new IOException(
+ "InputJobInfo information not found in JobContext. "
+ + "HCatInputFormat.setInput() not called?");
+ }
+ InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(inputJobString);
+ HCatTableSnapshot hcatSnapshot = HBaseRevisionManagerUtil
+ .convertSnapshot(snpt, inputInfo.getTableInfo());
+
+ return hcatSnapshot;
+ }
+
+ /**
+ * Gets an instance of revision manager which is opened.
+ *
+ * @param jobConf The job configuration.
+ * @return RevisionManager An instance of revision manager.
+ * @throws IOException
+ */
+ static RevisionManager getOpenedRevisionManager(Configuration jobConf) throws IOException {
+ return RevisionManagerFactory.getOpenedRevisionManager(jobConf);
+ }
+
+ static void closeRevisionManagerQuietly(RevisionManager rm) {
+ if (rm != null) {
+ try {
+ rm.close();
+ } catch (IOException e) {
+ LOG.warn("Error while trying to close revision manager", e);
+ }
+ }
+ }
+
+
+ static HCatTableSnapshot convertSnapshot(TableSnapshot hbaseSnapshot,
+ HCatTableInfo hcatTableInfo) throws IOException {
+
+ HCatSchema hcatTableSchema = hcatTableInfo.getDataColumns();
+ Map<String, String> hcatHbaseColMap = getHCatHBaseColumnMapping(hcatTableInfo);
+ HashMap<String, Long> revisionMap = new HashMap<String, Long>();
+
+ for (HCatFieldSchema fSchema : hcatTableSchema.getFields()) {
+ if (hcatHbaseColMap.containsKey(fSchema.getName())) {
+ String colFamily = hcatHbaseColMap.get(fSchema.getName());
+ long revisionID = hbaseSnapshot.getRevision(colFamily);
+ revisionMap.put(fSchema.getName(), revisionID);
+ }
+ }
+
+ HCatTableSnapshot hcatSnapshot = new HCatTableSnapshot(
+ hcatTableInfo.getDatabaseName(), hcatTableInfo.getTableName(), revisionMap, hbaseSnapshot.getLatestRevision());
+ return hcatSnapshot;
+ }
+
+ static TableSnapshot convertSnapshot(HCatTableSnapshot hcatSnapshot,
+ HCatTableInfo hcatTableInfo) throws IOException {
+
+ HCatSchema hcatTableSchema = hcatTableInfo.getDataColumns();
+ Map<String, Long> revisionMap = new HashMap<String, Long>();
+ Map<String, String> hcatHbaseColMap = getHCatHBaseColumnMapping(hcatTableInfo);
+ for (HCatFieldSchema fSchema : hcatTableSchema.getFields()) {
+ String colFamily = hcatHbaseColMap.get(fSchema.getName());
+ if (hcatSnapshot.containsColumn(fSchema.getName())) {
+ long revision = hcatSnapshot.getRevision(fSchema.getName());
+ revisionMap.put(colFamily, revision);
+ }
+ }
+
+ String fullyQualifiedName = hcatSnapshot.getDatabaseName() + "."
+ + hcatSnapshot.getTableName();
+ return new TableSnapshot(fullyQualifiedName, revisionMap, hcatSnapshot.getLatestRevision());
+
+ }
+
+ /**
+ * Begins a transaction in the revision manager for the given table.
+ * @param qualifiedTableName Name of the table
+ * @param tableInfo HCat Table information
+ * @param jobConf Job Configuration
+ * @return The new transaction in revision manager
+ * @throws IOException
+ */
+ static Transaction beginWriteTransaction(String qualifiedTableName,
+ HCatTableInfo tableInfo, Configuration jobConf) throws IOException {
+ Transaction txn;
+ RevisionManager rm = null;
+ try {
+ rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(jobConf);
+ String hBaseColumns = tableInfo.getStorerInfo().getProperties()
+ .getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+ String[] splits = hBaseColumns.split("[,:]");
+ Set<String> families = new HashSet<String>();
+ for (int i = 0; i < splits.length; i += 2) {
+ if (!splits[i].isEmpty())
+ families.add(splits[i]);
+ }
+ txn = rm.beginWriteTransaction(qualifiedTableName, new ArrayList<String>(families));
+ } finally {
+ HBaseRevisionManagerUtil.closeRevisionManagerQuietly(rm);
+ }
+ return txn;
+ }
+
+ static Transaction getWriteTransaction(Configuration conf) throws IOException {
+ OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+ return (Transaction) HCatUtil.deserialize(outputJobInfo.getProperties()
+ .getProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY));
+ }
+
+ static void setWriteTransaction(Configuration conf, Transaction txn) throws IOException {
+ OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+ outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY, HCatUtil.serialize(txn));
+ conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
+ }
+
+ /**
+ * Get the Revision number that will be assigned to this job's output data
+ * @param conf configuration of the job
+ * @return the revision number used
+ * @throws IOException
+ */
+ static long getOutputRevision(Configuration conf) throws IOException {
+ return getWriteTransaction(conf).getRevisionNumber();
+ }
+
+ private static Map<String, String> getHCatHBaseColumnMapping(HCatTableInfo hcatTableInfo)
+ throws IOException {
+
+ HCatSchema hcatTableSchema = hcatTableInfo.getDataColumns();
+ StorerInfo storeInfo = hcatTableInfo.getStorerInfo();
+ String hbaseColumnMapping = storeInfo.getProperties().getProperty(
+ HBaseSerDe.HBASE_COLUMNS_MAPPING);
+
+ Map<String, String> hcatHbaseColMap = new HashMap<String, String>();
+ List<String> columnFamilies = new ArrayList<String>();
+ List<String> columnQualifiers = new ArrayList<String>();
+ HBaseUtil.parseColumnMapping(hbaseColumnMapping, columnFamilies,
+ null, columnQualifiers, null);
+
+ for (HCatFieldSchema column : hcatTableSchema.getFields()) {
+ int fieldPos = hcatTableSchema.getPosition(column.getName());
+ String colFamily = columnFamilies.get(fieldPos);
+ if (colFamily.equals(HBaseSerDe.HBASE_KEY_COL) == false) {
+ hcatHbaseColMap.put(column.getName(), colFamily);
+ }
}
+ return hcatHbaseColMap;
+ }
+
}
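The helpers above all follow the same discipline: obtain an opened RevisionManager, do the snapshot or transaction work, and close the manager quietly in a finally block so a failure on close cannot mask the original error. Below is a minimal, illustrative sketch of that pattern using only calls visible in this diff (RevisionManagerFactory.getOpenedRevisionManager, createSnapshot, close); the table name and the enclosing class are hypothetical, and the snapshot-package imports are assumed from the import list earlier in this file.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hcatalog.hbase.snapshot.RevisionManager;
    import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory;
    import org.apache.hcatalog.hbase.snapshot.TableSnapshot;

    public class RevisionManagerSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration(); // stands in for the job configuration
        RevisionManager rm = null;
        try {
          // Open a revision manager configured from the job, as getOpenedRevisionManager does.
          rm = RevisionManagerFactory.getOpenedRevisionManager(conf);
          // Snapshot the table at the latest committed revision of every column family.
          TableSnapshot snapshot = rm.createSnapshot("default.mytable");
          System.out.println("latest revision: " + snapshot.getLatestRevision());
        } finally {
          // Mirror closeRevisionManagerQuietly: never let close() hide the original failure.
          if (rm != null) {
            try {
              rm.close();
            } catch (IOException e) {
              // in the storage handler this is logged via LOG.warn and swallowed
            }
          }
        }
      }
    }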
Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseUtil.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseUtil.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseUtil.java Thu Sep 12 01:21:10 2013
@@ -29,131 +29,131 @@ import org.apache.hadoop.mapred.JobConf;
class HBaseUtil {
- private HBaseUtil() {
- }
-
- /**
- * Parses the HBase columns mapping to identify the column families, qualifiers
- * and also caches the byte arrays corresponding to them. One of the HCat table
- * columns maps to the HBase row key, by default the first column.
- *
- * @param columnMapping - the column mapping specification to be parsed
- * @param colFamilies - the list of HBase column family names
- * @param colFamiliesBytes - the corresponding byte array
- * @param colQualifiers - the list of HBase column qualifier names
- * @param colQualifiersBytes - the corresponding byte array
- * @return the row key index in the column names list
- * @throws IOException
- */
- static int parseColumnMapping(
- String columnMapping,
- List<String> colFamilies,
- List<byte[]> colFamiliesBytes,
- List<String> colQualifiers,
- List<byte[]> colQualifiersBytes) throws IOException {
-
- int rowKeyIndex = -1;
-
- if (colFamilies == null || colQualifiers == null) {
- throw new IllegalArgumentException("Error: caller must pass in lists for the column families " +
- "and qualifiers.");
- }
+ private HBaseUtil() {
+ }
- colFamilies.clear();
- colQualifiers.clear();
+ /**
+ * Parses the HBase columns mapping to identify the column families, qualifiers
+ * and also caches the byte arrays corresponding to them. One of the HCat table
+ * columns maps to the HBase row key, by default the first column.
+ *
+ * @param columnMapping - the column mapping specification to be parsed
+ * @param colFamilies - the list of HBase column family names
+ * @param colFamiliesBytes - the corresponding byte array
+ * @param colQualifiers - the list of HBase column qualifier names
+ * @param colQualifiersBytes - the corresponding byte array
+ * @return the row key index in the column names list
+ * @throws IOException
+ */
+ static int parseColumnMapping(
+ String columnMapping,
+ List<String> colFamilies,
+ List<byte[]> colFamiliesBytes,
+ List<String> colQualifiers,
+ List<byte[]> colQualifiersBytes) throws IOException {
+
+ int rowKeyIndex = -1;
+
+ if (colFamilies == null || colQualifiers == null) {
+ throw new IllegalArgumentException("Error: caller must pass in lists for the column families " +
+ "and qualifiers.");
+ }
- if (columnMapping == null) {
- throw new IllegalArgumentException("Error: hbase.columns.mapping missing for this HBase table.");
- }
+ colFamilies.clear();
+ colQualifiers.clear();
- if (columnMapping.equals("") || columnMapping.equals(HBaseSerDe.HBASE_KEY_COL)) {
- throw new IllegalArgumentException("Error: hbase.columns.mapping specifies only the HBase table"
- + " row key. A valid Hive-HBase table must specify at least one additional column.");
- }
+ if (columnMapping == null) {
+ throw new IllegalArgumentException("Error: hbase.columns.mapping missing for this HBase table.");
+ }
- String[] mapping = columnMapping.split(",");
+ if (columnMapping.equals("") || columnMapping.equals(HBaseSerDe.HBASE_KEY_COL)) {
+ throw new IllegalArgumentException("Error: hbase.columns.mapping specifies only the HBase table"
+ + " row key. A valid Hive-HBase table must specify at least one additional column.");
+ }
- for (int i = 0; i < mapping.length; i++) {
- String elem = mapping[i];
- int idxFirst = elem.indexOf(":");
- int idxLast = elem.lastIndexOf(":");
-
- if (idxFirst < 0 || !(idxFirst == idxLast)) {
- throw new IllegalArgumentException("Error: the HBase columns mapping contains a badly formed " +
- "column family, column qualifier specification.");
- }
-
- if (elem.equals(HBaseSerDe.HBASE_KEY_COL)) {
- rowKeyIndex = i;
- colFamilies.add(elem);
- colQualifiers.add(null);
- } else {
- String[] parts = elem.split(":");
- assert (parts.length > 0 && parts.length <= 2);
- colFamilies.add(parts[0]);
-
- if (parts.length == 2) {
- colQualifiers.add(parts[1]);
- } else {
- colQualifiers.add(null);
- }
- }
- }
+ String[] mapping = columnMapping.split(",");
- if (rowKeyIndex == -1) {
- colFamilies.add(0, HBaseSerDe.HBASE_KEY_COL);
- colQualifiers.add(0, null);
- rowKeyIndex = 0;
+ for (int i = 0; i < mapping.length; i++) {
+ String elem = mapping[i];
+ int idxFirst = elem.indexOf(":");
+ int idxLast = elem.lastIndexOf(":");
+
+ if (idxFirst < 0 || !(idxFirst == idxLast)) {
+ throw new IllegalArgumentException("Error: the HBase columns mapping contains a badly formed " +
+ "column family, column qualifier specification.");
+ }
+
+ if (elem.equals(HBaseSerDe.HBASE_KEY_COL)) {
+ rowKeyIndex = i;
+ colFamilies.add(elem);
+ colQualifiers.add(null);
+ } else {
+ String[] parts = elem.split(":");
+ assert (parts.length > 0 && parts.length <= 2);
+ colFamilies.add(parts[0]);
+
+ if (parts.length == 2) {
+ colQualifiers.add(parts[1]);
+ } else {
+ colQualifiers.add(null);
}
+ }
+ }
- if (colFamilies.size() != colQualifiers.size()) {
- throw new IOException("Error in parsing the hbase columns mapping.");
- }
+ if (rowKeyIndex == -1) {
+ colFamilies.add(0, HBaseSerDe.HBASE_KEY_COL);
+ colQualifiers.add(0, null);
+ rowKeyIndex = 0;
+ }
- // populate the corresponding byte [] if the client has passed in a non-null list
- if (colFamiliesBytes != null) {
- colFamiliesBytes.clear();
-
- for (String fam : colFamilies) {
- colFamiliesBytes.add(Bytes.toBytes(fam));
- }
- }
+ if (colFamilies.size() != colQualifiers.size()) {
+ throw new IOException("Error in parsing the hbase columns mapping.");
+ }
- if (colQualifiersBytes != null) {
- colQualifiersBytes.clear();
+ // populate the corresponding byte [] if the client has passed in a non-null list
+ if (colFamiliesBytes != null) {
+ colFamiliesBytes.clear();
+
+ for (String fam : colFamilies) {
+ colFamiliesBytes.add(Bytes.toBytes(fam));
+ }
+ }
- for (String qual : colQualifiers) {
- if (qual == null) {
- colQualifiersBytes.add(null);
- } else {
- colQualifiersBytes.add(Bytes.toBytes(qual));
- }
- }
- }
+ if (colQualifiersBytes != null) {
+ colQualifiersBytes.clear();
- if (colFamiliesBytes != null && colQualifiersBytes != null) {
- if (colFamiliesBytes.size() != colQualifiersBytes.size()) {
- throw new IOException("Error in caching the bytes for the hbase column families " +
- "and qualifiers.");
- }
+ for (String qual : colQualifiers) {
+ if (qual == null) {
+ colQualifiersBytes.add(null);
+ } else {
+ colQualifiersBytes.add(Bytes.toBytes(qual));
}
+ }
+ }
- return rowKeyIndex;
+ if (colFamiliesBytes != null && colQualifiersBytes != null) {
+ if (colFamiliesBytes.size() != colQualifiersBytes.size()) {
+ throw new IOException("Error in caching the bytes for the hbase column families " +
+ "and qualifiers.");
+ }
}
- /**
- * Get delegation token from hbase and add it to JobConf
- * @param job
- * @throws IOException
- */
- static void addHBaseDelegationToken(JobConf job) throws IOException {
- if (User.isHBaseSecurityEnabled(job)) {
- try {
- User.getCurrent().obtainAuthTokenForJob(job);
- } catch (InterruptedException e) {
- throw new IOException("Error while obtaining hbase delegation token", e);
- }
- }
+ return rowKeyIndex;
+ }
+
+ /**
+ * Get delegation token from hbase and add it to JobConf
+ * @param job
+ * @throws IOException
+ */
+ static void addHBaseDelegationToken(JobConf job) throws IOException {
+ if (User.isHBaseSecurityEnabled(job)) {
+ try {
+ User.getCurrent().obtainAuthTokenForJob(job);
+ } catch (InterruptedException e) {
+ throw new IOException("Error while obtaining hbase delegation token", e);
+ }
}
+ }
}
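The parseColumnMapping contract reindented above is easiest to see with a concrete mapping string. The hypothetical snippet below would have to live in the same org.apache.hcatalog.hbase package, since the method is package-private; the mapping value and class name are illustrative only.

    package org.apache.hcatalog.hbase;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    class ColumnMappingSketch {
      public static void main(String[] args) throws IOException {
        List<String> families = new ArrayList<String>();
        List<String> qualifiers = new ArrayList<String>();
        // ":key" marks the HCat column that maps to the HBase row key;
        // every other entry is a family:qualifier pair.
        int rowKeyIndex = HBaseUtil.parseColumnMapping(
            ":key,info:name,info:age", families, null, qualifiers, null);
        // rowKeyIndex == 0
        // families   == [":key", "info", "info"]
        // qualifiers == [null, "name", "age"]
        System.out.println(rowKeyIndex + " " + families + " " + qualifiers);
      }
    }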
Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HCatTableSnapshot.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HCatTableSnapshot.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HCatTableSnapshot.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HCatTableSnapshot.java Thu Sep 12 01:21:10 2013
@@ -30,63 +30,63 @@ import java.util.Map;
*/
public class HCatTableSnapshot implements Serializable {
- private static final long serialVersionUID = 1L;
- private String tableName;
- private String databaseName;
- private Map<String, Long> columnMap;
- private long latestRevision;
-
- HCatTableSnapshot(String databaseName, String tableName, Map<String, Long> columnMap, long latestRevision) {
- this.tableName = tableName;
- this.databaseName = databaseName;
- this.columnMap = columnMap;
- this.latestRevision = latestRevision;
- }
-
- /**
- * @return The name of the table in the snapshot.
- */
- public String getTableName() {
- return this.tableName;
- }
-
- /**
- * @return The name of the database to which the table snapshot belongs.
- */
- public String getDatabaseName() {
- return this.databaseName;
- }
-
- /**
- * @return The revision number of a column in a snapshot.
- */
- long getRevision(String column) {
- if (columnMap.containsKey(column))
- return this.columnMap.get(column);
- return latestRevision;
- }
-
- /**
- * The method checks if the snapshot contains information about a data column.
- *
- * @param column The data column of the table
- * @return true, if successful
- */
- boolean containsColumn(String column) {
- return this.columnMap.containsKey(column);
- }
-
- /**
- * @return latest committed revision when snapshot was taken
- */
- long getLatestRevision() {
- return latestRevision;
- }
-
- @Override
- public String toString() {
- String snapshot = " Database Name: " + this.databaseName + " Table Name : " + tableName +
- "Latest Revision: " + latestRevision + " Column revision : " + columnMap.toString();
- return snapshot;
- }
+ private static final long serialVersionUID = 1L;
+ private String tableName;
+ private String databaseName;
+ private Map<String, Long> columnMap;
+ private long latestRevision;
+
+ HCatTableSnapshot(String databaseName, String tableName, Map<String, Long> columnMap, long latestRevision) {
+ this.tableName = tableName;
+ this.databaseName = databaseName;
+ this.columnMap = columnMap;
+ this.latestRevision = latestRevision;
+ }
+
+ /**
+ * @return The name of the table in the snapshot.
+ */
+ public String getTableName() {
+ return this.tableName;
+ }
+
+ /**
+ * @return The name of the database to which the table snapshot belongs.
+ */
+ public String getDatabaseName() {
+ return this.databaseName;
+ }
+
+ /**
+ * @return The revision number of a column in a snapshot.
+ */
+ long getRevision(String column) {
+ if (columnMap.containsKey(column))
+ return this.columnMap.get(column);
+ return latestRevision;
+ }
+
+ /**
+ * The method checks if the snapshot contains information about a data column.
+ *
+ * @param column The data column of the table
+ * @return true, if successful
+ */
+ boolean containsColumn(String column) {
+ return this.columnMap.containsKey(column);
+ }
+
+ /**
+ * @return latest committed revision when snapshot was taken
+ */
+ long getLatestRevision() {
+ return latestRevision;
+ }
+
+ @Override
+ public String toString() {
+ String snapshot = " Database Name: " + this.databaseName + " Table Name : " + tableName +
+ "Latest Revision: " + latestRevision + " Column revision : " + columnMap.toString();
+ return snapshot;
+ }
}
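HCatTableSnapshot.getRevision falls back to latestRevision for any column that has no explicit entry in columnMap, which is why convertSnapshot only records revisions for columns present in the HBase column mapping. A hypothetical same-package snippet (the constructor and getRevision are package-private) makes the fallback concrete; the database, table, and column names are made up.

    package org.apache.hcatalog.hbase;

    import java.util.HashMap;
    import java.util.Map;

    class SnapshotLookupSketch {
      public static void main(String[] args) {
        Map<String, Long> columnRevisions = new HashMap<String, Long>();
        columnRevisions.put("name", 7L); // this column is pinned at revision 7
        HCatTableSnapshot snapshot =
            new HCatTableSnapshot("default", "users", columnRevisions, 12L);
        System.out.println(snapshot.getRevision("name")); // 7
        System.out.println(snapshot.getRevision("age"));  // 12: falls back to latestRevision
      }
    }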