Posted to commits@hive.apache.org by kh...@apache.org on 2014/08/04 21:17:54 UTC
svn commit: r1615730 - in /hive/trunk: ./ common/src/java/org/apache/hadoop/hive/conf/ hbase-handler/src/java/org/apache/hadoop/hive/hbase/ hbase-handler/src/test/queries/positive/ hbase-handler/src/test/results/positive/ hbase-handler/src/test/templat...
Author: khorgath
Date: Mon Aug 4 19:17:53 2014
New Revision: 1615730
URL: http://svn.apache.org/r1615730
Log:
HIVE-6584 : Add HiveHBaseTableSnapshotInputFormat (Nick Dimiduk, reviewed by Navis Ryu, Sushanth Sowmyan)
Added:
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseTableSnapshotInputFormatUtil.java
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableSnapshotInputFormat.java
hive/trunk/hbase-handler/src/test/queries/positive/hbase_handler_snapshot.q
hive/trunk/hbase-handler/src/test/results/positive/hbase_handler_snapshot.q.out
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
hive/trunk/hbase-handler/src/test/results/positive/external_table_ppd.q.out
hive/trunk/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out
hive/trunk/hbase-handler/src/test/templates/TestHBaseCliDriver.vm
hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java
hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java
hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
hive/trunk/pom.xml
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1615730&r1=1615729&r2=1615730&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Aug 4 19:17:53 2014
@@ -1256,6 +1256,9 @@ public class HiveConf extends Configurat
"Disabling this improves HBase write performance at the risk of lost writes in case of a crash."),
HIVE_HBASE_GENERATE_HFILES("hive.hbase.generatehfiles", false,
"True when HBaseStorageHandler should generate hfiles instead of operate against the online table."),
+ HIVE_HBASE_SNAPSHOT_NAME("hive.hbase.snapshot.name", null, "The HBase table snapshot name to use."),
+ HIVE_HBASE_SNAPSHOT_RESTORE_DIR("hive.hbase.snapshot.restoredir", "/tmp", "The directory in which to " +
+ "restore the HBase table snapshot."),
// For har files
HIVEARCHIVEENABLED("hive.archive.enabled", false, "Whether archiving operations are permitted"),
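For orientation, a minimal sketch of how a consumer reads these two new properties, mirroring the HBaseStorageHandler change below; the jobConf variable is assumed:

    String snapshotName = HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVE_HBASE_SNAPSHOT_NAME);
    if (snapshotName != null) {
      // snapshot reads were requested; the restore dir (default "/tmp") must also resolve
      String restoreDir = HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVE_HBASE_SNAPSHOT_RESTORE_DIR);
    }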
Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java?rev=1615730&r1=1615729&r2=1615730&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java Mon Aug 4 19:17:53 2014
@@ -31,46 +31,86 @@ import org.apache.hadoop.mapred.InputSpl
* HBaseSplit augments FileSplit with HBase column mapping.
*/
public class HBaseSplit extends FileSplit implements InputSplit {
- private final TableSplit split;
+ private final TableSplit tableSplit;
+ private final InputSplit snapshotSplit;
+ private boolean isTableSplit; // would be final, but Writable hydration must assign it in readFields()
+
+ /**
+ * For Writable
+ */
public HBaseSplit() {
super((Path) null, 0, 0, (String[]) null);
- split = new TableSplit();
+ tableSplit = new TableSplit();
+ snapshotSplit = HBaseTableSnapshotInputFormatUtil.createTableSnapshotRegionSplit();
}
- public HBaseSplit(TableSplit split, Path dummyPath) {
+ public HBaseSplit(TableSplit tableSplit, Path dummyPath) {
super(dummyPath, 0, 0, (String[]) null);
- this.split = split;
+ this.tableSplit = tableSplit;
+ this.snapshotSplit = HBaseTableSnapshotInputFormatUtil.createTableSnapshotRegionSplit();
+ this.isTableSplit = true;
}
- public TableSplit getSplit() {
- return this.split;
+ /**
+ * TODO: use TableSnapshotRegionSplit once HBASE-11555 is fixed.
+ */
+ public HBaseSplit(InputSplit snapshotSplit, Path dummyPath) {
+ super(dummyPath, 0, 0, (String[]) null);
+ this.tableSplit = new TableSplit();
+ this.snapshotSplit = snapshotSplit;
+ this.isTableSplit = false;
}
- @Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- split.readFields(in);
+ public TableSplit getTableSplit() {
+ assert isTableSplit;
+ return this.tableSplit;
+ }
+
+ public InputSplit getSnapshotSplit() {
+ assert !isTableSplit;
+ return this.snapshotSplit;
}
@Override
public String toString() {
- return "TableSplit " + split;
+ return "" + (isTableSplit ? tableSplit : snapshotSplit);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ this.isTableSplit = in.readBoolean();
+ if (this.isTableSplit) {
+ tableSplit.readFields(in);
+ } else {
+ snapshotSplit.readFields(in);
+ }
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
- split.write(out);
+ out.writeBoolean(isTableSplit);
+ if (isTableSplit) {
+ tableSplit.write(out);
+ } else {
+ snapshotSplit.write(out);
+ }
}
@Override
public long getLength() {
- return split.getLength();
+ try {
+ return isTableSplit ? tableSplit.getLength() : snapshotSplit.getLength();
+ } catch (IOException e) {
+ // mapred InputSplit#getLength may throw, but FileSplit#getLength cannot;
+ // fall back to 0 instead of masking exceptions with a return inside finally
+ return 0;
+ }
}
@Override
public String[] getLocations() throws IOException {
- return split.getLocations();
+ return isTableSplit ? tableSplit.getLocations() : snapshotSplit.getLocations();
}
}
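The wire format of HBaseSplit is now a tagged union: the FileSplit fields, a boolean discriminator, then whichever delegate the flag selects. A round-trip sketch, assuming Hadoop's DataOutputBuffer/DataInputBuffer test buffers and the same TableSplit class HBaseSplit delegates to:

    DataOutputBuffer out = new DataOutputBuffer();
    HBaseSplit original = new HBaseSplit(new TableSplit(), new Path("/tmp/dummy"));
    original.write(out);                       // FileSplit fields, isTableSplit flag, delegate

    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    HBaseSplit copy = new HBaseSplit();        // blank instance, as Writable requires
    copy.readFields(in);                       // the flag decides which delegate hydrates
    TableSplit restored = copy.getTableSplit();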
Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java?rev=1615730&r1=1615729&r2=1615730&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java Mon Aug 4 19:17:53 2014
@@ -29,7 +29,10 @@ import java.util.Properties;
import java.util.Set;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -69,6 +72,19 @@ import org.apache.hadoop.util.StringUtil
public class HBaseStorageHandler extends DefaultStorageHandler
implements HiveMetaHook, HiveStoragePredicateHandler {
+ private static final Log LOG = LogFactory.getLog(HBaseStorageHandler.class);
+
+ /** HBase-internal config by which the input format receives the snapshot name. */
+ private static final String HBASE_SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
+ /** HBase-internal config by which the input format received the restore dir before HBASE-11335. */
+ private static final String HBASE_SNAPSHOT_TABLE_DIR_KEY = "hbase.TableSnapshotInputFormat.table.dir";
+ /** HBase-internal config by which the input format receives the restore dir after HBASE-11335. */
+ private static final String HBASE_SNAPSHOT_RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";
+ /** HBase config by which a SlabCache is sized. */
+ private static final String HBASE_OFFHEAP_PCT_KEY = "hbase.offheapcache.percentage";
+ /** HBase config by which a BucketCache is sized. */
+ private static final String HBASE_BUCKETCACHE_SIZE_KEY = "hbase.bucketcache.size";
+
final static public String DEFAULT_PREFIX = "default.";
//Check if the configure job properties is called from input
@@ -258,6 +274,11 @@ public class HBaseStorageHandler extends
@Override
public Class<? extends InputFormat> getInputFormatClass() {
+ if (HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVE_HBASE_SNAPSHOT_NAME) != null) {
+ LOG.debug("Using TableSnapshotInputFormat");
+ return HiveHBaseTableSnapshotInputFormat.class;
+ }
+ LOG.debug("Using HiveHBaseTableInputFormat");
return HiveHBaseTableInputFormat.class;
}
@@ -342,6 +363,37 @@ public class HBaseStorageHandler extends
// do this for reconciling HBaseStorageHandler for use in HCatalog
// check to see if this is an input job or an output job
if (this.configureInputJobProps) {
+ String snapshotName = HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVE_HBASE_SNAPSHOT_NAME);
+ if (snapshotName != null) {
+ HBaseTableSnapshotInputFormatUtil.assertSupportsTableSnapshots();
+
+ try {
+ String restoreDir =
+ HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVE_HBASE_SNAPSHOT_RESTORE_DIR);
+ if (restoreDir == null) {
+ throw new IllegalArgumentException(
+ "Cannot process HBase snapshot without specifying " + HiveConf.ConfVars
+ .HIVE_HBASE_SNAPSHOT_RESTORE_DIR);
+ }
+
+ HBaseTableSnapshotInputFormatUtil.configureJob(hbaseConf, snapshotName, new Path(restoreDir));
+ // copy over configs touched by above method
+ jobProperties.put(HBASE_SNAPSHOT_NAME_KEY, hbaseConf.get(HBASE_SNAPSHOT_NAME_KEY));
+ if (hbaseConf.get(HBASE_SNAPSHOT_TABLE_DIR_KEY, null) != null) {
+ jobProperties.put(HBASE_SNAPSHOT_TABLE_DIR_KEY, hbaseConf.get(HBASE_SNAPSHOT_TABLE_DIR_KEY));
+ } else {
+ jobProperties.put(HBASE_SNAPSHOT_RESTORE_DIR_KEY, hbaseConf.get(HBASE_SNAPSHOT_RESTORE_DIR_KEY));
+ }
+
+ TableMapReduceUtil.resetCacheConfig(hbaseConf);
+ // copy over configs touched by above method
+ jobProperties.put(HBASE_OFFHEAP_PCT_KEY, hbaseConf.get(HBASE_OFFHEAP_PCT_KEY));
+ jobProperties.put(HBASE_BUCKETCACHE_SIZE_KEY, hbaseConf.get(HBASE_BUCKETCACHE_SIZE_KEY));
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
for (String k : jobProperties.keySet()) {
jobConf.set(k, jobProperties.get(k));
}
@@ -415,7 +467,8 @@ public class HBaseStorageHandler extends
* only need TableMapReduceUtil.addDependencyJars(jobConf) here.
*/
TableMapReduceUtil.addDependencyJars(
- jobConf, HBaseStorageHandler.class, TableInputFormatBase.class);
+ jobConf, HBaseStorageHandler.class, TableInputFormatBase.class,
+ org.cliffc.high_scale_lib.Counter.class); // this will be removed for HBase 1.0
Set<String> merged = new LinkedHashSet<String>(jobConf.getStringCollection("tmpjars"));
Job copy = new Job(jobConf);
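The repeated "copy over configs touched by above method" blocks follow one pattern and could share a helper; a sketch, where propagate() is hypothetical and not part of this patch:

    // Hypothetical helper: mirror a key that configureJob or resetCacheConfig
    // may have set on hbaseConf into the jobProperties map Hive hands back.
    private static void propagate(Configuration src, Map<String, String> dest, String key) {
      String value = src.get(key);
      if (value != null) {
        dest.put(key, value);
      }
    }

    // usage, matching the keys handled above:
    propagate(hbaseConf, jobProperties, HBASE_SNAPSHOT_NAME_KEY);
    propagate(hbaseConf, jobProperties, HBASE_OFFHEAP_PCT_KEY);
    propagate(hbaseConf, jobProperties, HBASE_BUCKETCACHE_SIZE_KEY);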
Added: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseTableSnapshotInputFormatUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseTableSnapshotInputFormatUtil.java?rev=1615730&view=auto
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseTableSnapshotInputFormatUtil.java (added)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseTableSnapshotInputFormatUtil.java Mon Aug 4 19:17:53 2014
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
+import org.apache.hadoop.mapred.InputSplit;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * A helper class to isolate newer HBase features from users running against older versions of
+ * HBase that don't provide those features.
+ *
+ * TODO: remove this class when it's okay to drop support for earlier versions of HBase.
+ */
+public class HBaseTableSnapshotInputFormatUtil {
+
+ private static final Log LOG = LogFactory.getLog(HBaseTableSnapshotInputFormatUtil.class);
+
+ /** The class we look for to determine if hbase snapshots are supported. */
+ private static final String TABLESNAPSHOTINPUTFORMAT_CLASS
+ = "org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl";
+
+ private static final String TABLESNAPSHOTREGIONSPLIT_CLASS
+ = "org.apache.hadoop.hbase.mapred.TableSnapshotInputFormat$TableSnapshotRegionSplit";
+
+ /** True when {@link #TABLESNAPSHOTINPUTFORMAT_CLASS} is present. */
+ private static final boolean SUPPORTS_TABLE_SNAPSHOTS;
+
+ static {
+ boolean support = false;
+ try {
+ Class.forName(TABLESNAPSHOTINPUTFORMAT_CLASS);
+ support = true; // Class.forName throws ClassNotFoundException rather than returning null
+ } catch (ClassNotFoundException e) {
+ // pass
+ }
+ SUPPORTS_TABLE_SNAPSHOTS = support;
+ }
+
+ /** Asserts that the HBase runtime supports {@link HiveHBaseTableSnapshotInputFormat}, throwing otherwise. */
+ public static void assertSupportsTableSnapshots() {
+ if (!SUPPORTS_TABLE_SNAPSHOTS) {
+ throw new RuntimeException("This version of HBase does not support Hive over table " +
+ "snapshots. Please upgrade to at least HBase 0.98.3 or later. See HIVE-6584 for details.");
+ }
+ }
+
+ /**
+ * Configures {@code conf} for the snapshot job. Call only after
+ * {@link #assertSupportsTableSnapshots()} has passed without throwing.
+ */
+ public static void configureJob(Configuration conf, String snapshotName, Path restoreDir)
+ throws IOException {
+ TableSnapshotInputFormatImpl.setInput(conf, snapshotName, restoreDir);
+ }
+
+ /**
+ * Create a bare TableSnapshotRegionSplit. Needed because Writables require a
+ * default-constructed instance to hydrate from the DataInput.
+ *
+ * TODO: remove once HBASE-11555 is fixed.
+ */
+ public static InputSplit createTableSnapshotRegionSplit() {
+ try {
+ assertSupportsTableSnapshots();
+ } catch (RuntimeException e) {
+ LOG.debug("Probably don't support table snapshots. Returning null instance.", e);
+ return null;
+ }
+
+ try {
+ Class<? extends InputSplit> resultType =
+ (Class<? extends InputSplit>) Class.forName(TABLESNAPSHOTREGIONSPLIT_CLASS);
+ Constructor<? extends InputSplit> cxtor = resultType.getDeclaredConstructor(new Class[]{});
+ cxtor.setAccessible(true);
+ return cxtor.newInstance(new Object[]{});
+ } catch (ClassNotFoundException e) {
+ throw new UnsupportedOperationException(
+ "Unable to find " + TABLESNAPSHOTREGIONSPLIT_CLASS, e);
+ } catch (IllegalAccessException e) {
+ throw new UnsupportedOperationException(
+ "Unable to access specified class " + TABLESNAPSHOTREGIONSPLIT_CLASS, e);
+ } catch (InstantiationException e) {
+ throw new UnsupportedOperationException(
+ "Unable to instantiate specified class " + TABLESNAPSHOTREGIONSPLIT_CLASS, e);
+ } catch (InvocationTargetException e) {
+ throw new UnsupportedOperationException(
+ "Constructor threw an exception for " + TABLESNAPSHOTREGIONSPLIT_CLASS, e);
+ } catch (NoSuchMethodException e) {
+ throw new UnsupportedOperationException(
+ "Unable to find suitable constructor for class " + TABLESNAPSHOTREGIONSPLIT_CLASS, e);
+ }
+ }
+}
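Caller-side, the utility is used in two steps, as HBaseStorageHandler and HBaseSplit above show; a sketch with illustrative values (the snapshot name and restore dir match the q-test added below):

    HBaseTableSnapshotInputFormatUtil.assertSupportsTableSnapshots(); // throws on pre-0.98.3 HBase
    HBaseTableSnapshotInputFormatUtil.configureJob(hbaseConf, "src_hbase_snapshot", new Path("/tmp"));
    InputSplit blank = HBaseTableSnapshotInputFormatUtil.createTableSnapshotRegionSplit();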
Added: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java?rev=1615730&view=auto
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java (added)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java Mon Aug 4 19:17:53 2014
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Util code common between HiveHBaseTableInputFormat and HiveHBaseTableSnapshotInputFormat.
+ */
+class HiveHBaseInputFormatUtil {
+
+ /**
+ * Parse {@code jobConf} to create the target {@link HTable} instance.
+ */
+ public static HTable getTable(JobConf jobConf) throws IOException {
+ String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
+ return new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName));
+ }
+
+ /**
+ * Parse {@code jobConf} to create a {@link Scan} instance.
+ */
+ public static Scan getScan(JobConf jobConf) throws IOException {
+ String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+ boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true);
+ List<Integer> readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf);
+ ColumnMappings columnMappings;
+
+ try {
+ columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
+ } catch (SerDeException e) {
+ throw new IOException(e);
+ }
+
+ if (columnMappings.size() < readColIDs.size()) {
+ throw new IOException("Cannot read more columns than the given table contains.");
+ }
+
+ boolean readAllColumns = ColumnProjectionUtils.isReadAllColumns(jobConf);
+ Scan scan = new Scan();
+ boolean empty = true;
+
+ // The list of families that have been added to the scan
+ List<String> addedFamilies = new ArrayList<String>();
+
+ if (!readAllColumns) {
+ ColumnMapping[] columnsMapping = columnMappings.getColumnsMapping();
+ for (int i : readColIDs) {
+ ColumnMapping colMap = columnsMapping[i];
+ if (colMap.hbaseRowKey) {
+ continue;
+ }
+
+ if (colMap.qualifierName == null) {
+ scan.addFamily(colMap.familyNameBytes);
+ addedFamilies.add(colMap.familyName);
+ } else {
+ if(!addedFamilies.contains(colMap.familyName)){
+ // add only if the corresponding family has not already been added
+ scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes);
+ }
+ }
+
+ empty = false;
+ }
+ }
+
+ // The HBase table's row key maps to a Hive table column. In the corner case when only the
+ // row key column is selected in Hive, the HBase Scan will be empty i.e. no column family/
+ // column qualifier will have been added to the scan. We arbitrarily add at least one column
+ // to the HBase scan so that we can retrieve all of the row keys and return them as the Hive
+ // table's column projection.
+ if (empty) {
+ for (ColumnMapping colMap: columnMappings) {
+ if (colMap.hbaseRowKey) {
+ continue;
+ }
+
+ if (colMap.qualifierName == null) {
+ scan.addFamily(colMap.familyNameBytes);
+ } else {
+ scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes);
+ }
+
+ if (!readAllColumns) {
+ break;
+ }
+ }
+ }
+
+ String scanCache = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHE);
+ if (scanCache != null) {
+ scan.setCaching(Integer.valueOf(scanCache));
+ }
+ String scanCacheBlocks = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHEBLOCKS);
+ if (scanCacheBlocks != null) {
+ scan.setCacheBlocks(Boolean.valueOf(scanCacheBlocks));
+ }
+ String scanBatch = jobConf.get(HBaseSerDe.HBASE_SCAN_BATCH);
+ if (scanBatch != null) {
+ scan.setBatch(Integer.valueOf(scanBatch));
+ }
+ return scan;
+ }
+
+ public static boolean getStorageFormatOfKey(String spec, String defaultFormat) throws IOException{
+
+ String[] mapInfo = spec.split("#");
+ boolean tblLevelDefault = "binary".equalsIgnoreCase(defaultFormat);
+
+ switch (mapInfo.length) {
+ case 1:
+ return tblLevelDefault;
+
+ case 2:
+ String storageType = mapInfo[1];
+ if(storageType.equals("-")) {
+ return tblLevelDefault;
+ } else if ("string".startsWith(storageType)){
+ return false;
+ } else if ("binary".startsWith(storageType)){
+ return true;
+ }
+
+ default:
+ throw new IOException("Malformed string: " + spec);
+ }
+ }
+}
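Worked examples of getStorageFormatOfKey, following the switch above (true selects binary key storage); the mapping specs are illustrative:

    getStorageFormatOfKey(":key", "binary");   // no '#' suffix: table default, so true
    getStorageFormatOfKey(":key#-", "string"); // '-': table default, so false
    getStorageFormatOfKey(":key#s", "binary"); // prefix of "string": false
    getStorageFormatOfKey(":key#b", "string"); // prefix of "binary": true
    getStorageFormatOfKey(":key#x", "string"); // unrecognized: IOException("Malformed string: ...")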
Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java?rev=1615730&r1=1615729&r2=1615730&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java Mon Aug 4 19:17:53 2014
@@ -46,7 +46,6 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.ByteStream;
-import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -88,90 +87,11 @@ public class HiveHBaseTableInputFormat e
final Reporter reporter) throws IOException {
HBaseSplit hbaseSplit = (HBaseSplit) split;
- TableSplit tableSplit = hbaseSplit.getSplit();
- String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
- setHTable(new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName)));
- String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
- boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true);
- List<Integer> readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf);
- ColumnMappings columnMappings;
-
- try {
- columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
- } catch (SerDeException e) {
- throw new IOException(e);
- }
+ TableSplit tableSplit = hbaseSplit.getTableSplit();
- if (columnMappings.size() < readColIDs.size()) {
- throw new IOException("Cannot read more columns than the given table contains.");
- }
+ setHTable(HiveHBaseInputFormatUtil.getTable(jobConf));
+ setScan(HiveHBaseInputFormatUtil.getScan(jobConf));
- boolean readAllColumns = ColumnProjectionUtils.isReadAllColumns(jobConf);
- Scan scan = new Scan();
- boolean empty = true;
-
- // The list of families that have been added to the scan
- List<String> addedFamilies = new ArrayList<String>();
-
- if (!readAllColumns) {
- ColumnMapping[] columnsMapping = columnMappings.getColumnsMapping();
- for (int i : readColIDs) {
- ColumnMapping colMap = columnsMapping[i];
- if (colMap.hbaseRowKey) {
- continue;
- }
-
- if (colMap.qualifierName == null) {
- scan.addFamily(colMap.familyNameBytes);
- addedFamilies.add(colMap.familyName);
- } else {
- if(!addedFamilies.contains(colMap.familyName)){
- // add only if the corresponding family has not already been added
- scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes);
- }
- }
-
- empty = false;
- }
- }
-
- // The HBase table's row key maps to a Hive table column. In the corner case when only the
- // row key column is selected in Hive, the HBase Scan will be empty i.e. no column family/
- // column qualifier will have been added to the scan. We arbitrarily add at least one column
- // to the HBase scan so that we can retrieve all of the row keys and return them as the Hive
- // tables column projection.
- if (empty) {
- for (ColumnMapping colMap: columnMappings) {
- if (colMap.hbaseRowKey) {
- continue;
- }
-
- if (colMap.qualifierName == null) {
- scan.addFamily(colMap.familyNameBytes);
- } else {
- scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes);
- }
-
- if (!readAllColumns) {
- break;
- }
- }
- }
-
- String scanCache = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHE);
- if (scanCache != null) {
- scan.setCaching(Integer.valueOf(scanCache));
- }
- String scanCacheBlocks = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHEBLOCKS);
- if (scanCacheBlocks != null) {
- scan.setCacheBlocks(Boolean.valueOf(scanCacheBlocks));
- }
- String scanBatch = jobConf.get(HBaseSerDe.HBASE_SCAN_BATCH);
- if (scanBatch != null) {
- scan.setBatch(Integer.valueOf(scanBatch));
- }
-
- setScan(scan);
Job job = new Job(jobConf);
TaskAttemptContext tac = ShimLoader.getHadoopShims().newTaskAttemptContext(
job.getConfiguration(), reporter);
@@ -443,12 +363,12 @@ public class HiveHBaseTableInputFormat e
boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true);
if (hbaseColumnsMapping == null) {
- throw new IOException("hbase.columns.mapping required for HBase Table.");
+ throw new IOException(HBaseSerDe.HBASE_COLUMNS_MAPPING + " required for HBase Table.");
}
ColumnMappings columnMappings = null;
try {
- columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping,doColumnRegexMatching);
+ columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
} catch (SerDeException e) {
throw new IOException(e);
}
@@ -463,10 +383,9 @@ public class HiveHBaseTableInputFormat e
// definition into account and excludes regions which don't satisfy
// the start/stop row conditions (HBASE-1829).
Scan scan = createFilterScan(jobConf, iKey,
- getStorageFormatOfKey(keyMapping.mappingSpec,
+ HiveHBaseInputFormatUtil.getStorageFormatOfKey(keyMapping.mappingSpec,
jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string")));
-
// The list of families that have been added to the scan
List<String> addedFamilies = new ArrayList<String>();
@@ -503,28 +422,4 @@ public class HiveHBaseTableInputFormat e
return results;
}
-
- private boolean getStorageFormatOfKey(String spec, String defaultFormat) throws IOException{
-
- String[] mapInfo = spec.split("#");
- boolean tblLevelDefault = "binary".equalsIgnoreCase(defaultFormat) ? true : false;
-
- switch (mapInfo.length) {
- case 1:
- return tblLevelDefault;
-
- case 2:
- String storageType = mapInfo[1];
- if(storageType.equals("-")) {
- return tblLevelDefault;
- } else if ("string".startsWith(storageType)){
- return false;
- } else if ("binary".startsWith(storageType)){
- return true;
- }
-
- default:
- throw new IOException("Malformed string: " + spec);
- }
- }
}
Added: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableSnapshotInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableSnapshotInputFormat.java?rev=1615730&view=auto
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableSnapshotInputFormat.java (added)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableSnapshotInputFormat.java Mon Aug 4 19:17:53 2014
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableInputFormat;
+import org.apache.hadoop.hbase.mapred.TableSnapshotInputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.List;
+
+public class HiveHBaseTableSnapshotInputFormat
+ implements InputFormat<ImmutableBytesWritable, ResultWritable> {
+
+ TableSnapshotInputFormat delegate = new TableSnapshotInputFormat();
+
+ private static void setColumns(JobConf job) throws IOException {
+ // the hbase mapred API doesn't accept a full Scan at the moment, so flatten to a column list.
+ Scan scan = HiveHBaseInputFormatUtil.getScan(job);
+ byte[][] families = scan.getFamilies();
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < families.length; i++) {
+ if (i > 0) sb.append(" ");
+ sb.append(Bytes.toString(families[i]));
+ }
+ job.set(TableInputFormat.COLUMN_LIST, sb.toString());
+ }
+
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ setColumns(job);
+
+ // hive depends on FileSplits, so wrap in HBaseSplit
+ Path[] tablePaths = FileInputFormat.getInputPaths(job);
+
+ InputSplit [] results = delegate.getSplits(job, numSplits);
+ for (int i = 0; i < results.length; i++) {
+ results[i] = new HBaseSplit(results[i], tablePaths[0]);
+ }
+
+ return results;
+ }
+
+ @Override
+ public RecordReader<ImmutableBytesWritable, ResultWritable> getRecordReader(
+ InputSplit split, JobConf job, Reporter reporter) throws IOException {
+ setColumns(job);
+ final RecordReader<ImmutableBytesWritable, Result> rr =
+ delegate.getRecordReader(((HBaseSplit) split).getSnapshotSplit(), job, reporter);
+
+ return new RecordReader<ImmutableBytesWritable, ResultWritable>() {
+ @Override
+ public boolean next(ImmutableBytesWritable key, ResultWritable value) throws IOException {
+ return rr.next(key, value.getResult());
+ }
+
+ @Override
+ public ImmutableBytesWritable createKey() {
+ return rr.createKey();
+ }
+
+ @Override
+ public ResultWritable createValue() {
+ return new ResultWritable(rr.createValue());
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return rr.getPos();
+ }
+
+ @Override
+ public void close() throws IOException {
+ rr.close();
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return rr.getProgress();
+ }
+ };
+ }
+}
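For concreteness, setColumns flattens the Scan's column families into the space-separated string the mapred TableInputFormat expects; an illustration with made-up family names:

    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes("cf"));
    scan.addFamily(Bytes.toBytes("meta"));
    // setColumns would then do the equivalent of:
    // job.set(TableInputFormat.COLUMN_LIST, "cf meta");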
Added: hive/trunk/hbase-handler/src/test/queries/positive/hbase_handler_snapshot.q
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/queries/positive/hbase_handler_snapshot.q?rev=1615730&view=auto
==============================================================================
--- hive/trunk/hbase-handler/src/test/queries/positive/hbase_handler_snapshot.q (added)
+++ hive/trunk/hbase-handler/src/test/queries/positive/hbase_handler_snapshot.q Mon Aug 4 19:17:53 2014
@@ -0,0 +1,4 @@
+SET hive.hbase.snapshot.name=src_hbase_snapshot;
+SET hive.hbase.snapshot.restoredir=/tmp;
+
+SELECT * FROM src_hbase LIMIT 5;
Modified: hive/trunk/hbase-handler/src/test/results/positive/external_table_ppd.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/results/positive/external_table_ppd.q.out?rev=1615730&r1=1615729&r2=1615730&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/test/results/positive/external_table_ppd.q.out (original)
+++ hive/trunk/hbase-handler/src/test/results/positive/external_table_ppd.q.out Mon Aug 4 19:17:53 2014
@@ -63,8 +63,8 @@ Table Parameters:
# Storage Information
SerDe Library: org.apache.hadoop.hive.hbase.HBaseSerDe
-InputFormat: org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat
-OutputFormat: org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat
+InputFormat: null
+OutputFormat: null
Compressed: No
Num Buckets: -1
Bucket Columns: []
Modified: hive/trunk/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out?rev=1615730&r1=1615729&r2=1615730&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out (original)
+++ hive/trunk/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out Mon Aug 4 19:17:53 2014
@@ -63,8 +63,8 @@ Table Parameters:
# Storage Information
SerDe Library: org.apache.hadoop.hive.hbase.HBaseSerDe
-InputFormat: org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat
-OutputFormat: org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat
+InputFormat: null
+OutputFormat: null
Compressed: No
Num Buckets: -1
Bucket Columns: []
@@ -238,8 +238,8 @@ Table Parameters:
# Storage Information
SerDe Library: org.apache.hadoop.hive.hbase.HBaseSerDe
-InputFormat: org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat
-OutputFormat: org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat
+InputFormat: null
+OutputFormat: null
Compressed: No
Num Buckets: -1
Bucket Columns: []
Added: hive/trunk/hbase-handler/src/test/results/positive/hbase_handler_snapshot.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/results/positive/hbase_handler_snapshot.q.out?rev=1615730&view=auto
==============================================================================
--- hive/trunk/hbase-handler/src/test/results/positive/hbase_handler_snapshot.q.out (added)
+++ hive/trunk/hbase-handler/src/test/results/positive/hbase_handler_snapshot.q.out Mon Aug 4 19:17:53 2014
@@ -0,0 +1,13 @@
+PREHOOK: query: SELECT * FROM src_hbase LIMIT 5
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src_hbase
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM src_hbase LIMIT 5
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src_hbase
+#### A masked pattern was here ####
+0 val_0
+10 val_10
+100 val_100
+103 val_103
+104 val_104
Modified: hive/trunk/hbase-handler/src/test/templates/TestHBaseCliDriver.vm
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/templates/TestHBaseCliDriver.vm?rev=1615730&r1=1615729&r2=1615730&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/test/templates/TestHBaseCliDriver.vm (original)
+++ hive/trunk/hbase-handler/src/test/templates/TestHBaseCliDriver.vm Mon Aug 4 19:17:53 2014
@@ -27,7 +27,6 @@ import java.util.*;
import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType;
import org.apache.hadoop.hive.hbase.HBaseQTestUtil;
import org.apache.hadoop.hive.hbase.HBaseTestSetup;
-import org.apache.hadoop.hive.ql.session.SessionState;
public class $className extends TestCase {
Modified: hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java?rev=1615730&r1=1615729&r2=1615730&view=diff
==============================================================================
--- hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java (original)
+++ hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java Mon Aug 4 19:17:53 2014
@@ -17,24 +17,98 @@
*/
package org.apache.hadoop.hive.hbase;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.ql.QTestUtil;
-import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType;
+
+import java.util.List;
/**
* HBaseQTestUtil initializes HBase-specific test fixtures.
*/
public class HBaseQTestUtil extends QTestUtil {
+
+ /** Name of the HBase table, in both Hive and HBase. */
+ public static String HBASE_SRC_NAME = "src_hbase";
+
+ /** Name of the table snapshot. */
+ public static String HBASE_SRC_SNAPSHOT_NAME = "src_hbase_snapshot";
+
+ /** A handle to this harness's cluster */
+ private final HConnection conn;
+
public HBaseQTestUtil(
String outDir, String logDir, MiniClusterType miniMr, HBaseTestSetup setup)
throws Exception {
super(outDir, logDir, miniMr, null);
setup.preTest(conf);
+ this.conn = setup.getConnection();
super.init();
}
+ /** Returns true when the named HBase table snapshot exists, false otherwise. */
+ private static boolean hbaseTableSnapshotExists(HBaseAdmin admin, String snapshotName) throws
+ Exception {
+ List<HBaseProtos.SnapshotDescription> snapshots =
+ admin.listSnapshots(".*" + snapshotName + ".*");
+ for (HBaseProtos.SnapshotDescription sn : snapshots) {
+ if (sn.getName().equals(snapshotName)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
@Override
public void init() throws Exception {
// defer
}
+
+ @Override
+ public void createSources() throws Exception {
+ super.createSources();
+
+ conf.setBoolean("hive.test.init.phase", true);
+
+ // create and load the input data into the hbase table
+ runCreateTableCmd(
+ "CREATE TABLE " + HBASE_SRC_NAME + "(key INT, value STRING)"
+ + " STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'"
+ + " WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:val')"
+ + " TBLPROPERTIES ('hbase.table.name' = '" + HBASE_SRC_NAME + "')"
+ );
+ runCmd("INSERT OVERWRITE TABLE " + HBASE_SRC_NAME + " SELECT * FROM src");
+
+ // create a snapshot
+ HBaseAdmin admin = null;
+ try {
+ admin = new HBaseAdmin(conn.getConfiguration());
+ admin.snapshot(HBASE_SRC_SNAPSHOT_NAME, HBASE_SRC_NAME);
+ } finally {
+ if (admin != null) admin.close();
+ }
+
+ conf.setBoolean("hive.test.init.phase", false);
+ }
+
+ @Override
+ public void cleanUp() throws Exception {
+ super.cleanUp();
+
+ // drop in case of leftovers from an unsuccessful run
+ db.dropTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, HBASE_SRC_NAME);
+
+ HBaseAdmin admin = null;
+ try {
+ admin = new HBaseAdmin(conn.getConfiguration());
+ if (hbaseTableSnapshotExists(admin, HBASE_SRC_SNAPSHOT_NAME)) {
+ admin.deleteSnapshot(HBASE_SRC_SNAPSHOT_NAME);
+ }
+ } finally {
+ if (admin != null) admin.close();
+ }
+ }
}
Modified: hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java?rev=1615730&r1=1615729&r2=1615730&view=diff
==============================================================================
--- hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java (original)
+++ hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java Mon Aug 4 19:17:53 2014
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.hbase;
-import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Arrays;
@@ -29,12 +28,13 @@ import junit.framework.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -50,6 +50,7 @@ public class HBaseTestSetup extends Test
private MiniHBaseCluster hbaseCluster;
private int zooKeeperPort;
private String hbaseRoot;
+ private HConnection hbaseConn;
private static final int NUM_REGIONSERVERS = 1;
@@ -57,6 +58,10 @@ public class HBaseTestSetup extends Test
super(test);
}
+ public HConnection getConnection() {
+ return this.hbaseConn;
+ }
+
void preTest(HiveConf conf) throws Exception {
setUpFixtures(conf);
@@ -97,27 +102,23 @@ public class HBaseTestSetup extends Test
hbaseConf.setInt("hbase.regionserver.info.port", -1);
hbaseCluster = new MiniHBaseCluster(hbaseConf, NUM_REGIONSERVERS);
conf.set("hbase.master", hbaseCluster.getMaster().getServerName().getHostAndPort());
+ hbaseConn = HConnectionManager.createConnection(hbaseConf);
+
// opening the META table ensures that cluster is running
- new HTable(hbaseConf, HConstants.META_TABLE_NAME);
- createHBaseTable(hbaseConf);
+ HTableInterface meta = null;
+ try {
+ meta = hbaseConn.getTable(TableName.META_TABLE_NAME);
+ } finally {
+ if (meta != null) meta.close();
+ }
+ createHBaseTable();
}
- private void createHBaseTable(Configuration hbaseConf) throws IOException {
+ private void createHBaseTable() throws IOException {
final String HBASE_TABLE_NAME = "HiveExternalTable";
HTableDescriptor htableDesc = new HTableDescriptor(HBASE_TABLE_NAME.getBytes());
HColumnDescriptor hcolDesc = new HColumnDescriptor("cf".getBytes());
htableDesc.addFamily(hcolDesc);
- HBaseAdmin hbaseAdmin = new HBaseAdmin(hbaseConf);
- if(Arrays.asList(hbaseAdmin.listTables()).contains(htableDesc)){
- // if table is already in there, don't recreate.
- return;
- }
- hbaseAdmin.createTable(htableDesc);
- HTable htable = new HTable(hbaseConf, HBASE_TABLE_NAME);
-
- // data
- Put [] puts = new Put [] {
- new Put("key-1".getBytes()), new Put("key-2".getBytes()), new Put("key-3".getBytes()) };
boolean [] booleans = new boolean [] { true, false, true };
byte [] bytes = new byte [] { Byte.MIN_VALUE, -1, Byte.MAX_VALUE };
@@ -128,18 +129,37 @@ public class HBaseTestSetup extends Test
float [] floats = new float [] { Float.MIN_VALUE, -1.0F, Float.MAX_VALUE };
double [] doubles = new double [] { Double.MIN_VALUE, -1.0, Double.MAX_VALUE };
- // store data
- for (int i = 0; i < puts.length; i++) {
- puts[i].add("cf".getBytes(), "cq-boolean".getBytes(), Bytes.toBytes(booleans[i]));
- puts[i].add("cf".getBytes(), "cq-byte".getBytes(), new byte [] { bytes[i] });
- puts[i].add("cf".getBytes(), "cq-short".getBytes(), Bytes.toBytes(shorts[i]));
- puts[i].add("cf".getBytes(), "cq-int".getBytes(), Bytes.toBytes(ints[i]));
- puts[i].add("cf".getBytes(), "cq-long".getBytes(), Bytes.toBytes(longs[i]));
- puts[i].add("cf".getBytes(), "cq-string".getBytes(), Bytes.toBytes(strings[i]));
- puts[i].add("cf".getBytes(), "cq-float".getBytes(), Bytes.toBytes(floats[i]));
- puts[i].add("cf".getBytes(), "cq-double".getBytes(), Bytes.toBytes(doubles[i]));
-
- htable.put(puts[i]);
+ HBaseAdmin hbaseAdmin = null;
+ HTableInterface htable = null;
+ try {
+ hbaseAdmin = new HBaseAdmin(hbaseConn.getConfiguration());
+ if (Arrays.asList(hbaseAdmin.listTables()).contains(htableDesc)) {
+ // if table is already in there, don't recreate.
+ return;
+ }
+ hbaseAdmin.createTable(htableDesc);
+ htable = hbaseConn.getTable(HBASE_TABLE_NAME);
+
+ // data
+ Put[] puts = new Put[]{
+ new Put("key-1".getBytes()), new Put("key-2".getBytes()), new Put("key-3".getBytes())};
+
+ // store data
+ for (int i = 0; i < puts.length; i++) {
+ puts[i].add("cf".getBytes(), "cq-boolean".getBytes(), Bytes.toBytes(booleans[i]));
+ puts[i].add("cf".getBytes(), "cq-byte".getBytes(), new byte[]{bytes[i]});
+ puts[i].add("cf".getBytes(), "cq-short".getBytes(), Bytes.toBytes(shorts[i]));
+ puts[i].add("cf".getBytes(), "cq-int".getBytes(), Bytes.toBytes(ints[i]));
+ puts[i].add("cf".getBytes(), "cq-long".getBytes(), Bytes.toBytes(longs[i]));
+ puts[i].add("cf".getBytes(), "cq-string".getBytes(), Bytes.toBytes(strings[i]));
+ puts[i].add("cf".getBytes(), "cq-float".getBytes(), Bytes.toBytes(floats[i]));
+ puts[i].add("cf".getBytes(), "cq-double".getBytes(), Bytes.toBytes(doubles[i]));
+
+ htable.put(puts[i]);
+ }
+ } finally {
+ if (htable != null) htable.close();
+ if (hbaseAdmin != null) hbaseAdmin.close();
}
}
@@ -152,6 +172,10 @@ public class HBaseTestSetup extends Test
@Override
protected void tearDown() throws Exception {
+ if (hbaseConn != null) {
+ hbaseConn.close();
+ hbaseConn = null;
+ }
if (hbaseCluster != null) {
HConnectionManager.deleteAllConnections(true);
hbaseCluster.shutdown();
Modified: hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java?rev=1615730&r1=1615729&r2=1615730&view=diff
==============================================================================
--- hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java (original)
+++ hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java Mon Aug 4 19:17:53 2014
@@ -130,7 +130,7 @@ public class QTestUtil {
public static final HashSet<String> srcTables = new HashSet<String>();
private static MiniClusterType clusterType = MiniClusterType.none;
private ParseDriver pd;
- private Hive db;
+ protected Hive db;
protected HiveConf conf;
private Driver drv;
private BaseSemanticAnalyzer sem;
@@ -630,7 +630,7 @@ public class QTestUtil {
return;
}
- private void runCreateTableCmd(String createTableCmd) throws Exception {
+ protected void runCreateTableCmd(String createTableCmd) throws Exception {
int ecode = 0;
ecode = drv.run(createTableCmd).getResponseCode();
if (ecode != 0) {
@@ -641,7 +641,7 @@ public class QTestUtil {
return;
}
- private void runCmd(String cmd) throws Exception {
+ protected void runCmd(String cmd) throws Exception {
int ecode = 0;
ecode = drv.run(cmd).getResponseCode();
drv.close();
Modified: hive/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/pom.xml?rev=1615730&r1=1615729&r2=1615730&view=diff
==============================================================================
--- hive/trunk/pom.xml (original)
+++ hive/trunk/pom.xml Mon Aug 4 19:17:53 2014
@@ -113,8 +113,8 @@
<hadoop-20S.version>1.2.1</hadoop-20S.version>
<hadoop-23.version>2.4.0</hadoop-23.version>
<hadoop.bin.path>${basedir}/${hive.path.to.root}/testutils/hadoop</hadoop.bin.path>
- <hbase.hadoop1.version>0.96.0-hadoop1</hbase.hadoop1.version>
- <hbase.hadoop2.version>0.96.0-hadoop2</hbase.hadoop2.version>
+ <hbase.hadoop1.version>0.98.3-hadoop1</hbase.hadoop1.version>
+ <hbase.hadoop2.version>0.98.3-hadoop2</hbase.hadoop2.version>
<!-- httpcomponents are not always in version sync -->
<httpcomponents.client.version>4.2.5</httpcomponents.client.version>
<httpcomponents.core.version>4.2.5</httpcomponents.core.version>
@@ -774,7 +774,7 @@
<test.warehouse.dir>${test.warehouse.scheme}${test.warehouse.dir}</test.warehouse.dir>
<java.net.preferIPv4Stack>true</java.net.preferIPv4Stack>
<!-- EnforceReadOnlyTables hook and QTestUtil -->
- <test.src.tables>src,src1,srcbucket,srcbucket2,src_json,src_thrift,src_sequencefile,srcpart,alltypesorc</test.src.tables>
+ <test.src.tables>src,src1,srcbucket,srcbucket2,src_json,src_thrift,src_sequencefile,srcpart,alltypesorc,src_hbase</test.src.tables>
<java.security.krb5.conf>${test.tmp.dir}/conf/krb5.conf</java.security.krb5.conf>
</systemPropertyVariables>
</configuration>
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1615730&r1=1615729&r2=1615730&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Mon Aug 4 19:17:53 2014
@@ -3924,12 +3924,16 @@ public class DDLTask extends Task<DDLWor
tbl.setInputFormatClass(crtTbl.getInputFormat());
tbl.setOutputFormatClass(crtTbl.getOutputFormat());
- tbl.getTTable().getSd().setInputFormat(
- tbl.getInputFormatClass().getName());
- tbl.getTTable().getSd().setOutputFormat(
- tbl.getOutputFormatClass().getName());
+ // only persist input/output format to metadata when it is explicitly specified.
+ // Otherwise, load lazily via StorageHandler at query time.
+ if (crtTbl.getInputFormat() != null && !crtTbl.getInputFormat().isEmpty()) {
+ tbl.getTTable().getSd().setInputFormat(tbl.getInputFormatClass().getName());
+ }
+ if (crtTbl.getOutputFormat() != null && !crtTbl.getOutputFormat().isEmpty()) {
+ tbl.getTTable().getSd().setOutputFormat(tbl.getOutputFormatClass().getName());
+ }
- if (!Utilities.isDefaultNameNode(conf)) {
+ if (!Utilities.isDefaultNameNode(conf) && tbl.getTTable().getSd().isSetLocation()) {
// If location is specified - ensure that it is a full qualified name
makeLocationQualified(tbl.getDbName(), tbl.getTTable().getSd(), tbl.getTableName());
}