Posted to commits@hive.apache.org by ha...@apache.org on 2013/09/12 03:21:29 UTC

svn commit: r1522098 [5/30] - in /hive/branches/vectorization: ./ beeline/src/test/org/apache/hive/beeline/src/test/ bin/ bin/ext/ common/src/java/org/apache/hadoop/hive/common/ common/src/java/org/apache/hadoop/hive/conf/ conf/ contrib/src/java/org/ap...

Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java Thu Sep 12 01:21:10 2013
@@ -54,19 +54,19 @@ import org.apache.hadoop.mapred.InputFor
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.hbase.HBaseBulkOutputFormat.HBaseBulkOutputCommitter;
 import org.apache.hcatalog.hbase.HBaseDirectOutputFormat.HBaseDirectOutputCommitter;
 import org.apache.hcatalog.hbase.snapshot.RevisionManager;
 import org.apache.hcatalog.hbase.snapshot.RevisionManagerConfiguration;
 import org.apache.hcatalog.hbase.snapshot.Transaction;
-import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.mapreduce.HCatTableInfo;
-import org.apache.hcatalog.mapreduce.InputJobInfo;
-import org.apache.hcatalog.mapreduce.OutputJobInfo;
-import org.apache.hcatalog.mapreduce.HCatStorageHandler;
+import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hive.hcatalog.mapreduce.HCatTableInfo;
+import org.apache.hive.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hive.hcatalog.mapreduce.HCatStorageHandler;
 import org.apache.thrift.TBase;
 import org.apache.zookeeper.ZooKeeper;
 
@@ -80,531 +80,531 @@ import com.google.common.util.concurrent
  */
 public class HBaseHCatStorageHandler extends HCatStorageHandler implements HiveMetaHook, Configurable {
 
-    public final static String DEFAULT_PREFIX = "default.";
-    private final static String PROPERTY_INT_OUTPUT_LOCATION = "hcat.hbase.mapreduce.intermediateOutputLocation";
+  public final static String DEFAULT_PREFIX = "default.";
+  private final static String PROPERTY_INT_OUTPUT_LOCATION = "hcat.hbase.mapreduce.intermediateOutputLocation";
 
-    private Configuration hbaseConf;
-    private Configuration jobConf;
-    private HBaseAdmin admin;
-
-    @Override
-    public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
-        // Populate jobProperties with input table name, table columns, RM snapshot,
-        // hbase-default.xml and hbase-site.xml
-        Map<String, String> tableJobProperties = tableDesc.getJobProperties();
-        String jobString = tableJobProperties.get(HCatConstants.HCAT_KEY_JOB_INFO);
-        try {
-            InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(jobString);
-            HCatTableInfo tableInfo = inputJobInfo.getTableInfo();
-            String qualifiedTableName = HBaseHCatStorageHandler.getFullyQualifiedHBaseTableName(tableInfo);
-            jobProperties.put(TableInputFormat.INPUT_TABLE, qualifiedTableName);
-
-            Configuration jobConf = getJobConf();
-            addResources(jobConf, jobProperties);
-            JobConf copyOfConf = new JobConf(jobConf);
-            HBaseConfiguration.addHbaseResources(copyOfConf);
-            //Getting hbase delegation token in getInputSplits does not work with PIG. So need to
-            //do it here
-            if (jobConf instanceof JobConf) { //Should be the case
-                HBaseUtil.addHBaseDelegationToken(copyOfConf);
-                ((JobConf) jobConf).getCredentials().addAll(copyOfConf.getCredentials());
-            }
-
-            String outputSchema = jobConf.get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
-            jobProperties.put(TableInputFormat.SCAN_COLUMNS, getScanColumns(tableInfo, outputSchema));
-
-            String serSnapshot = (String) inputJobInfo.getProperties().get(
-                HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY);
-            if (serSnapshot == null) {
-                HCatTableSnapshot snapshot =
-                    HBaseRevisionManagerUtil.createSnapshot(
-                        RevisionManagerConfiguration.create(copyOfConf),
-                        qualifiedTableName, tableInfo);
-                jobProperties.put(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY,
-                    HCatUtil.serialize(snapshot));
-            }
-
-            //This adds it directly to the jobConf. Setting in jobProperties does not get propagated
-            //to JobConf as of now as the jobProperties is maintained per partition
-            //TODO: Remove when HCAT-308 is fixed
-            addOutputDependencyJars(jobConf);
-            jobProperties.put("tmpjars", jobConf.get("tmpjars"));
+  private Configuration hbaseConf;
+  private Configuration jobConf;
+  private HBaseAdmin admin;
+
+  @Override
+  public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+    // Populate jobProperties with input table name, table columns, RM snapshot,
+    // hbase-default.xml and hbase-site.xml
+    Map<String, String> tableJobProperties = tableDesc.getJobProperties();
+    String jobString = tableJobProperties.get(HCatConstants.HCAT_KEY_JOB_INFO);
+    try {
+      InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(jobString);
+      HCatTableInfo tableInfo = inputJobInfo.getTableInfo();
+      String qualifiedTableName = HBaseHCatStorageHandler.getFullyQualifiedHBaseTableName(tableInfo);
+      jobProperties.put(TableInputFormat.INPUT_TABLE, qualifiedTableName);
+
+      Configuration jobConf = getJobConf();
+      addResources(jobConf, jobProperties);
+      JobConf copyOfConf = new JobConf(jobConf);
+      HBaseConfiguration.addHbaseResources(copyOfConf);
+      //Getting hbase delegation token in getInputSplits does not work with PIG. So need to
+      //do it here
+      if (jobConf instanceof JobConf) { //Should be the case
+        HBaseUtil.addHBaseDelegationToken(copyOfConf);
+        ((JobConf) jobConf).getCredentials().addAll(copyOfConf.getCredentials());
+      }
+
+      String outputSchema = jobConf.get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
+      jobProperties.put(TableInputFormat.SCAN_COLUMNS, getScanColumns(tableInfo, outputSchema));
+
+      String serSnapshot = (String) inputJobInfo.getProperties().get(
+        HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY);
+      if (serSnapshot == null) {
+        HCatTableSnapshot snapshot =
+          HBaseRevisionManagerUtil.createSnapshot(
+            RevisionManagerConfiguration.create(copyOfConf),
+            qualifiedTableName, tableInfo);
+        jobProperties.put(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY,
+          HCatUtil.serialize(snapshot));
+      }
+
+      //This adds it directly to the jobConf. Setting in jobProperties does not get propagated
+      //to JobConf as of now as the jobProperties is maintained per partition
+      //TODO: Remove when HCAT-308 is fixed
+      addOutputDependencyJars(jobConf);
+      jobProperties.put("tmpjars", jobConf.get("tmpjars"));
+
+    } catch (IOException e) {
+      throw new IllegalStateException("Error while configuring job properties", e);
+    }
+  }
+
+  @Override
+  public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+    // Populate jobProperties with output table name, hbase-default.xml, hbase-site.xml, OutputJobInfo
+    // Populate RM transaction in OutputJobInfo
+    // In case of bulk mode, populate intermediate output location
+    Map<String, String> tableJobProperties = tableDesc.getJobProperties();
+    String jobString = tableJobProperties.get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+    try {
+      OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(jobString);
+      HCatTableInfo tableInfo = outputJobInfo.getTableInfo();
+      String qualifiedTableName = HBaseHCatStorageHandler.getFullyQualifiedHBaseTableName(tableInfo);
+      jobProperties.put(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, qualifiedTableName);
+      jobProperties.put(TableOutputFormat.OUTPUT_TABLE, qualifiedTableName);
+
+      Configuration jobConf = getJobConf();
+      addResources(jobConf, jobProperties);
+
+      Configuration copyOfConf = new Configuration(jobConf);
+      HBaseConfiguration.addHbaseResources(copyOfConf);
+
+      String txnString = outputJobInfo.getProperties().getProperty(
+        HBaseConstants.PROPERTY_WRITE_TXN_KEY);
+      Transaction txn = null;
+      if (txnString == null) {
+        txn = HBaseRevisionManagerUtil.beginWriteTransaction(qualifiedTableName, tableInfo,
+          RevisionManagerConfiguration.create(copyOfConf));
+        String serializedTxn = HCatUtil.serialize(txn);
+        outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
+          serializedTxn);
+      } else {
+        txn = (Transaction) HCatUtil.deserialize(txnString);
+      }
+      if (isBulkMode(outputJobInfo)) {
+        String tableLocation = tableInfo.getTableLocation();
+        String location = new Path(tableLocation, "REVISION_" + txn.getRevisionNumber())
+          .toString();
+        outputJobInfo.getProperties().setProperty(PROPERTY_INT_OUTPUT_LOCATION, location);
+        // We are writing out an intermediate sequenceFile hence
+        // location is not passed in OutputJobInfo.getLocation()
+        // TODO replace this with a mapreduce constant when available
+        jobProperties.put("mapred.output.dir", location);
+        jobProperties.put("mapred.output.committer.class", HBaseBulkOutputCommitter.class.getName());
+      } else {
+        jobProperties.put("mapred.output.committer.class", HBaseDirectOutputCommitter.class.getName());
+      }
+
+      jobProperties.put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
+      addOutputDependencyJars(jobConf);
+      jobProperties.put("tmpjars", jobConf.get("tmpjars"));
+
+    } catch (IOException e) {
+      throw new IllegalStateException("Error while configuring job properties", e);
+    }
+  }
+
+  /*
+  * @return instance of HiveAuthorizationProvider
+  *
+  * @throws HiveException
+  *
+  * @see org.apache.hive.hcatalog.storagehandler.HCatStorageHandler#
+  * getAuthorizationProvider()
+  */
+  @Override
+  public HiveAuthorizationProvider getAuthorizationProvider()
+    throws HiveException {
+
+    HBaseAuthorizationProvider hbaseAuth = new HBaseAuthorizationProvider();
+    hbaseAuth.init(getConf());
+    return hbaseAuth;
+  }
+
+  /*
+   * @param table
+   *
+   * @throws MetaException
+   *
+   * @see org.apache.hive.hcatalog.storagehandler.HCatStorageHandler
+   * #commitCreateTable(org.apache.hadoop.hive.metastore.api.Table)
+   */
+  @Override
+  public void commitCreateTable(Table table) throws MetaException {
+  }
+
+  /*
+   * @param instance of table
+   *
+   * @param deleteData
+   *
+   * @throws MetaException
+   *
+   * @see org.apache.hive.hcatalog.storagehandler.HCatStorageHandler
+   * #commitDropTable(org.apache.hadoop.hive.metastore.api.Table, boolean)
+   */
+  @Override
+  public void commitDropTable(Table tbl, boolean deleteData)
+    throws MetaException {
+    checkDeleteTable(tbl);
+
+  }
+
+  /*
+   * @param instance of table
+   *
+   * @throws MetaException
+   *
+   * @see org.apache.hive.hcatalog.storagehandler.HCatStorageHandler
+   * #preCreateTable(org.apache.hadoop.hive.metastore.api.Table)
+   */
+  @Override
+  public void preCreateTable(Table tbl) throws MetaException {
+    boolean isExternal = MetaStoreUtils.isExternalTable(tbl);
+
+    hbaseConf = getConf();
+
+    if (tbl.getSd().getLocation() != null) {
+      throw new MetaException("LOCATION may not be specified for HBase.");
+    }
+
+    try {
+      String tableName = getFullyQualifiedHBaseTableName(tbl);
+      String hbaseColumnsMapping = tbl.getParameters().get(
+        HBaseSerDe.HBASE_COLUMNS_MAPPING);
+
+      if (hbaseColumnsMapping == null) {
+        throw new MetaException(
+          "No hbase.columns.mapping defined in table"
+            + " properties.");
+      }
+
+      List<String> hbaseColumnFamilies = new ArrayList<String>();
+      List<String> hbaseColumnQualifiers = new ArrayList<String>();
+      List<byte[]> hbaseColumnFamiliesBytes = new ArrayList<byte[]>();
+      int iKey = HBaseUtil.parseColumnMapping(hbaseColumnsMapping,
+        hbaseColumnFamilies, hbaseColumnFamiliesBytes,
+        hbaseColumnQualifiers, null);
+
+      HTableDescriptor tableDesc;
+      Set<String> uniqueColumnFamilies = new HashSet<String>();
+      if (!getHBaseAdmin().tableExists(tableName)) {
+        // if it is not an external table then create one
+        if (!isExternal) {
+          // Create the column descriptors
+          tableDesc = new HTableDescriptor(tableName);
+          uniqueColumnFamilies.addAll(hbaseColumnFamilies);
+          uniqueColumnFamilies.remove(hbaseColumnFamilies.get(iKey));
+
+          for (String columnFamily : uniqueColumnFamilies) {
+            HColumnDescriptor familyDesc = new HColumnDescriptor(Bytes
+              .toBytes(columnFamily));
+            familyDesc.setMaxVersions(Integer.MAX_VALUE);
+            tableDesc.addFamily(familyDesc);
+          }
 
-        } catch (IOException e) {
-            throw new IllegalStateException("Error while configuring job properties", e);
-        }
-    }
-
-    @Override
-    public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
-        // Populate jobProperties with output table name, hbase-default.xml, hbase-site.xml, OutputJobInfo
-        // Populate RM transaction in OutputJobInfo
-        // In case of bulk mode, populate intermediate output location
-        Map<String, String> tableJobProperties = tableDesc.getJobProperties();
-        String jobString = tableJobProperties.get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
-        try {
-            OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(jobString);
-            HCatTableInfo tableInfo = outputJobInfo.getTableInfo();
-            String qualifiedTableName = HBaseHCatStorageHandler.getFullyQualifiedHBaseTableName(tableInfo);
-            jobProperties.put(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, qualifiedTableName);
-            jobProperties.put(TableOutputFormat.OUTPUT_TABLE, qualifiedTableName);
-
-            Configuration jobConf = getJobConf();
-            addResources(jobConf, jobProperties);
-
-            Configuration copyOfConf = new Configuration(jobConf);
-            HBaseConfiguration.addHbaseResources(copyOfConf);
-
-            String txnString = outputJobInfo.getProperties().getProperty(
-                HBaseConstants.PROPERTY_WRITE_TXN_KEY);
-            Transaction txn = null;
-            if (txnString == null) {
-                txn = HBaseRevisionManagerUtil.beginWriteTransaction(qualifiedTableName, tableInfo,
-                    RevisionManagerConfiguration.create(copyOfConf));
-                String serializedTxn = HCatUtil.serialize(txn);
-                outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
-                    serializedTxn);
-            } else {
-                txn = (Transaction) HCatUtil.deserialize(txnString);
-            }
-            if (isBulkMode(outputJobInfo)) {
-                String tableLocation = tableInfo.getTableLocation();
-                String location = new Path(tableLocation, "REVISION_" + txn.getRevisionNumber())
-                    .toString();
-                outputJobInfo.getProperties().setProperty(PROPERTY_INT_OUTPUT_LOCATION, location);
-                // We are writing out an intermediate sequenceFile hence
-                // location is not passed in OutputJobInfo.getLocation()
-                // TODO replace this with a mapreduce constant when available
-                jobProperties.put("mapred.output.dir", location);
-                jobProperties.put("mapred.output.committer.class", HBaseBulkOutputCommitter.class.getName());
-            } else {
-                jobProperties.put("mapred.output.committer.class", HBaseDirectOutputCommitter.class.getName());
-            }
-
-            jobProperties.put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
-            addOutputDependencyJars(jobConf);
-            jobProperties.put("tmpjars", jobConf.get("tmpjars"));
-
-        } catch (IOException e) {
-            throw new IllegalStateException("Error while configuring job properties", e);
-        }
-    }
-
-    /*
-    * @return instance of HiveAuthorizationProvider
-    *
-    * @throws HiveException
-    *
-    * @see org.apache.hcatalog.storagehandler.HCatStorageHandler#
-    * getAuthorizationProvider()
-    */
-    @Override
-    public HiveAuthorizationProvider getAuthorizationProvider()
-        throws HiveException {
-
-        HBaseAuthorizationProvider hbaseAuth = new HBaseAuthorizationProvider();
-        hbaseAuth.init(getConf());
-        return hbaseAuth;
-    }
-
-    /*
-     * @param table
-     *
-     * @throws MetaException
-     *
-     * @see org.apache.hcatalog.storagehandler.HCatStorageHandler
-     * #commitCreateTable(org.apache.hadoop.hive.metastore.api.Table)
-     */
-    @Override
-    public void commitCreateTable(Table table) throws MetaException {
-    }
-
-    /*
-     * @param instance of table
-     *
-     * @param deleteData
-     *
-     * @throws MetaException
-     *
-     * @see org.apache.hcatalog.storagehandler.HCatStorageHandler
-     * #commitDropTable(org.apache.hadoop.hive.metastore.api.Table, boolean)
-     */
-    @Override
-    public void commitDropTable(Table tbl, boolean deleteData)
-        throws MetaException {
-        checkDeleteTable(tbl);
-
-    }
-
-    /*
-     * @param instance of table
-     *
-     * @throws MetaException
-     *
-     * @see org.apache.hcatalog.storagehandler.HCatStorageHandler
-     * #preCreateTable(org.apache.hadoop.hive.metastore.api.Table)
-     */
-    @Override
-    public void preCreateTable(Table tbl) throws MetaException {
-        boolean isExternal = MetaStoreUtils.isExternalTable(tbl);
-
-        hbaseConf = getConf();
-
-        if (tbl.getSd().getLocation() != null) {
-            throw new MetaException("LOCATION may not be specified for HBase.");
-        }
-
-        try {
-            String tableName = getFullyQualifiedHBaseTableName(tbl);
-            String hbaseColumnsMapping = tbl.getParameters().get(
-                HBaseSerDe.HBASE_COLUMNS_MAPPING);
-
-            if (hbaseColumnsMapping == null) {
-                throw new MetaException(
-                    "No hbase.columns.mapping defined in table"
-                        + " properties.");
-            }
-
-            List<String> hbaseColumnFamilies = new ArrayList<String>();
-            List<String> hbaseColumnQualifiers = new ArrayList<String>();
-            List<byte[]> hbaseColumnFamiliesBytes = new ArrayList<byte[]>();
-            int iKey = HBaseUtil.parseColumnMapping(hbaseColumnsMapping,
-                hbaseColumnFamilies, hbaseColumnFamiliesBytes,
-                hbaseColumnQualifiers, null);
-
-            HTableDescriptor tableDesc;
-            Set<String> uniqueColumnFamilies = new HashSet<String>();
-            if (!getHBaseAdmin().tableExists(tableName)) {
-                // if it is not an external table then create one
-                if (!isExternal) {
-                    // Create the column descriptors
-                    tableDesc = new HTableDescriptor(tableName);
-                    uniqueColumnFamilies.addAll(hbaseColumnFamilies);
-                    uniqueColumnFamilies.remove(hbaseColumnFamilies.get(iKey));
-
-                    for (String columnFamily : uniqueColumnFamilies) {
-                        HColumnDescriptor familyDesc = new HColumnDescriptor(Bytes
-                            .toBytes(columnFamily));
-                        familyDesc.setMaxVersions(Integer.MAX_VALUE);
-                        tableDesc.addFamily(familyDesc);
-                    }
-
-                    getHBaseAdmin().createTable(tableDesc);
-                } else {
-                    // an external table
-                    throw new MetaException("HBase table " + tableName
-                        + " doesn't exist while the table is "
-                        + "declared as an external table.");
-                }
-
-            } else {
-                if (!isExternal) {
-                    throw new MetaException("Table " + tableName
-                        + " already exists within HBase."
-                        + " Use CREATE EXTERNAL TABLE instead to"
-                        + " register it in HCatalog.");
-                }
-                // make sure the schema mapping is right
-                tableDesc = getHBaseAdmin().getTableDescriptor(
-                    Bytes.toBytes(tableName));
-
-                for (int i = 0; i < hbaseColumnFamilies.size(); i++) {
-                    if (i == iKey) {
-                        continue;
-                    }
-
-                    if (!tableDesc.hasFamily(hbaseColumnFamiliesBytes.get(i))) {
-                        throw new MetaException("Column Family "
-                            + hbaseColumnFamilies.get(i)
-                            + " is not defined in hbase table " + tableName);
-                    }
-                }
-            }
-
-            // ensure the table is online
-            new HTable(hbaseConf, tableDesc.getName());
-
-            //Set up table in revision manager.
-            RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(hbaseConf);
-            rm.createTable(tableName, new ArrayList<String>(uniqueColumnFamilies));
-
-        } catch (MasterNotRunningException mnre) {
-            throw new MetaException(StringUtils.stringifyException(mnre));
-        } catch (IOException ie) {
-            throw new MetaException(StringUtils.stringifyException(ie));
-        } catch (IllegalArgumentException iae) {
-            throw new MetaException(StringUtils.stringifyException(iae));
-        }
-
-    }
-
-    /*
-     * @param table
-     *
-     * @throws MetaException
-     *
-     * @see org.apache.hcatalog.storagehandler.HCatStorageHandler
-     * #preDropTable(org.apache.hadoop.hive.metastore.api.Table)
-     */
-    @Override
-    public void preDropTable(Table table) throws MetaException {
-    }
-
-    /*
-     * @param table
-     *
-     * @throws MetaException
-     *
-     * @see org.apache.hcatalog.storagehandler.HCatStorageHandler
-     * #rollbackCreateTable(org.apache.hadoop.hive.metastore.api.Table)
-     */
-    @Override
-    public void rollbackCreateTable(Table table) throws MetaException {
-        checkDeleteTable(table);
-    }
-
-    /*
-     * @param table
-     *
-     * @throws MetaException
-     *
-     * @see org.apache.hcatalog.storagehandler.HCatStorageHandler
-     * #rollbackDropTable(org.apache.hadoop.hive.metastore.api.Table)
-     */
-    @Override
-    public void rollbackDropTable(Table table) throws MetaException {
-    }
-
-    /*
-     * @return instance of HiveMetaHook
-     *
-     * @see org.apache.hcatalog.storagehandler.HCatStorageHandler#getMetaHook()
-     */
-    @Override
-    public HiveMetaHook getMetaHook() {
-        return this;
-    }
-
-    private HBaseAdmin getHBaseAdmin() throws MetaException {
-        try {
-            if (admin == null) {
-                admin = new HBaseAdmin(this.getConf());
-            }
-            return admin;
-        } catch (MasterNotRunningException mnre) {
-            throw new MetaException(StringUtils.stringifyException(mnre));
-        } catch (ZooKeeperConnectionException zkce) {
-            throw new MetaException(StringUtils.stringifyException(zkce));
-        }
-    }
-
-    private String getFullyQualifiedHBaseTableName(Table tbl) {
-        String tableName = tbl.getParameters().get(HBaseSerDe.HBASE_TABLE_NAME);
-        if (tableName == null) {
-            tableName = tbl.getSd().getSerdeInfo().getParameters()
-                .get(HBaseSerDe.HBASE_TABLE_NAME);
-        }
-        if (tableName == null) {
-            if (tbl.getDbName().equals(MetaStoreUtils.DEFAULT_DATABASE_NAME)) {
-                tableName = tbl.getTableName();
-            } else {
-                tableName = tbl.getDbName() + "." + tbl.getTableName();
-            }
-            tableName = tableName.toLowerCase();
-        }
-        return tableName;
-    }
-
-    static String getFullyQualifiedHBaseTableName(HCatTableInfo tableInfo) {
-        String qualifiedName = tableInfo.getStorerInfo().getProperties()
-            .getProperty(HBaseSerDe.HBASE_TABLE_NAME);
-        if (qualifiedName == null) {
-            String databaseName = tableInfo.getDatabaseName();
-            String tableName = tableInfo.getTableName();
-            if ((databaseName == null)
-                || (databaseName.equals(MetaStoreUtils.DEFAULT_DATABASE_NAME))) {
-                qualifiedName = tableName;
-            } else {
-                qualifiedName = databaseName + "." + tableName;
-            }
-            qualifiedName = qualifiedName.toLowerCase();
-        }
-        return qualifiedName;
-    }
-
-    @Override
-    public Class<? extends InputFormat> getInputFormatClass() {
-        return HBaseInputFormat.class;
-    }
-
-    @Override
-    public Class<? extends OutputFormat> getOutputFormatClass() {
-        return HBaseBaseOutputFormat.class;
-    }
-
-    /*
-    * @return subclass of SerDe
-    *
-    * @throws UnsupportedOperationException
-    *
-    * @see
-    * org.apache.hcatalog.storagehandler.HCatStorageHandler#getSerDeClass()
-    */
-    @Override
-    public Class<? extends SerDe> getSerDeClass()
-        throws UnsupportedOperationException {
-        return HBaseSerDe.class;
-    }
-
-    public Configuration getJobConf() {
-        return jobConf;
-    }
-
-    @Override
-    public Configuration getConf() {
-
-        if (hbaseConf == null) {
-            hbaseConf = HBaseConfiguration.create();
-        }
-        return hbaseConf;
-    }
-
-    @Override
-    public void setConf(Configuration conf) {
-        //setConf is called both during DDL operations and  mapred read/write jobs.
-        //Creating a copy of conf for DDL and adding hbase-default and hbase-site.xml to it.
-        //For jobs, maintaining a reference instead of cloning as we need to
-        //  1) add hbase delegation token to the Credentials.
-        //  2) set tmpjars on it. Putting in jobProperties does not get propagated to JobConf
-        //     in case of InputFormat as they are maintained per partition.
-        //Not adding hbase-default.xml and hbase-site.xml to jobConf as it will override any
-        //hbase properties set in the JobConf by the user. In configureInputJobProperties and
-        //configureOutputJobProperties, we take care of adding the default properties
-        //that are not already present. TODO: Change to a copy for jobs after HCAT-308 is fixed.
-        jobConf = conf;
-        hbaseConf = RevisionManagerConfiguration.create(HBaseConfiguration.create(conf));
-    }
-
-    private void checkDeleteTable(Table table) throws MetaException {
-        boolean isExternal = MetaStoreUtils.isExternalTable(table);
-        String tableName = getFullyQualifiedHBaseTableName(table);
-        RevisionManager rm = null;
-        try {
-            if (!isExternal && getHBaseAdmin().tableExists(tableName)) {
-                // we have created an HBase table, so we delete it to roll back;
-                if (getHBaseAdmin().isTableEnabled(tableName)) {
-                    getHBaseAdmin().disableTable(tableName);
-                }
-                getHBaseAdmin().deleteTable(tableName);
-
-                //Drop table in revision manager.
-                rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(hbaseConf);
-                rm.dropTable(tableName);
-            }
-        } catch (IOException ie) {
-            throw new MetaException(StringUtils.stringifyException(ie));
-        } finally {
-            HBaseRevisionManagerUtil.closeRevisionManagerQuietly(rm);
-        }
-    }
-
-    /**
-     * Helper method for users to add the required depedency jars to distributed cache.
-     * @param conf
-     * @throws IOException
-     */
-    private void addOutputDependencyJars(Configuration conf) throws IOException {
-        TableMapReduceUtil.addDependencyJars(conf,
-            //ZK
-            ZooKeeper.class,
-            //HBase
-            HTable.class,
-            //Hive
-            HiveException.class,
-            //HCatalog jar
-            HCatOutputFormat.class,
-            //hcat hbase storage handler jar
-            HBaseHCatStorageHandler.class,
-            //hive hbase storage handler jar
-            HBaseSerDe.class,
-            //hive jar
-            Table.class,
-            //libthrift jar
-            TBase.class,
-            //hbase jar
-            Bytes.class,
-            //thrift-fb303 .jar
-            FacebookBase.class,
-            //guava jar
-            ThreadFactoryBuilder.class);
-    }
-
-    /**
-     * Utility method to add hbase-default.xml and hbase-site.xml properties to a new map
-     * if they are not already present in the jobConf.
-     * @param jobConf Job configuration
-     * @param newJobProperties  Map to which new properties should be added
-     */
-    private void addResources(Configuration jobConf,
-                              Map<String, String> newJobProperties) {
-        Configuration conf = new Configuration(false);
-        HBaseConfiguration.addHbaseResources(conf);
-        RevisionManagerConfiguration.addResources(conf);
-        for (Entry<String, String> entry : conf) {
-            if (jobConf.get(entry.getKey()) == null)
-                newJobProperties.put(entry.getKey(), entry.getValue());
-        }
-    }
-
-    public static boolean isBulkMode(OutputJobInfo outputJobInfo) {
-        //Default is false
-        String bulkMode = outputJobInfo.getTableInfo().getStorerInfo().getProperties()
-            .getProperty(HBaseConstants.PROPERTY_BULK_OUTPUT_MODE_KEY,
-                "false");
-        return "true".equals(bulkMode);
-    }
-
-    private String getScanColumns(HCatTableInfo tableInfo, String outputColSchema) throws IOException {
-        StringBuilder builder = new StringBuilder();
-        String hbaseColumnMapping = tableInfo.getStorerInfo().getProperties()
-            .getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING);
-        if (outputColSchema == null) {
-            String[] splits = hbaseColumnMapping.split("[,]");
-            for (int i = 0; i < splits.length; i++) {
-                if (!splits[i].equals(HBaseSerDe.HBASE_KEY_COL))
-                    builder.append(splits[i]).append(" ");
-            }
+          getHBaseAdmin().createTable(tableDesc);
         } else {
-            HCatSchema outputSchema = (HCatSchema) HCatUtil.deserialize(outputColSchema);
-            HCatSchema tableSchema = tableInfo.getDataColumns();
-            List<String> outputFieldNames = outputSchema.getFieldNames();
-            List<Integer> outputColumnMapping = new ArrayList<Integer>();
-            for (String fieldName : outputFieldNames) {
-                int position = tableSchema.getPosition(fieldName);
-                outputColumnMapping.add(position);
-            }
-            List<String> columnFamilies = new ArrayList<String>();
-            List<String> columnQualifiers = new ArrayList<String>();
-            HBaseUtil.parseColumnMapping(hbaseColumnMapping, columnFamilies, null,
-                columnQualifiers, null);
-            for (int i = 0; i < outputColumnMapping.size(); i++) {
-                int cfIndex = outputColumnMapping.get(i);
-                String cf = columnFamilies.get(cfIndex);
-                // We skip the key column.
-                if (cf.equals(HBaseSerDe.HBASE_KEY_COL) == false) {
-                    String qualifier = columnQualifiers.get(i);
-                    builder.append(cf);
-                    builder.append(":");
-                    if (qualifier != null) {
-                        builder.append(qualifier);
-                    }
-                    builder.append(" ");
-                }
-            }
-        }
-        //Remove the extra space delimiter
-        builder.deleteCharAt(builder.length() - 1);
-        return builder.toString();
-    }
+          // an external table
+          throw new MetaException("HBase table " + tableName
+            + " doesn't exist while the table is "
+            + "declared as an external table.");
+        }
+
+      } else {
+        if (!isExternal) {
+          throw new MetaException("Table " + tableName
+            + " already exists within HBase."
+            + " Use CREATE EXTERNAL TABLE instead to"
+            + " register it in HCatalog.");
+        }
+        // make sure the schema mapping is right
+        tableDesc = getHBaseAdmin().getTableDescriptor(
+          Bytes.toBytes(tableName));
+
+        for (int i = 0; i < hbaseColumnFamilies.size(); i++) {
+          if (i == iKey) {
+            continue;
+          }
+
+          if (!tableDesc.hasFamily(hbaseColumnFamiliesBytes.get(i))) {
+            throw new MetaException("Column Family "
+              + hbaseColumnFamilies.get(i)
+              + " is not defined in hbase table " + tableName);
+          }
+        }
+      }
+
+      // ensure the table is online
+      new HTable(hbaseConf, tableDesc.getName());
+
+      //Set up table in revision manager.
+      RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(hbaseConf);
+      rm.createTable(tableName, new ArrayList<String>(uniqueColumnFamilies));
+
+    } catch (MasterNotRunningException mnre) {
+      throw new MetaException(StringUtils.stringifyException(mnre));
+    } catch (IOException ie) {
+      throw new MetaException(StringUtils.stringifyException(ie));
+    } catch (IllegalArgumentException iae) {
+      throw new MetaException(StringUtils.stringifyException(iae));
+    }
+
+  }
+
+  /*
+   * @param table
+   *
+   * @throws MetaException
+   *
+   * @see org.apache.hive.hcatalog.storagehandler.HCatStorageHandler
+   * #preDropTable(org.apache.hadoop.hive.metastore.api.Table)
+   */
+  @Override
+  public void preDropTable(Table table) throws MetaException {
+  }
+
+  /*
+   * @param table
+   *
+   * @throws MetaException
+   *
+   * @see org.apache.hive.hcatalog.storagehandler.HCatStorageHandler
+   * #rollbackCreateTable(org.apache.hadoop.hive.metastore.api.Table)
+   */
+  @Override
+  public void rollbackCreateTable(Table table) throws MetaException {
+    checkDeleteTable(table);
+  }
+
+  /*
+   * @param table
+   *
+   * @throws MetaException
+   *
+   * @see org.apache.hive.hcatalog.storagehandler.HCatStorageHandler
+   * #rollbackDropTable(org.apache.hadoop.hive.metastore.api.Table)
+   */
+  @Override
+  public void rollbackDropTable(Table table) throws MetaException {
+  }
+
+  /*
+   * @return instance of HiveMetaHook
+   *
+   * @see org.apache.hive.hcatalog.storagehandler.HCatStorageHandler#getMetaHook()
+   */
+  @Override
+  public HiveMetaHook getMetaHook() {
+    return this;
+  }
+
+  private HBaseAdmin getHBaseAdmin() throws MetaException {
+    try {
+      if (admin == null) {
+        admin = new HBaseAdmin(this.getConf());
+      }
+      return admin;
+    } catch (MasterNotRunningException mnre) {
+      throw new MetaException(StringUtils.stringifyException(mnre));
+    } catch (ZooKeeperConnectionException zkce) {
+      throw new MetaException(StringUtils.stringifyException(zkce));
+    }
+  }
+
+  private String getFullyQualifiedHBaseTableName(Table tbl) {
+    String tableName = tbl.getParameters().get(HBaseSerDe.HBASE_TABLE_NAME);
+    if (tableName == null) {
+      tableName = tbl.getSd().getSerdeInfo().getParameters()
+        .get(HBaseSerDe.HBASE_TABLE_NAME);
+    }
+    if (tableName == null) {
+      if (tbl.getDbName().equals(MetaStoreUtils.DEFAULT_DATABASE_NAME)) {
+        tableName = tbl.getTableName();
+      } else {
+        tableName = tbl.getDbName() + "." + tbl.getTableName();
+      }
+      tableName = tableName.toLowerCase();
+    }
+    return tableName;
+  }
+
+  static String getFullyQualifiedHBaseTableName(HCatTableInfo tableInfo) {
+    String qualifiedName = tableInfo.getStorerInfo().getProperties()
+      .getProperty(HBaseSerDe.HBASE_TABLE_NAME);
+    if (qualifiedName == null) {
+      String databaseName = tableInfo.getDatabaseName();
+      String tableName = tableInfo.getTableName();
+      if ((databaseName == null)
+        || (databaseName.equals(MetaStoreUtils.DEFAULT_DATABASE_NAME))) {
+        qualifiedName = tableName;
+      } else {
+        qualifiedName = databaseName + "." + tableName;
+      }
+      qualifiedName = qualifiedName.toLowerCase();
+    }
+    return qualifiedName;
+  }
+
+  @Override
+  public Class<? extends InputFormat> getInputFormatClass() {
+    return HBaseInputFormat.class;
+  }
+
+  @Override
+  public Class<? extends OutputFormat> getOutputFormatClass() {
+    return HBaseBaseOutputFormat.class;
+  }
+
+  /*
+  * @return subclass of SerDe
+  *
+  * @throws UnsupportedOperationException
+  *
+  * @see
+  * org.apache.hive.hcatalog.storagehandler.HCatStorageHandler#getSerDeClass()
+  */
+  @Override
+  public Class<? extends SerDe> getSerDeClass()
+    throws UnsupportedOperationException {
+    return HBaseSerDe.class;
+  }
+
+  public Configuration getJobConf() {
+    return jobConf;
+  }
+
+  @Override
+  public Configuration getConf() {
+
+    if (hbaseConf == null) {
+      hbaseConf = HBaseConfiguration.create();
+    }
+    return hbaseConf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    //setConf is called both during DDL operations and  mapred read/write jobs.
+    //Creating a copy of conf for DDL and adding hbase-default and hbase-site.xml to it.
+    //For jobs, maintaining a reference instead of cloning as we need to
+    //  1) add hbase delegation token to the Credentials.
+    //  2) set tmpjars on it. Putting in jobProperties does not get propagated to JobConf
+    //     in case of InputFormat as they are maintained per partition.
+    //Not adding hbase-default.xml and hbase-site.xml to jobConf as it will override any
+    //hbase properties set in the JobConf by the user. In configureInputJobProperties and
+    //configureOutputJobProperties, we take care of adding the default properties
+    //that are not already present. TODO: Change to a copy for jobs after HCAT-308 is fixed.
+    jobConf = conf;
+    hbaseConf = RevisionManagerConfiguration.create(HBaseConfiguration.create(conf));
+  }
+
+  private void checkDeleteTable(Table table) throws MetaException {
+    boolean isExternal = MetaStoreUtils.isExternalTable(table);
+    String tableName = getFullyQualifiedHBaseTableName(table);
+    RevisionManager rm = null;
+    try {
+      if (!isExternal && getHBaseAdmin().tableExists(tableName)) {
+        // we have created an HBase table, so we delete it to roll back;
+        if (getHBaseAdmin().isTableEnabled(tableName)) {
+          getHBaseAdmin().disableTable(tableName);
+        }
+        getHBaseAdmin().deleteTable(tableName);
+
+        //Drop table in revision manager.
+        rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(hbaseConf);
+        rm.dropTable(tableName);
+      }
+    } catch (IOException ie) {
+      throw new MetaException(StringUtils.stringifyException(ie));
+    } finally {
+      HBaseRevisionManagerUtil.closeRevisionManagerQuietly(rm);
+    }
+  }
+
+  /**
+   * Helper method for users to add the required dependency jars to the distributed cache.
+   * @param conf
+   * @throws IOException
+   */
+  private void addOutputDependencyJars(Configuration conf) throws IOException {
+    TableMapReduceUtil.addDependencyJars(conf,
+      //ZK
+      ZooKeeper.class,
+      //HBase
+      HTable.class,
+      //Hive
+      HiveException.class,
+      //HCatalog jar
+      HCatOutputFormat.class,
+      //hcat hbase storage handler jar
+      HBaseHCatStorageHandler.class,
+      //hive hbase storage handler jar
+      HBaseSerDe.class,
+      //hive jar
+      Table.class,
+      //libthrift jar
+      TBase.class,
+      //hbase jar
+      Bytes.class,
+      //thrift-fb303 .jar
+      FacebookBase.class,
+      //guava jar
+      ThreadFactoryBuilder.class);
+  }
+
+  /**
+   * Utility method to add hbase-default.xml and hbase-site.xml properties to a new map
+   * if they are not already present in the jobConf.
+   * @param jobConf Job configuration
+   * @param newJobProperties  Map to which new properties should be added
+   */
+  private void addResources(Configuration jobConf,
+                Map<String, String> newJobProperties) {
+    Configuration conf = new Configuration(false);
+    HBaseConfiguration.addHbaseResources(conf);
+    RevisionManagerConfiguration.addResources(conf);
+    for (Entry<String, String> entry : conf) {
+      if (jobConf.get(entry.getKey()) == null)
+        newJobProperties.put(entry.getKey(), entry.getValue());
+    }
+  }
+
+  public static boolean isBulkMode(OutputJobInfo outputJobInfo) {
+    //Default is false
+    String bulkMode = outputJobInfo.getTableInfo().getStorerInfo().getProperties()
+      .getProperty(HBaseConstants.PROPERTY_BULK_OUTPUT_MODE_KEY,
+        "false");
+    return "true".equals(bulkMode);
+  }
+
+  private String getScanColumns(HCatTableInfo tableInfo, String outputColSchema) throws IOException {
+    StringBuilder builder = new StringBuilder();
+    String hbaseColumnMapping = tableInfo.getStorerInfo().getProperties()
+      .getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+    if (outputColSchema == null) {
+      String[] splits = hbaseColumnMapping.split("[,]");
+      for (int i = 0; i < splits.length; i++) {
+        if (!splits[i].equals(HBaseSerDe.HBASE_KEY_COL))
+          builder.append(splits[i]).append(" ");
+      }
+    } else {
+      HCatSchema outputSchema = (HCatSchema) HCatUtil.deserialize(outputColSchema);
+      HCatSchema tableSchema = tableInfo.getDataColumns();
+      List<String> outputFieldNames = outputSchema.getFieldNames();
+      List<Integer> outputColumnMapping = new ArrayList<Integer>();
+      for (String fieldName : outputFieldNames) {
+        int position = tableSchema.getPosition(fieldName);
+        outputColumnMapping.add(position);
+      }
+      List<String> columnFamilies = new ArrayList<String>();
+      List<String> columnQualifiers = new ArrayList<String>();
+      HBaseUtil.parseColumnMapping(hbaseColumnMapping, columnFamilies, null,
+        columnQualifiers, null);
+      for (int i = 0; i < outputColumnMapping.size(); i++) {
+        int cfIndex = outputColumnMapping.get(i);
+        String cf = columnFamilies.get(cfIndex);
+        // We skip the key column.
+        if (cf.equals(HBaseSerDe.HBASE_KEY_COL) == false) {
+          String qualifier = columnQualifiers.get(i);
+          builder.append(cf);
+          builder.append(":");
+          if (qualifier != null) {
+            builder.append(qualifier);
+          }
+          builder.append(" ");
+        }
+      }
+    }
+    //Remove the extra space delimiter
+    builder.deleteCharAt(builder.length() - 1);
+    return builder.toString();
+  }
 
 }

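The hunk above is largely mechanical: the file moves from 4-space to 2-space indentation, and the HCatalog classes it imports move from the old org.apache.hcatalog.* packages to org.apache.hive.hcatalog.*, while the HBase storage-handler classes themselves (org.apache.hcatalog.hbase.*) keep their old package here. For downstream MapReduce code that writes through HCatalog, the visible effect is an import change only. A minimal sketch of such a client under that assumption; the database/table names and job setup are illustrative and not taken from this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    // Before this change these classes lived under org.apache.hcatalog.mapreduce.
    import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
    import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;

    public class HCatHBaseWriteExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "hcat-hbase-write");
        // "default" database and "web_logs" table are hypothetical names.
        HCatOutputFormat.setOutput(job, OutputJobInfo.create("default", "web_logs", null));
        job.setOutputFormatClass(HCatOutputFormat.class);
        // Mapper, output key/value classes and job submission are omitted for brevity.
      }
    }
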
Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java Thu Sep 12 01:21:10 2013
@@ -28,99 +28,99 @@ import org.apache.hadoop.hbase.client.Sc
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapred.TableSplit;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
-import org.apache.hadoop.mapred.HCatMapRedUtil;
+import org.apache.hive.hcatalog.mapreduce.HCatMapRedUtil;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.mapreduce.InputJobInfo;
 
 /**
  * This class HBaseInputFormat is a wrapper class of TableInputFormat in HBase.
  */
 class HBaseInputFormat implements InputFormat<ImmutableBytesWritable, Result> {
 
-    private final TableInputFormat inputFormat;
+  private final TableInputFormat inputFormat;
 
-    public HBaseInputFormat() {
-        inputFormat = new TableInputFormat();
-    }
-
-    /*
-     * @param instance of InputSplit
-     *
-     * @param instance of TaskAttemptContext
-     *
-     * @return RecordReader
-     *
-     * @throws IOException
-     *
-     * @throws InterruptedException
-     *
-     * @see
-     * org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache
-     * .hadoop.mapreduce.InputSplit,
-     * org.apache.hadoop.mapreduce.TaskAttemptContext)
-     */
-    @Override
-    public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
-        InputSplit split, JobConf job, Reporter reporter)
-        throws IOException {
-        String jobString = job.get(HCatConstants.HCAT_KEY_JOB_INFO);
-        InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(jobString);
-
-        String tableName = job.get(TableInputFormat.INPUT_TABLE);
-        TableSplit tSplit = (TableSplit) split;
-        HbaseSnapshotRecordReader recordReader = new HbaseSnapshotRecordReader(inputJobInfo, job);
-        inputFormat.setConf(job);
-        Scan inputScan = inputFormat.getScan();
-        // TODO: Make the caching configurable by the user
-        inputScan.setCaching(200);
-        inputScan.setCacheBlocks(false);
-        Scan sc = new Scan(inputScan);
-        sc.setStartRow(tSplit.getStartRow());
-        sc.setStopRow(tSplit.getEndRow());
-        recordReader.setScan(sc);
-        recordReader.setHTable(new HTable(job, tableName));
-        recordReader.init();
-        return recordReader;
-    }
-
-    /*
-     * @param jobContext
-     *
-     * @return List of InputSplit
-     *
-     * @throws IOException
-     *
-     * @throws InterruptedException
-     *
-     * @see
-     * org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce
-     * .JobContext)
-     */
-    @Override
-    public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits)
-        throws IOException {
-        inputFormat.setConf(job);
-        return convertSplits(inputFormat.getSplits(HCatMapRedUtil.createJobContext(job, null,
-            Reporter.NULL)));
-    }
-
-    private InputSplit[] convertSplits(List<org.apache.hadoop.mapreduce.InputSplit> splits) {
-        InputSplit[] converted = new InputSplit[splits.size()];
-        for (int i = 0; i < splits.size(); i++) {
-            org.apache.hadoop.hbase.mapreduce.TableSplit tableSplit =
-                (org.apache.hadoop.hbase.mapreduce.TableSplit) splits.get(i);
-            TableSplit newTableSplit = new TableSplit(tableSplit.getTableName(),
-                tableSplit.getStartRow(),
-                tableSplit.getEndRow(), tableSplit.getRegionLocation());
-            converted[i] = newTableSplit;
-        }
-        return converted;
+  public HBaseInputFormat() {
+    inputFormat = new TableInputFormat();
+  }
+
+  /*
+   * @param instance of InputSplit
+   *
+   * @param instance of TaskAttemptContext
+   *
+   * @return RecordReader
+   *
+   * @throws IOException
+   *
+   * @throws InterruptedException
+   *
+   * @see
+   * org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache
+   * .hadoop.mapreduce.InputSplit,
+   * org.apache.hadoop.mapreduce.TaskAttemptContext)
+   */
+  @Override
+  public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
+    InputSplit split, JobConf job, Reporter reporter)
+    throws IOException {
+    String jobString = job.get(HCatConstants.HCAT_KEY_JOB_INFO);
+    InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(jobString);
+
+    String tableName = job.get(TableInputFormat.INPUT_TABLE);
+    TableSplit tSplit = (TableSplit) split;
+    HbaseSnapshotRecordReader recordReader = new HbaseSnapshotRecordReader(inputJobInfo, job);
+    inputFormat.setConf(job);
+    Scan inputScan = inputFormat.getScan();
+    // TODO: Make the caching configurable by the user
+    inputScan.setCaching(200);
+    inputScan.setCacheBlocks(false);
+    Scan sc = new Scan(inputScan);
+    sc.setStartRow(tSplit.getStartRow());
+    sc.setStopRow(tSplit.getEndRow());
+    recordReader.setScan(sc);
+    recordReader.setHTable(new HTable(job, tableName));
+    recordReader.init();
+    return recordReader;
+  }
+
+  /*
+   * @param jobContext
+   *
+   * @return List of InputSplit
+   *
+   * @throws IOException
+   *
+   * @throws InterruptedException
+   *
+   * @see
+   * org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce
+   * .JobContext)
+   */
+  @Override
+  public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits)
+    throws IOException {
+    inputFormat.setConf(job);
+    return convertSplits(inputFormat.getSplits(HCatMapRedUtil.createJobContext(job, null,
+      Reporter.NULL)));
+  }
+
+  private InputSplit[] convertSplits(List<org.apache.hadoop.mapreduce.InputSplit> splits) {
+    InputSplit[] converted = new InputSplit[splits.size()];
+    for (int i = 0; i < splits.size(); i++) {
+      org.apache.hadoop.hbase.mapreduce.TableSplit tableSplit =
+        (org.apache.hadoop.hbase.mapreduce.TableSplit) splits.get(i);
+      TableSplit newTableSplit = new TableSplit(tableSplit.getTableName(),
+        tableSplit.getStartRow(),
+        tableSplit.getEndRow(), tableSplit.getRegionLocation());
+      converted[i] = newTableSplit;
     }
+    return converted;
+  }
 
 }

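In the getRecordReader hunk above, the scan row caching is hard-coded to 200 with a TODO to make it configurable by the user. A minimal sketch of how that TODO could be addressed by reading the value from the JobConf; the property name below is invented for illustration and is not defined by this commit:

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.mapred.JobConf;

    final class ScanCachingHelper {
      // Hypothetical property name; not part of HBaseConstants or this commit.
      static final String SCAN_CACHING_PROPERTY = "hcat.hbase.scan.caching";

      // Applies the caching hints that getRecordReader() currently hard-codes,
      // reading the row-caching size from the job configuration instead.
      static void applyCachingHints(Scan scan, JobConf job) {
        scan.setCaching(job.getInt(SCAN_CACHING_PROPERTY, 200)); // keep 200 as the default
        scan.setCacheBlocks(false);
      }
    }
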
Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java Thu Sep 12 01:21:10 2013
@@ -29,18 +29,18 @@ import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.hbase.HBaseSerDe;
-import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.data.schema.HCatFieldSchema;
-import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.hbase.snapshot.RevisionManager;
 import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory;
 import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
 import org.apache.hcatalog.hbase.snapshot.Transaction;
-import org.apache.hcatalog.mapreduce.HCatTableInfo;
-import org.apache.hcatalog.mapreduce.InputJobInfo;
-import org.apache.hcatalog.mapreduce.OutputJobInfo;
-import org.apache.hcatalog.mapreduce.StorerInfo;
+import org.apache.hive.hcatalog.mapreduce.HCatTableInfo;
+import org.apache.hive.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hive.hcatalog.mapreduce.StorerInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,207 +51,207 @@ import org.slf4j.LoggerFactory;
  */
 class HBaseRevisionManagerUtil {
 
-    private final static Logger LOG = LoggerFactory.getLogger(HBaseRevisionManagerUtil.class);
+  private final static Logger LOG = LoggerFactory.getLogger(HBaseRevisionManagerUtil.class);
 
-    private HBaseRevisionManagerUtil() {
-    }
-
-    /**
-     * Creates the latest snapshot of the table.
-     *
-     * @param jobConf The job configuration.
-     * @param hbaseTableName The fully qualified name of the HBase table.
-     * @param tableInfo HCat table information
-     * @return An instance of HCatTableSnapshot
-     * @throws IOException Signals that an I/O exception has occurred.
-     */
-    static HCatTableSnapshot createSnapshot(Configuration jobConf,
-                                            String hbaseTableName, HCatTableInfo tableInfo) throws IOException {
-
-        RevisionManager rm = null;
-        TableSnapshot snpt;
-        try {
-            rm = getOpenedRevisionManager(jobConf);
-            snpt = rm.createSnapshot(hbaseTableName);
-        } finally {
-            closeRevisionManagerQuietly(rm);
-        }
-
-        HCatTableSnapshot hcatSnapshot = HBaseRevisionManagerUtil.convertSnapshot(snpt, tableInfo);
-        return hcatSnapshot;
-    }
-
-    /**
-     * Creates the snapshot using the revision specified by the user.
-     *
-     * @param jobConf The job configuration.
-     * @param tableName The fully qualified name of the table whose snapshot is being taken.
-     * @param revision The revision number to use for the snapshot.
-     * @return An instance of HCatTableSnapshot.
-     * @throws IOException Signals that an I/O exception has occurred.
-     */
-    static HCatTableSnapshot createSnapshot(Configuration jobConf,
-                                            String tableName, long revision)
-        throws IOException {
-
-        TableSnapshot snpt;
-        RevisionManager rm = null;
-        try {
-            rm = getOpenedRevisionManager(jobConf);
-            snpt = rm.createSnapshot(tableName, revision);
-        } finally {
-            closeRevisionManagerQuietly(rm);
-        }
-
-        String inputJobString = jobConf.get(HCatConstants.HCAT_KEY_JOB_INFO);
-        if (inputJobString == null) {
-            throw new IOException(
-                "InputJobInfo information not found in JobContext. "
-                    + "HCatInputFormat.setInput() not called?");
-        }
-        InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(inputJobString);
-        HCatTableSnapshot hcatSnapshot = HBaseRevisionManagerUtil
-            .convertSnapshot(snpt, inputInfo.getTableInfo());
-
-        return hcatSnapshot;
-    }
-
-    /**
-     * Gets an instance of revision manager which is opened.
-     *
-     * @param jobConf The job configuration.
-     * @return RevisionManager An instance of revision manager.
-     * @throws IOException
-     */
-    static RevisionManager getOpenedRevisionManager(Configuration jobConf) throws IOException {
-        return RevisionManagerFactory.getOpenedRevisionManager(jobConf);
-    }
-
-    static void closeRevisionManagerQuietly(RevisionManager rm) {
-        if (rm != null) {
-            try {
-                rm.close();
-            } catch (IOException e) {
-                LOG.warn("Error while trying to close revision manager", e);
-            }
-        }
-    }
-
-
-    static HCatTableSnapshot convertSnapshot(TableSnapshot hbaseSnapshot,
-                                             HCatTableInfo hcatTableInfo) throws IOException {
-
-        HCatSchema hcatTableSchema = hcatTableInfo.getDataColumns();
-        Map<String, String> hcatHbaseColMap = getHCatHBaseColumnMapping(hcatTableInfo);
-        HashMap<String, Long> revisionMap = new HashMap<String, Long>();
-
-        for (HCatFieldSchema fSchema : hcatTableSchema.getFields()) {
-            if (hcatHbaseColMap.containsKey(fSchema.getName())) {
-                String colFamily = hcatHbaseColMap.get(fSchema.getName());
-                long revisionID = hbaseSnapshot.getRevision(colFamily);
-                revisionMap.put(fSchema.getName(), revisionID);
-            }
-        }
-
-        HCatTableSnapshot hcatSnapshot = new HCatTableSnapshot(
-            hcatTableInfo.getDatabaseName(), hcatTableInfo.getTableName(), revisionMap, hbaseSnapshot.getLatestRevision());
-        return hcatSnapshot;
-    }
-
-    static TableSnapshot convertSnapshot(HCatTableSnapshot hcatSnapshot,
-                                         HCatTableInfo hcatTableInfo) throws IOException {
-
-        HCatSchema hcatTableSchema = hcatTableInfo.getDataColumns();
-        Map<String, Long> revisionMap = new HashMap<String, Long>();
-        Map<String, String> hcatHbaseColMap = getHCatHBaseColumnMapping(hcatTableInfo);
-        for (HCatFieldSchema fSchema : hcatTableSchema.getFields()) {
-            String colFamily = hcatHbaseColMap.get(fSchema.getName());
-            if (hcatSnapshot.containsColumn(fSchema.getName())) {
-                long revision = hcatSnapshot.getRevision(fSchema.getName());
-                revisionMap.put(colFamily, revision);
-            }
-        }
-
-        String fullyQualifiedName = hcatSnapshot.getDatabaseName() + "."
-            + hcatSnapshot.getTableName();
-        return new TableSnapshot(fullyQualifiedName, revisionMap, hcatSnapshot.getLatestRevision());
-
-    }
-
-    /**
-     * Begins a transaction in the revision manager for the given table.
-     * @param qualifiedTableName Name of the table
-     * @param tableInfo HCat Table information
-     * @param jobConf Job Configuration
-     * @return The new transaction in revision manager
-     * @throws IOException
-     */
-    static Transaction beginWriteTransaction(String qualifiedTableName,
-                                             HCatTableInfo tableInfo, Configuration jobConf) throws IOException {
-        Transaction txn;
-        RevisionManager rm = null;
-        try {
-            rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(jobConf);
-            String hBaseColumns = tableInfo.getStorerInfo().getProperties()
-                .getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING);
-            String[] splits = hBaseColumns.split("[,:]");
-            Set<String> families = new HashSet<String>();
-            for (int i = 0; i < splits.length; i += 2) {
-                if (!splits[i].isEmpty())
-                    families.add(splits[i]);
-            }
-            txn = rm.beginWriteTransaction(qualifiedTableName, new ArrayList<String>(families));
-        } finally {
-            HBaseRevisionManagerUtil.closeRevisionManagerQuietly(rm);
-        }
-        return txn;
-    }
-
-    static Transaction getWriteTransaction(Configuration conf) throws IOException {
-        OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
-        return (Transaction) HCatUtil.deserialize(outputJobInfo.getProperties()
-            .getProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY));
-    }
-
-    static void setWriteTransaction(Configuration conf, Transaction txn) throws IOException {
-        OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
-        outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY, HCatUtil.serialize(txn));
-        conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
-    }
-
-    /**
-     * Get the Revision number that will be assigned to this job's output data
-     * @param conf configuration of the job
-     * @return the revision number used
-     * @throws IOException
-     */
-    static long getOutputRevision(Configuration conf) throws IOException {
-        return getWriteTransaction(conf).getRevisionNumber();
-    }
-
-    private static Map<String, String> getHCatHBaseColumnMapping(HCatTableInfo hcatTableInfo)
-        throws IOException {
-
-        HCatSchema hcatTableSchema = hcatTableInfo.getDataColumns();
-        StorerInfo storeInfo = hcatTableInfo.getStorerInfo();
-        String hbaseColumnMapping = storeInfo.getProperties().getProperty(
-            HBaseSerDe.HBASE_COLUMNS_MAPPING);
-
-        Map<String, String> hcatHbaseColMap = new HashMap<String, String>();
-        List<String> columnFamilies = new ArrayList<String>();
-        List<String> columnQualifiers = new ArrayList<String>();
-        HBaseUtil.parseColumnMapping(hbaseColumnMapping, columnFamilies,
-            null, columnQualifiers, null);
-
-        for (HCatFieldSchema column : hcatTableSchema.getFields()) {
-            int fieldPos = hcatTableSchema.getPosition(column.getName());
-            String colFamily = columnFamilies.get(fieldPos);
-            if (colFamily.equals(HBaseSerDe.HBASE_KEY_COL) == false) {
-                hcatHbaseColMap.put(column.getName(), colFamily);
-            }
-        }
+  private HBaseRevisionManagerUtil() {
+  }
 
-        return hcatHbaseColMap;
+  /**
+   * Creates the latest snapshot of the table.
+   *
+   * @param jobConf The job configuration.
+   * @param hbaseTableName The fully qualified name of the HBase table.
+   * @param tableInfo HCat table information
+   * @return An instance of HCatTableSnapshot
+   * @throws IOException Signals that an I/O exception has occurred.
+   */
+  static HCatTableSnapshot createSnapshot(Configuration jobConf,
+                      String hbaseTableName, HCatTableInfo tableInfo) throws IOException {
+
+    RevisionManager rm = null;
+    TableSnapshot snpt;
+    try {
+      rm = getOpenedRevisionManager(jobConf);
+      snpt = rm.createSnapshot(hbaseTableName);
+    } finally {
+      closeRevisionManagerQuietly(rm);
+    }
+
+    HCatTableSnapshot hcatSnapshot = HBaseRevisionManagerUtil.convertSnapshot(snpt, tableInfo);
+    return hcatSnapshot;
+  }
+
+  /**
+   * Creates the snapshot using the revision specified by the user.
+   *
+   * @param jobConf The job configuration.
+   * @param tableName The fully qualified name of the table whose snapshot is being taken.
+   * @param revision The revision number to use for the snapshot.
+   * @return An instance of HCatTableSnapshot.
+   * @throws IOException Signals that an I/O exception has occurred.
+   */
+  static HCatTableSnapshot createSnapshot(Configuration jobConf,
+                      String tableName, long revision)
+    throws IOException {
+
+    TableSnapshot snpt;
+    RevisionManager rm = null;
+    try {
+      rm = getOpenedRevisionManager(jobConf);
+      snpt = rm.createSnapshot(tableName, revision);
+    } finally {
+      closeRevisionManagerQuietly(rm);
+    }
+
+    String inputJobString = jobConf.get(HCatConstants.HCAT_KEY_JOB_INFO);
+    if (inputJobString == null) {
+      throw new IOException(
+        "InputJobInfo information not found in JobContext. "
+          + "HCatInputFormat.setInput() not called?");
+    }
+    InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(inputJobString);
+    HCatTableSnapshot hcatSnapshot = HBaseRevisionManagerUtil
+      .convertSnapshot(snpt, inputInfo.getTableInfo());
+
+    return hcatSnapshot;
+  }
+
+  /**
+   * Gets an instance of revision manager which is opened.
+   *
+   * @param jobConf The job configuration.
+   * @return RevisionManager An instance of revision manager.
+   * @throws IOException
+   */
+  static RevisionManager getOpenedRevisionManager(Configuration jobConf) throws IOException {
+    return RevisionManagerFactory.getOpenedRevisionManager(jobConf);
+  }
+
+  static void closeRevisionManagerQuietly(RevisionManager rm) {
+    if (rm != null) {
+      try {
+        rm.close();
+      } catch (IOException e) {
+        LOG.warn("Error while trying to close revision manager", e);
+      }
+    }
+  }
+
+
+  static HCatTableSnapshot convertSnapshot(TableSnapshot hbaseSnapshot,
+                       HCatTableInfo hcatTableInfo) throws IOException {
+
+    HCatSchema hcatTableSchema = hcatTableInfo.getDataColumns();
+    Map<String, String> hcatHbaseColMap = getHCatHBaseColumnMapping(hcatTableInfo);
+    HashMap<String, Long> revisionMap = new HashMap<String, Long>();
+
+    for (HCatFieldSchema fSchema : hcatTableSchema.getFields()) {
+      if (hcatHbaseColMap.containsKey(fSchema.getName())) {
+        String colFamily = hcatHbaseColMap.get(fSchema.getName());
+        long revisionID = hbaseSnapshot.getRevision(colFamily);
+        revisionMap.put(fSchema.getName(), revisionID);
+      }
+    }
+
+    HCatTableSnapshot hcatSnapshot = new HCatTableSnapshot(
+      hcatTableInfo.getDatabaseName(), hcatTableInfo.getTableName(), revisionMap, hbaseSnapshot.getLatestRevision());
+    return hcatSnapshot;
+  }
+
+  static TableSnapshot convertSnapshot(HCatTableSnapshot hcatSnapshot,
+                     HCatTableInfo hcatTableInfo) throws IOException {
+
+    HCatSchema hcatTableSchema = hcatTableInfo.getDataColumns();
+    Map<String, Long> revisionMap = new HashMap<String, Long>();
+    Map<String, String> hcatHbaseColMap = getHCatHBaseColumnMapping(hcatTableInfo);
+    for (HCatFieldSchema fSchema : hcatTableSchema.getFields()) {
+      String colFamily = hcatHbaseColMap.get(fSchema.getName());
+      if (hcatSnapshot.containsColumn(fSchema.getName())) {
+        long revision = hcatSnapshot.getRevision(fSchema.getName());
+        revisionMap.put(colFamily, revision);
+      }
+    }
+
+    String fullyQualifiedName = hcatSnapshot.getDatabaseName() + "."
+      + hcatSnapshot.getTableName();
+    return new TableSnapshot(fullyQualifiedName, revisionMap, hcatSnapshot.getLatestRevision());
+
+  }
+
+  /**
+   * Begins a transaction in the revision manager for the given table.
+   * @param qualifiedTableName Name of the table
+   * @param tableInfo HCat Table information
+   * @param jobConf Job Configuration
+   * @return The new transaction in revision manager
+   * @throws IOException
+   */
+  static Transaction beginWriteTransaction(String qualifiedTableName,
+                       HCatTableInfo tableInfo, Configuration jobConf) throws IOException {
+    Transaction txn;
+    RevisionManager rm = null;
+    try {
+      rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(jobConf);
+      String hBaseColumns = tableInfo.getStorerInfo().getProperties()
+        .getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+      String[] splits = hBaseColumns.split("[,:]");
+      Set<String> families = new HashSet<String>();
+      for (int i = 0; i < splits.length; i += 2) {
+        if (!splits[i].isEmpty())
+          families.add(splits[i]);
+      }
+      txn = rm.beginWriteTransaction(qualifiedTableName, new ArrayList<String>(families));
+    } finally {
+      HBaseRevisionManagerUtil.closeRevisionManagerQuietly(rm);
+    }
+    return txn;
+  }
+
+  static Transaction getWriteTransaction(Configuration conf) throws IOException {
+    OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+    return (Transaction) HCatUtil.deserialize(outputJobInfo.getProperties()
+      .getProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY));
+  }
+
+  static void setWriteTransaction(Configuration conf, Transaction txn) throws IOException {
+    OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+    outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY, HCatUtil.serialize(txn));
+    conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
+  }
+
+  /**
+   * Get the Revision number that will be assigned to this job's output data
+   * @param conf configuration of the job
+   * @return the revision number used
+   * @throws IOException
+   */
+  static long getOutputRevision(Configuration conf) throws IOException {
+    return getWriteTransaction(conf).getRevisionNumber();
+  }
+
+  private static Map<String, String> getHCatHBaseColumnMapping(HCatTableInfo hcatTableInfo)
+    throws IOException {
+
+    HCatSchema hcatTableSchema = hcatTableInfo.getDataColumns();
+    StorerInfo storeInfo = hcatTableInfo.getStorerInfo();
+    String hbaseColumnMapping = storeInfo.getProperties().getProperty(
+      HBaseSerDe.HBASE_COLUMNS_MAPPING);
+
+    Map<String, String> hcatHbaseColMap = new HashMap<String, String>();
+    List<String> columnFamilies = new ArrayList<String>();
+    List<String> columnQualifiers = new ArrayList<String>();
+    HBaseUtil.parseColumnMapping(hbaseColumnMapping, columnFamilies,
+      null, columnQualifiers, null);
+
+    for (HCatFieldSchema column : hcatTableSchema.getFields()) {
+      int fieldPos = hcatTableSchema.getPosition(column.getName());
+      String colFamily = columnFamilies.get(fieldPos);
+      if (colFamily.equals(HBaseSerDe.HBASE_KEY_COL) == false) {
+        hcatHbaseColMap.put(column.getName(), colFamily);
+      }
     }
 
+    return hcatHbaseColMap;
+  }
+
 }

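For readers skimming the reindented HBaseRevisionManagerUtil above, here is a minimal, self-contained sketch of the column-family extraction that beginWriteTransaction performs on the hbase.columns.mapping property. The class name ColumnFamilyExtractor and the sample mapping string are invented for illustration only and are not part of this commit.

  // Hypothetical standalone illustration of the "split on [,:] and keep even
  // indices" trick used in beginWriteTransaction; not committed code.
  import java.util.ArrayList;
  import java.util.HashSet;
  import java.util.List;
  import java.util.Set;

  class ColumnFamilyExtractor {
    static List<String> extractFamilies(String hbaseColumnsMapping) {
      // Splitting on both ',' and ':' leaves the family names at even indices;
      // the leading ":key" entry yields an empty string that is skipped.
      String[] splits = hbaseColumnsMapping.split("[,:]");
      Set<String> families = new HashSet<String>();
      for (int i = 0; i < splits.length; i += 2) {
        if (!splits[i].isEmpty()) {
          families.add(splits[i]);
        }
      }
      return new ArrayList<String>(families);
    }

    public static void main(String[] args) {
      // Prints the distinct families, e.g. [cf1, cf2] (order may vary).
      System.out.println(extractFamilies(":key,cf1:q1,cf2:q2"));
    }
  }
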
Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseUtil.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseUtil.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseUtil.java Thu Sep 12 01:21:10 2013
@@ -29,131 +29,131 @@ import org.apache.hadoop.mapred.JobConf;
 
 class HBaseUtil {
 
-    private HBaseUtil() {
-    }
-
-    /**
-     * Parses the HBase columns mapping to identify the column families, qualifiers
-     * and also caches the byte arrays corresponding to them. One of the HCat table
-     * columns maps to the HBase row key, by default the first column.
-     *
-     * @param columnMapping - the column mapping specification to be parsed
-     * @param colFamilies - the list of HBase column family names
-     * @param colFamiliesBytes - the corresponding byte array
-     * @param colQualifiers - the list of HBase column qualifier names
-     * @param colQualifiersBytes - the corresponding byte array
-     * @return the row key index in the column names list
-     * @throws IOException
-     */
-    static int parseColumnMapping(
-        String columnMapping,
-        List<String> colFamilies,
-        List<byte[]> colFamiliesBytes,
-        List<String> colQualifiers,
-        List<byte[]> colQualifiersBytes) throws IOException {
-
-        int rowKeyIndex = -1;
-
-        if (colFamilies == null || colQualifiers == null) {
-            throw new IllegalArgumentException("Error: caller must pass in lists for the column families " +
-                "and qualifiers.");
-        }
+  private HBaseUtil() {
+  }
 
-        colFamilies.clear();
-        colQualifiers.clear();
+  /**
+   * Parses the HBase columns mapping to identify the column families, qualifiers
+   * and also caches the byte arrays corresponding to them. One of the HCat table
+   * columns maps to the HBase row key, by default the first column.
+   *
+   * @param columnMapping - the column mapping specification to be parsed
+   * @param colFamilies - the list of HBase column family names
+   * @param colFamiliesBytes - the corresponding byte array
+   * @param colQualifiers - the list of HBase column qualifier names
+   * @param colQualifiersBytes - the corresponding byte array
+   * @return the row key index in the column names list
+   * @throws IOException
+   */
+  static int parseColumnMapping(
+    String columnMapping,
+    List<String> colFamilies,
+    List<byte[]> colFamiliesBytes,
+    List<String> colQualifiers,
+    List<byte[]> colQualifiersBytes) throws IOException {
+
+    int rowKeyIndex = -1;
+
+    if (colFamilies == null || colQualifiers == null) {
+      throw new IllegalArgumentException("Error: caller must pass in lists for the column families " +
+        "and qualifiers.");
+    }
 
-        if (columnMapping == null) {
-            throw new IllegalArgumentException("Error: hbase.columns.mapping missing for this HBase table.");
-        }
+    colFamilies.clear();
+    colQualifiers.clear();
 
-        if (columnMapping.equals("") || columnMapping.equals(HBaseSerDe.HBASE_KEY_COL)) {
-            throw new IllegalArgumentException("Error: hbase.columns.mapping specifies only the HBase table"
-                + " row key. A valid Hive-HBase table must specify at least one additional column.");
-        }
+    if (columnMapping == null) {
+      throw new IllegalArgumentException("Error: hbase.columns.mapping missing for this HBase table.");
+    }
 
-        String[] mapping = columnMapping.split(",");
+    if (columnMapping.equals("") || columnMapping.equals(HBaseSerDe.HBASE_KEY_COL)) {
+      throw new IllegalArgumentException("Error: hbase.columns.mapping specifies only the HBase table"
+        + " row key. A valid Hive-HBase table must specify at least one additional column.");
+    }
 
-        for (int i = 0; i < mapping.length; i++) {
-            String elem = mapping[i];
-            int idxFirst = elem.indexOf(":");
-            int idxLast = elem.lastIndexOf(":");
-
-            if (idxFirst < 0 || !(idxFirst == idxLast)) {
-                throw new IllegalArgumentException("Error: the HBase columns mapping contains a badly formed " +
-                    "column family, column qualifier specification.");
-            }
-
-            if (elem.equals(HBaseSerDe.HBASE_KEY_COL)) {
-                rowKeyIndex = i;
-                colFamilies.add(elem);
-                colQualifiers.add(null);
-            } else {
-                String[] parts = elem.split(":");
-                assert (parts.length > 0 && parts.length <= 2);
-                colFamilies.add(parts[0]);
-
-                if (parts.length == 2) {
-                    colQualifiers.add(parts[1]);
-                } else {
-                    colQualifiers.add(null);
-                }
-            }
-        }
+    String[] mapping = columnMapping.split(",");
 
-        if (rowKeyIndex == -1) {
-            colFamilies.add(0, HBaseSerDe.HBASE_KEY_COL);
-            colQualifiers.add(0, null);
-            rowKeyIndex = 0;
+    for (int i = 0; i < mapping.length; i++) {
+      String elem = mapping[i];
+      int idxFirst = elem.indexOf(":");
+      int idxLast = elem.lastIndexOf(":");
+
+      if (idxFirst < 0 || !(idxFirst == idxLast)) {
+        throw new IllegalArgumentException("Error: the HBase columns mapping contains a badly formed " +
+          "column family, column qualifier specification.");
+      }
+
+      if (elem.equals(HBaseSerDe.HBASE_KEY_COL)) {
+        rowKeyIndex = i;
+        colFamilies.add(elem);
+        colQualifiers.add(null);
+      } else {
+        String[] parts = elem.split(":");
+        assert (parts.length > 0 && parts.length <= 2);
+        colFamilies.add(parts[0]);
+
+        if (parts.length == 2) {
+          colQualifiers.add(parts[1]);
+        } else {
+          colQualifiers.add(null);
         }
+      }
+    }
 
-        if (colFamilies.size() != colQualifiers.size()) {
-            throw new IOException("Error in parsing the hbase columns mapping.");
-        }
+    if (rowKeyIndex == -1) {
+      colFamilies.add(0, HBaseSerDe.HBASE_KEY_COL);
+      colQualifiers.add(0, null);
+      rowKeyIndex = 0;
+    }
 
-        // populate the corresponding byte [] if the client has passed in a non-null list
-        if (colFamiliesBytes != null) {
-            colFamiliesBytes.clear();
-
-            for (String fam : colFamilies) {
-                colFamiliesBytes.add(Bytes.toBytes(fam));
-            }
-        }
+    if (colFamilies.size() != colQualifiers.size()) {
+      throw new IOException("Error in parsing the hbase columns mapping.");
+    }
 
-        if (colQualifiersBytes != null) {
-            colQualifiersBytes.clear();
+    // populate the corresponding byte [] if the client has passed in a non-null list
+    if (colFamiliesBytes != null) {
+      colFamiliesBytes.clear();
+
+      for (String fam : colFamilies) {
+        colFamiliesBytes.add(Bytes.toBytes(fam));
+      }
+    }
 
-            for (String qual : colQualifiers) {
-                if (qual == null) {
-                    colQualifiersBytes.add(null);
-                } else {
-                    colQualifiersBytes.add(Bytes.toBytes(qual));
-                }
-            }
-        }
+    if (colQualifiersBytes != null) {
+      colQualifiersBytes.clear();
 
-        if (colFamiliesBytes != null && colQualifiersBytes != null) {
-            if (colFamiliesBytes.size() != colQualifiersBytes.size()) {
-                throw new IOException("Error in caching the bytes for the hbase column families " +
-                    "and qualifiers.");
-            }
+      for (String qual : colQualifiers) {
+        if (qual == null) {
+          colQualifiersBytes.add(null);
+        } else {
+          colQualifiersBytes.add(Bytes.toBytes(qual));
         }
+      }
+    }
 
-        return rowKeyIndex;
+    if (colFamiliesBytes != null && colQualifiersBytes != null) {
+      if (colFamiliesBytes.size() != colQualifiersBytes.size()) {
+        throw new IOException("Error in caching the bytes for the hbase column families " +
+          "and qualifiers.");
+      }
     }
 
-    /**
-     * Get delegation token from hbase and add it to JobConf
-     * @param job
-     * @throws IOException
-     */
-    static void addHBaseDelegationToken(JobConf job) throws IOException {
-        if (User.isHBaseSecurityEnabled(job)) {
-            try {
-                User.getCurrent().obtainAuthTokenForJob(job);
-            } catch (InterruptedException e) {
-                throw new IOException("Error while obtaining hbase delegation token", e);
-            }
-        }
+    return rowKeyIndex;
+  }
+
+  /**
+   * Get delegation token from hbase and add it to JobConf
+   * @param job
+   * @throws IOException
+   */
+  static void addHBaseDelegationToken(JobConf job) throws IOException {
+    if (User.isHBaseSecurityEnabled(job)) {
+      try {
+        User.getCurrent().obtainAuthTokenForJob(job);
+      } catch (InterruptedException e) {
+        throw new IOException("Error while obtaining hbase delegation token", e);
+      }
     }
+  }
 
 }

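As context for the reformatted parseColumnMapping above, the following usage sketch shows what it returns for a small mapping string. The caller class ColumnMappingDemo and the mapping value are made up for illustration; it must live in the same package because the method is package-private, and it assumes the hcatalog-hbase sources are on the classpath.

  // Hypothetical same-package caller; only the inputs and printed values are
  // meant to illustrate the parsing behaviour of HBaseUtil.parseColumnMapping.
  package org.apache.hcatalog.hbase;

  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.List;

  class ColumnMappingDemo {
    public static void main(String[] args) throws IOException {
      List<String> families = new ArrayList<String>();
      List<String> qualifiers = new ArrayList<String>();
      // ":key,info:name,stats:" maps the first HCat column to the row key,
      // the second to info:name, and the third to the whole "stats" family.
      int rowKeyIndex = HBaseUtil.parseColumnMapping(
          ":key,info:name,stats:", families, null, qualifiers, null);
      System.out.println(rowKeyIndex);  // 0
      System.out.println(families);     // [:key, info, stats]
      System.out.println(qualifiers);   // [null, name, null]
    }
  }
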
Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HCatTableSnapshot.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HCatTableSnapshot.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HCatTableSnapshot.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HCatTableSnapshot.java Thu Sep 12 01:21:10 2013
@@ -30,63 +30,63 @@ import java.util.Map;
  */
 public class HCatTableSnapshot implements Serializable {
 
-    private static final long serialVersionUID = 1L;
-    private String tableName;
-    private String databaseName;
-    private Map<String, Long> columnMap;
-    private long latestRevision;
-
-    HCatTableSnapshot(String databaseName, String tableName, Map<String, Long> columnMap, long latestRevision) {
-        this.tableName = tableName;
-        this.databaseName = databaseName;
-        this.columnMap = columnMap;
-        this.latestRevision = latestRevision;
-    }
-
-    /**
-     * @return The name of the table in the snapshot.
-     */
-    public String getTableName() {
-        return this.tableName;
-    }
-
-    /**
-     * @return The name of the database to which the table snapshot belongs.
-     */
-    public String getDatabaseName() {
-        return this.databaseName;
-    }
-
-    /**
-     * @return The revision number of a column in a snapshot.
-     */
-    long getRevision(String column) {
-        if (columnMap.containsKey(column))
-            return this.columnMap.get(column);
-        return latestRevision;
-    }
-
-    /**
-     * The method checks if the snapshot contains information about a data column.
-     *
-     * @param column The data column of the table
-     * @return true, if successful
-     */
-    boolean containsColumn(String column) {
-        return this.columnMap.containsKey(column);
-    }
-
-    /**
-     * @return latest committed revision when snapshot was taken
-     */
-    long getLatestRevision() {
-        return latestRevision;
-    }
-
-    @Override
-    public String toString() {
-        String snapshot = " Database Name: " + this.databaseName + " Table Name : " + tableName +
-            "Latest Revision: " + latestRevision + " Column revision : " + columnMap.toString();
-        return snapshot;
-    }
+  private static final long serialVersionUID = 1L;
+  private String tableName;
+  private String databaseName;
+  private Map<String, Long> columnMap;
+  private long latestRevision;
+
+  HCatTableSnapshot(String databaseName, String tableName, Map<String, Long> columnMap, long latestRevision) {
+    this.tableName = tableName;
+    this.databaseName = databaseName;
+    this.columnMap = columnMap;
+    this.latestRevision = latestRevision;
+  }
+
+  /**
+   * @return The name of the table in the snapshot.
+   */
+  public String getTableName() {
+    return this.tableName;
+  }
+
+  /**
+   * @return The name of the database to which the table snapshot belongs.
+   */
+  public String getDatabaseName() {
+    return this.databaseName;
+  }
+
+  /**
+   * @return The revision number of a column in a snapshot.
+   */
+  long getRevision(String column) {
+    if (columnMap.containsKey(column))
+      return this.columnMap.get(column);
+    return latestRevision;
+  }
+
+  /**
+   * The method checks if the snapshot contains information about a data column.
+   *
+   * @param column The data column of the table
+   * @return true, if successful
+   */
+  boolean containsColumn(String column) {
+    return this.columnMap.containsKey(column);
+  }
+
+  /**
+   * @return latest committed revision when snapshot was taken
+   */
+  long getLatestRevision() {
+    return latestRevision;
+  }
+
+  @Override
+  public String toString() {
+    String snapshot = " Database Name: " + this.databaseName + " Table Name : " + tableName +
+      "Latest Revision: " + latestRevision + " Column revision : " + columnMap.toString();
+    return snapshot;
+  }
 }
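
To illustrate the revision-lookup semantics of the HCatTableSnapshot class above: a column that appears in the snapshot's column map returns its recorded revision, while any other column falls back to the latest committed revision. The demo class, table names, and revision numbers below are invented; it must sit in the same package because the constructor and getRevision are package-private.

  // Hypothetical same-package illustration of HCatTableSnapshot.getRevision.
  package org.apache.hcatalog.hbase;

  import java.util.HashMap;
  import java.util.Map;

  class SnapshotDemo {
    public static void main(String[] args) {
      Map<String, Long> columnRevisions = new HashMap<String, Long>();
      columnRevisions.put("name", 7L);
      // Latest committed revision at snapshot time is 10.
      HCatTableSnapshot snapshot =
          new HCatTableSnapshot("default", "users", columnRevisions, 10L);
      // A mapped column returns its recorded revision ...
      System.out.println(snapshot.getRevision("name"));  // 7
      // ... while an unmapped column falls back to the latest revision.
      System.out.println(snapshot.getRevision("age"));   // 10
    }
  }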