Posted to commits@hive.apache.org by br...@apache.org on 2014/10/30 17:22:48 UTC
svn commit: r1635536 [3/28] - in /hive/branches/spark: ./ accumulo-handler/
accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/
accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/columns/
accumulo-handler/src/test/org/apache/hadoo...
Modified: hive/branches/spark/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/MultiDelimitSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/MultiDelimitSerDe.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/MultiDelimitSerDe.java (original)
+++ hive/branches/spark/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/MultiDelimitSerDe.java Thu Oct 30 16:22:33 2014
@@ -55,6 +55,14 @@ import org.apache.hadoop.io.Writable;
* Currently field.delim can be multiple characters while collection.delim
* and mapkey.delim should be just a single character.
*/
+@SerDeSpec(schemaProps = {
+ serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES,
+ serdeConstants.FIELD_DELIM, serdeConstants.COLLECTION_DELIM, serdeConstants.MAPKEY_DELIM,
+ serdeConstants.SERIALIZATION_FORMAT, serdeConstants.SERIALIZATION_NULL_FORMAT,
+ serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST,
+ serdeConstants.ESCAPE_CHAR,
+ serdeConstants.SERIALIZATION_ENCODING,
+ LazySimpleSerDe.SERIALIZATION_EXTEND_NESTING_LEVELS})
public class MultiDelimitSerDe extends AbstractSerDe {
private static final Log LOG = LogFactory.getLog(MultiDelimitSerDe.class.getName());
private static final byte[] DEFAULT_SEPARATORS = {(byte) 1, (byte) 2, (byte) 3};
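For readers tracking the @SerDeSpec additions in this commit: the annotation enumerates the table properties that can influence a SerDe's schema, and Hive reads it back reflectively. A minimal sketch of dumping the declared schemaProps; the SerDeSpecDump harness is hypothetical, only SerDeSpec and MultiDelimitSerDe come from the patch, and runtime retention of the annotation is assumed.

import org.apache.hadoop.hive.contrib.serde2.MultiDelimitSerDe;
import org.apache.hadoop.hive.serde2.SerDeSpec;

public class SerDeSpecDump {
  public static void main(String[] args) {
    // Read the annotation added above; null if the class declares none.
    SerDeSpec spec = MultiDelimitSerDe.class.getAnnotation(SerDeSpec.class);
    if (spec != null) {
      for (String prop : spec.schemaProps()) {
        System.out.println(prop); // each table property that shapes the schema
      }
    }
  }
}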
Modified: hive/branches/spark/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/RegexSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/RegexSerDe.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/RegexSerDe.java (original)
+++ hive/branches/spark/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/RegexSerDe.java Thu Oct 30 16:22:33 2014
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeSpec;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -70,10 +71,18 @@ import org.apache.hadoop.io.Writable;
* writableStringObjectInspector. We should switch to that when we have a UTF-8
* based Regex library.
*/
+@SerDeSpec(schemaProps = {
+ serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES,
+ RegexSerDe.INPUT_REGEX, RegexSerDe.OUTPUT_FORMAT_STRING,
+ RegexSerDe.INPUT_REGEX_CASE_SENSITIVE })
public class RegexSerDe extends AbstractSerDe {
public static final Log LOG = LogFactory.getLog(RegexSerDe.class.getName());
+ public static final String INPUT_REGEX = "input.regex";
+ public static final String OUTPUT_FORMAT_STRING = "output.format.string";
+ public static final String INPUT_REGEX_CASE_SENSITIVE = "input.regex.case.insensitive";
+
int numColumns;
String inputRegex;
String outputFormatString;
@@ -90,12 +99,12 @@ public class RegexSerDe extends Abstract
// We can get the table definition from tbl.
// Read the configuration parameters
- inputRegex = tbl.getProperty("input.regex");
- outputFormatString = tbl.getProperty("output.format.string");
+ inputRegex = tbl.getProperty(INPUT_REGEX);
+ outputFormatString = tbl.getProperty(OUTPUT_FORMAT_STRING);
String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
boolean inputRegexIgnoreCase = "true".equalsIgnoreCase(tbl
- .getProperty("input.regex.case.insensitive"));
+ .getProperty(INPUT_REGEX_CASE_SENSITIVE));
// Parse the configuration parameters
if (inputRegex != null) {
@@ -258,6 +267,7 @@ public class RegexSerDe extends Abstract
return outputRowText;
}
+ @Override
public SerDeStats getSerDeStats() {
// no support for statistics
return null;
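The three new constants replace raw string literals previously scattered through initialize(). A small configuration sketch using them; the main() harness is illustrative, and the Properties mirror what Hive hands to initialize().

import java.util.Properties;
import org.apache.hadoop.hive.contrib.serde2.RegexSerDe;

public class RegexSerDeProps {
  public static void main(String[] args) {
    Properties tbl = new Properties();
    tbl.setProperty(RegexSerDe.INPUT_REGEX, "([^ ]*) ([^ ]*)");
    tbl.setProperty(RegexSerDe.OUTPUT_FORMAT_STRING, "%1$s %2$s");
    // Note the constant is named ..._CASE_SENSITIVE but holds the key
    // "input.regex.case.insensitive", exactly as declared in the patch.
    boolean ignoreCase = "true".equalsIgnoreCase(
        tbl.getProperty(RegexSerDe.INPUT_REGEX_CASE_SENSITIVE));
    System.out.println("ignore case: " + ignoreCase);
  }
}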
Modified: hive/branches/spark/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/TypedBytesSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/TypedBytesSerDe.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/TypedBytesSerDe.java (original)
+++ hive/branches/spark/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/TypedBytesSerDe.java Thu Oct 30 16:22:33 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.io.NonS
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeSpec;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
@@ -73,6 +74,7 @@ import org.apache.hadoop.io.Writable;
* this, which is apparently 25% faster than the python version is available at
* http://github.com/klbostee/ctypedbytes/tree/master
*/
+@SerDeSpec(schemaProps = {serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES})
public class TypedBytesSerDe extends AbstractSerDe {
public static final Log LOG = LogFactory.getLog(TypedBytesSerDe.class
Modified: hive/branches/spark/data/conf/hive-log4j.properties
URL: http://svn.apache.org/viewvc/hive/branches/spark/data/conf/hive-log4j.properties?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/data/conf/hive-log4j.properties (original)
+++ hive/branches/spark/data/conf/hive-log4j.properties Thu Oct 30 16:22:33 2014
@@ -75,6 +75,11 @@ log4j.category.JPOX.Query=ERROR,DRFA
log4j.category.JPOX.General=ERROR,DRFA
log4j.category.JPOX.Enhancer=ERROR,DRFA
log4j.logger.org.apache.hadoop.conf.Configuration=ERROR,DRFA
+log4j.logger.org.apache.zookeeper=INFO,DRFA
+log4j.logger.org.apache.zookeeper.server.ServerCnxn=WARN,DRFA
log4j.logger.org.apache.zookeeper.server.NIOServerCnxn=WARN,DRFA
+log4j.logger.org.apache.zookeeper.ClientCnxn=WARN,DRFA
+log4j.logger.org.apache.zookeeper.ClientCnxnSocket=WARN,DRFA
log4j.logger.org.apache.zookeeper.ClientCnxnSocketNIO=WARN,DRFA
-
+log4j.logger.org.apache.hadoop.hive.ql.log.PerfLogger=WARN,DRFA
+log4j.logger.org.apache.hadoop.hive.ql.exec.Operator=INFO,DRFA
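The same levels can also be set programmatically; a sketch assuming the log4j 1.2 API this branch builds against, mirroring the properties above.

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class LogLevelSketch {
  public static void main(String[] args) {
    // Quiet the chattier ZooKeeper client/server classes, as in the diff.
    Logger.getLogger("org.apache.zookeeper").setLevel(Level.INFO);
    Logger.getLogger("org.apache.zookeeper.ClientCnxn").setLevel(Level.WARN);
    Logger.getLogger("org.apache.hadoop.hive.ql.log.PerfLogger").setLevel(Level.WARN);
    Logger.getLogger("org.apache.hadoop.hive.ql.exec.Operator").setLevel(Level.INFO);
  }
}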
Modified: hive/branches/spark/hbase-handler/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/hbase-handler/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hbase-handler/pom.xml (original)
+++ hive/branches/spark/hbase-handler/pom.xml Thu Oct 30 16:22:33 2014
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>0.15.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
Propchange: hive/branches/spark/hbase-handler/pom.xml
------------------------------------------------------------------------------
Merged /hive/trunk/hbase-handler/pom.xml:r1629563-1631926,1631928-1632135,1632137-1633421
Modified: hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java (original)
+++ hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java Thu Oct 30 16:22:33 2014
@@ -34,7 +34,9 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import com.google.common.collect.Iterators;
@@ -42,11 +44,17 @@ import com.google.common.collect.Iterato
public class ColumnMappings implements Iterable<ColumnMappings.ColumnMapping> {
private final int keyIndex;
+ private final int timestampIndex;
private final ColumnMapping[] columnsMapping;
public ColumnMappings(List<ColumnMapping> columnMapping, int keyIndex) {
+ this(columnMapping, keyIndex, -1);
+ }
+
+ public ColumnMappings(List<ColumnMapping> columnMapping, int keyIndex, int timestampIndex) {
this.columnsMapping = columnMapping.toArray(new ColumnMapping[columnMapping.size()]);
this.keyIndex = keyIndex;
+ this.timestampIndex = timestampIndex;
}
@Override
@@ -109,7 +117,9 @@ public class ColumnMappings implements I
// where key extends LazyPrimitive<?, ?> and thus has type Category.PRIMITIVE
for (int i = 0; i < columnNames.size(); i++) {
ColumnMapping colMap = columnsMapping[i];
- if (colMap.qualifierName == null && !colMap.hbaseRowKey) {
+ colMap.columnName = columnNames.get(i);
+ colMap.columnType = columnTypes.get(i);
+ if (colMap.qualifierName == null && !colMap.hbaseRowKey && !colMap.hbaseTimestamp) {
TypeInfo typeInfo = columnTypes.get(i);
if ((typeInfo.getCategory() != ObjectInspector.Category.MAP) ||
(((MapTypeInfo) typeInfo).getMapKeyTypeInfo().getCategory()
@@ -122,8 +132,14 @@ public class ColumnMappings implements I
+ typeInfo.getTypeName());
}
}
- colMap.columnName = columnNames.get(i);
- colMap.columnType = columnTypes.get(i);
+ if (colMap.hbaseTimestamp) {
+ TypeInfo typeInfo = columnTypes.get(i);
+ if (!colMap.isCategory(PrimitiveCategory.TIMESTAMP) &&
+ !colMap.isCategory(PrimitiveCategory.LONG)) {
+ throw new SerDeException(serdeName + ": timestamp columns should be of " +
+ "timestamp or bigint type, but is mapped to " + typeInfo.getTypeName());
+ }
+ }
}
}
@@ -299,10 +315,18 @@ public class ColumnMappings implements I
return columnsMapping[keyIndex];
}
+ public ColumnMapping getTimestampMapping() {
+ return timestampIndex < 0 ? null : columnsMapping[timestampIndex];
+ }
+
public int getKeyIndex() {
return keyIndex;
}
+ public int getTimestampIndex() {
+ return timestampIndex;
+ }
+
public ColumnMapping[] getColumnsMapping() {
return columnsMapping;
}
@@ -326,6 +350,7 @@ public class ColumnMappings implements I
byte[] qualifierNameBytes;
List<Boolean> binaryStorage;
boolean hbaseRowKey;
+ boolean hbaseTimestamp;
String mappingSpec;
String qualifierPrefix;
byte[] qualifierPrefixBytes;
@@ -377,5 +402,14 @@ public class ColumnMappings implements I
public boolean isCategory(ObjectInspector.Category category) {
return columnType.getCategory() == category;
}
+
+ public boolean isCategory(PrimitiveCategory category) {
+ return columnType.getCategory() == ObjectInspector.Category.PRIMITIVE &&
+ ((PrimitiveTypeInfo)columnType).getPrimitiveCategory() == category;
+ }
+
+ public boolean isComparable() {
+ return binaryStorage.get(0) || isCategory(PrimitiveCategory.STRING);
+ }
}
}
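The new isCategory(PrimitiveCategory) overload backs the validation above: a column mapped to the HBase cell timestamp must be a Hive TIMESTAMP or BIGINT. A standalone sketch of that rule; the harness class is illustrative, the TypeInfoFactory constants come from Hive's serde library.

import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

public class TimestampColumnCheck {
  // Mirrors the check in ColumnMappings: primitive TIMESTAMP or LONG (bigint) only.
  static boolean validTimestampMapping(TypeInfo type) {
    if (!(type instanceof PrimitiveTypeInfo)) {
      return false;
    }
    PrimitiveCategory cat = ((PrimitiveTypeInfo) type).getPrimitiveCategory();
    return cat == PrimitiveCategory.TIMESTAMP || cat == PrimitiveCategory.LONG;
  }

  public static void main(String[] args) {
    System.out.println(validTimestampMapping(TypeInfoFactory.timestampTypeInfo)); // true
    System.out.println(validTimestampMapping(TypeInfoFactory.longTypeInfo));      // true
    System.out.println(validTimestampMapping(TypeInfoFactory.stringTypeInfo));    // false
  }
}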
Modified: hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java (original)
+++ hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java Thu Oct 30 16:22:33 2014
@@ -21,17 +21,18 @@ package org.apache.hadoop.hive.hbase;
import java.io.IOException;
import java.util.Properties;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
-import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
public class DefaultHBaseKeyFactory extends AbstractHBaseKeyFactory implements HBaseKeyFactory {
- protected LazySimpleSerDe.SerDeParameters serdeParams;
+ protected SerDeParameters serdeParams;
protected HBaseRowSerializer serializer;
@Override
@@ -56,4 +57,12 @@ public class DefaultHBaseKeyFactory exte
public byte[] serializeKey(Object object, StructField field) throws IOException {
return serializer.serializeKeyField(object, field, keyMapping);
}
+
+ @VisibleForTesting
+ static DefaultHBaseKeyFactory forTest(SerDeParameters params, ColumnMappings mappings) {
+ DefaultHBaseKeyFactory factory = new DefaultHBaseKeyFactory();
+ factory.serdeParams = params;
+ factory.keyMapping = mappings.getKeyMapping();
+ return factory;
+ }
}
Modified: hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java (original)
+++ hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java Thu Oct 30 16:22:33 2014
@@ -59,8 +59,7 @@ public interface HBaseKeyFactory extends
* serialize hive object in internal format of custom key
*
* @param object
- * @param inspector
- * @param output
+ * @param field
*
* @return true if it's not null
* @throws java.io.IOException
Modified: hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java (original)
+++ hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java Thu Oct 30 16:22:33 2014
@@ -35,7 +35,9 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.io.Writable;
public class HBaseRowSerializer {
@@ -45,7 +47,9 @@ public class HBaseRowSerializer {
private final LazySimpleSerDe.SerDeParameters serdeParam;
private final int keyIndex;
+ private final int timestampIndex;
private final ColumnMapping keyMapping;
+ private final ColumnMapping timestampMapping;
private final ColumnMapping[] columnMappings;
private final byte[] separators; // the separators array
private final boolean escaped; // whether we need to escape the data when writing out
@@ -66,8 +70,10 @@ public class HBaseRowSerializer {
this.escapeChar = serdeParam.getEscapeChar();
this.needsEscape = serdeParam.getNeedsEscape();
this.keyIndex = hbaseParam.getKeyIndex();
+ this.timestampIndex = hbaseParam.getTimestampIndex();
this.columnMappings = hbaseParam.getColumnMappings().getColumnsMapping();
this.keyMapping = hbaseParam.getColumnMappings().getKeyMapping();
+ this.timestampMapping = hbaseParam.getColumnMappings().getTimestampMapping();
this.putTimestamp = hbaseParam.getPutTimestamp();
}
@@ -81,25 +87,36 @@ public class HBaseRowSerializer {
// Prepare the field ObjectInspectors
StructObjectInspector soi = (StructObjectInspector) objInspector;
List<? extends StructField> fields = soi.getAllStructFieldRefs();
- List<Object> list = soi.getStructFieldsDataAsList(obj);
+ List<Object> values = soi.getStructFieldsDataAsList(obj);
StructField field = fields.get(keyIndex);
- Object value = list.get(keyIndex);
+ Object value = values.get(keyIndex);
byte[] key = keyFactory.serializeKey(value, field);
if (key == null) {
throw new SerDeException("HBase row key cannot be NULL");
}
+ long timestamp = putTimestamp;
+ if (timestamp < 0 && timestampIndex >= 0) {
+ ObjectInspector inspector = fields.get(timestampIndex).getFieldObjectInspector();
+ value = values.get(timestampIndex);
+ if (inspector instanceof LongObjectInspector) {
+ timestamp = ((LongObjectInspector)inspector).get(value);
+ } else {
+ PrimitiveObjectInspector primitive = (PrimitiveObjectInspector) inspector;
+ timestamp = PrimitiveObjectInspectorUtils.getTimestamp(value, primitive).getTime();
+ }
+ }
- Put put = putTimestamp >= 0 ? new Put(key, putTimestamp) : new Put(key);
+ Put put = timestamp >= 0 ? new Put(key, timestamp) : new Put(key);
// Serialize each field
for (int i = 0; i < fields.size(); i++) {
- if (i == keyIndex) {
+ if (i == keyIndex || i == timestampIndex) {
continue;
}
field = fields.get(i);
- value = list.get(i);
+ value = values.get(i);
serializeField(value, field, columnMappings[i], put);
}
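When hbase.put.timestamp is unset, the serializer now derives the Put timestamp from the mapped column: a BIGINT is taken as epoch millis directly, any other primitive is converted through PrimitiveObjectInspectorUtils.getTimestamp(). A self-contained sketch of that dual path; the toMillis() helper is illustrative.

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;

public class TimestampExtraction {
  static long toMillis(Object value, ObjectInspector inspector) {
    if (inspector instanceof LongObjectInspector) {
      // bigint column: the value is already epoch millis
      return ((LongObjectInspector) inspector).get(value);
    }
    // anything else: convert via the generic timestamp path
    PrimitiveObjectInspector primitive = (PrimitiveObjectInspector) inspector;
    return PrimitiveObjectInspectorUtils.getTimestamp(value, primitive).getTime();
  }

  public static void main(String[] args) {
    ObjectInspector longOI = PrimitiveObjectInspectorFactory.javaLongObjectInspector;
    System.out.println(toMillis(Long.valueOf(1414685000000L), longOI));
  }
}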
Modified: hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java (original)
+++ hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java Thu Oct 30 16:22:33 2014
@@ -28,9 +28,11 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeSpec;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
@@ -42,6 +44,27 @@ import org.apache.hadoop.mapred.JobConf;
* HBaseSerDe can be used to serialize object into an HBase table and
* deserialize objects from an HBase table.
*/
+@SerDeSpec(schemaProps = {
+ serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES,
+ serdeConstants.FIELD_DELIM, serdeConstants.COLLECTION_DELIM, serdeConstants.MAPKEY_DELIM,
+ serdeConstants.SERIALIZATION_FORMAT, serdeConstants.SERIALIZATION_NULL_FORMAT,
+ serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST,
+ serdeConstants.ESCAPE_CHAR,
+ serdeConstants.SERIALIZATION_ENCODING,
+ LazySimpleSerDe.SERIALIZATION_EXTEND_NESTING_LEVELS,
+ HBaseSerDe.HBASE_COLUMNS_MAPPING,
+ HBaseSerDe.HBASE_TABLE_NAME,
+ HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE,
+ HBaseSerDe.HBASE_KEY_COL,
+ HBaseSerDe.HBASE_PUT_TIMESTAMP,
+ HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS,
+ HBaseSerDe.HBASE_COMPOSITE_KEY_TYPES,
+ HBaseSerDe.HBASE_COMPOSITE_KEY_FACTORY,
+ HBaseSerDe.HBASE_STRUCT_SERIALIZER_CLASS,
+ HBaseSerDe.HBASE_SCAN_CACHE,
+ HBaseSerDe.HBASE_SCAN_CACHEBLOCKS,
+ HBaseSerDe.HBASE_SCAN_BATCH,
+ HBaseSerDe.HBASE_AUTOGENERATE_STRUCT})
public class HBaseSerDe extends AbstractSerDe {
public static final Log LOG = LogFactory.getLog(HBaseSerDe.class);
@@ -49,6 +72,7 @@ public class HBaseSerDe extends Abstract
public static final String HBASE_TABLE_NAME = "hbase.table.name";
public static final String HBASE_TABLE_DEFAULT_STORAGE_TYPE = "hbase.table.default.storage.type";
public static final String HBASE_KEY_COL = ":key";
+ public static final String HBASE_TIMESTAMP_COL = ":timestamp";
public static final String HBASE_PUT_TIMESTAMP = "hbase.put.timestamp";
public static final String HBASE_COMPOSITE_KEY_CLASS = "hbase.composite.key.class";
public static final String HBASE_COMPOSITE_KEY_TYPES = "hbase.composite.key.types";
@@ -98,8 +122,7 @@ public class HBaseSerDe extends Abstract
serdeParams.getValueFactories());
cachedHBaseRow = new LazyHBaseRow(
- (LazySimpleStructObjectInspector) cachedObjectInspector,
- serdeParams.getKeyIndex(), serdeParams.getKeyFactory(), serdeParams.getValueFactories());
+ (LazySimpleStructObjectInspector) cachedObjectInspector, serdeParams);
serializer = new HBaseRowSerializer(serdeParams);
@@ -135,6 +158,7 @@ public class HBaseSerDe extends Abstract
}
int rowKeyIndex = -1;
+ int timestampIndex = -1;
List<ColumnMapping> columnsMapping = new ArrayList<ColumnMapping>();
String[] columnSpecs = columnsMappingSpec.split(",");
@@ -160,12 +184,20 @@ public class HBaseSerDe extends Abstract
columnMapping.qualifierName = null;
columnMapping.qualifierNameBytes = null;
columnMapping.hbaseRowKey = true;
+ } else if (colInfo.equals(HBASE_TIMESTAMP_COL)) {
+ timestampIndex = i;
+ columnMapping.familyName = colInfo;
+ columnMapping.familyNameBytes = Bytes.toBytes(colInfo);
+ columnMapping.qualifierName = null;
+ columnMapping.qualifierNameBytes = null;
+ columnMapping.hbaseTimestamp = true;
} else {
String [] parts = colInfo.split(":");
assert(parts.length > 0 && parts.length <= 2);
columnMapping.familyName = parts[0];
columnMapping.familyNameBytes = Bytes.toBytes(parts[0]);
columnMapping.hbaseRowKey = false;
+ columnMapping.hbaseTimestamp = false;
if (parts.length == 2) {
@@ -205,7 +237,7 @@ public class HBaseSerDe extends Abstract
columnsMapping.add(0, columnMapping);
}
- return new ColumnMappings(columnsMapping, rowKeyIndex);
+ return new ColumnMappings(columnsMapping, rowKeyIndex, timestampIndex);
}
public LazySimpleSerDe.SerDeParameters getSerdeParams() {
@@ -228,7 +260,7 @@ public class HBaseSerDe extends Abstract
throw new SerDeException(getClass().getName() + ": expects ResultWritable!");
}
- cachedHBaseRow.init(((ResultWritable) result).getResult(), serdeParams.getColumnMappings());
+ cachedHBaseRow.init(((ResultWritable) result).getResult());
return cachedHBaseRow;
}
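With HBASE_TIMESTAMP_COL, a mapping spec such as ":key,cf:string,:timestamp" designates one Hive column as the cell timestamp. An illustrative walk over such a spec, classifying entries the way parseColumnsMapping() now does; the spec string and harness are hypothetical.

public class MappingSpecWalk {
  public static void main(String[] args) {
    String spec = ":key,cf:string,:timestamp";
    String[] parts = spec.split(",");
    for (int i = 0; i < parts.length; i++) {
      if (parts[i].equals(":key")) {
        System.out.println(i + " -> row key");
      } else if (parts[i].equals(":timestamp")) {
        System.out.println(i + " -> cell timestamp (new in this patch)");
      } else {
        System.out.println(i + " -> family:qualifier " + parts[i]);
      }
    }
  }
}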
Modified: hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java (original)
+++ hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java Thu Oct 30 16:22:33 2014
@@ -38,6 +38,8 @@ import org.apache.hadoop.hive.serde2.laz
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.util.ReflectionUtils;
+import javax.annotation.Nullable;
+
/**
* HBaseSerDeParameters encapsulates SerDeParameters and additional configurations that are specific for
* HBaseSerDe.
@@ -128,6 +130,14 @@ public class HBaseSerDeParameters {
return columnMappings.getKeyMapping();
}
+ public int getTimestampIndex() {
+ return columnMappings.getTimestampIndex();
+ }
+
+ public ColumnMapping getTimestampColumnMapping() {
+ return columnMappings.getTimestampMapping();
+ }
+
public ColumnMappings getColumnMappings() {
return columnMappings;
}
@@ -175,17 +185,25 @@ public class HBaseSerDeParameters {
throws Exception {
String factoryClassName = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_FACTORY);
if (factoryClassName != null) {
- Class<?> factoryClazz = Class.forName(factoryClassName);
+ Class<?> factoryClazz = loadClass(factoryClassName, job);
return (HBaseKeyFactory) ReflectionUtils.newInstance(factoryClazz, job);
}
String keyClassName = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS);
if (keyClassName != null) {
- Class<?> keyClass = Class.forName(keyClassName);
+ Class<?> keyClass = loadClass(keyClassName, job);
return new CompositeHBaseKeyFactory(keyClass);
}
return new DefaultHBaseKeyFactory();
}
+ private static Class<?> loadClass(String className, @Nullable Configuration configuration)
+ throws Exception {
+ if (configuration != null) {
+ return configuration.getClassByName(className);
+ }
+ return Class.forName(className);
+ }
+
private List<HBaseValueFactory> initValueFactories(Configuration conf, Properties tbl)
throws SerDeException {
List<HBaseValueFactory> valueFactories = createValueFactories(conf, tbl);
@@ -216,7 +234,7 @@ public class HBaseSerDeParameters {
+ " must be set for hbase columns of type [" + STRUCT_SERIALIZATION_TYPE + "]");
}
- Class<?> structValueClass = job.getClassByName(structValueClassName);
+ Class<?> structValueClass = loadClass(structValueClassName, job);
valueFactories.add(new StructHBaseValueFactory(i, structValueClass));
} else {
valueFactories.add(new DefaultHBaseValueFactory(i));
@@ -321,6 +339,10 @@ public class HBaseSerDeParameters {
tbl.getProperty(colMap.familyName + "." + qualifierName + "." + AvroSerdeUtils.SCHEMA_URL);
}
+ if (serType == null) {
+ throw new IllegalArgumentException("serialization.type property is missing");
+ }
+
String avroSchemaRetClass = tbl.getProperty(AvroSerdeUtils.SCHEMA_RETRIEVER);
if (schemaLiteral == null && serClassName == null && schemaUrl == null
@@ -333,7 +355,7 @@ public class HBaseSerDeParameters {
Class<?> deserializerClass = null;
if (serClassName != null) {
- deserializerClass = conf.getClassByName(serClassName);
+ deserializerClass = loadClass(serClassName, conf);
}
Schema schema = null;
@@ -354,4 +376,4 @@ public class HBaseSerDeParameters {
return schema;
}
-}
\ No newline at end of file
+}
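loadClass() prefers Configuration.getClassByName(), which resolves through the configuration's classloader and therefore sees jars added to the job, and falls back to Class.forName() when no configuration is available. A minimal sketch of the same pattern outside the SerDe.

import org.apache.hadoop.conf.Configuration;

public class ClassLoadingSketch {
  static Class<?> loadClass(String className, Configuration conf) throws Exception {
    if (conf != null) {
      return conf.getClassByName(className); // job classloader, sees tmpjars
    }
    return Class.forName(className);         // plain fallback
  }

  public static void main(String[] args) throws Exception {
    System.out.println(loadClass("java.lang.String", new Configuration()));
  }
}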
Modified: hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java (original)
+++ hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java Thu Oct 30 16:22:33 2014
@@ -186,7 +186,7 @@ public class HBaseStorageHandler extends
Set<String> uniqueColumnFamilies = new HashSet<String>();
for (ColumnMapping colMap : columnMappings) {
- if (!colMap.hbaseRowKey) {
+ if (!colMap.hbaseRowKey && !colMap.hbaseTimestamp) {
uniqueColumnFamilies.add(colMap.familyName);
}
}
@@ -213,7 +213,7 @@ public class HBaseStorageHandler extends
for (ColumnMapping colMap : columnMappings) {
- if (colMap.hbaseRowKey) {
+ if (colMap.hbaseRowKey || colMap.hbaseTimestamp) {
continue;
}
@@ -475,6 +475,11 @@ public class HBaseStorageHandler extends
TableMapReduceUtil.addDependencyJars(copy);
merged.addAll(copy.getConfiguration().getStringCollection("tmpjars"));
jobConf.set("tmpjars", StringUtils.arrayToString(merged.toArray(new String[0])));
+
+ // Get credentials using the configuration instance which has HBase properties
+ JobConf hbaseJobConf = new JobConf(getConf());
+ org.apache.hadoop.hbase.mapred.TableMapReduceUtil.initCredentials(hbaseJobConf);
+ jobConf.getCredentials().mergeAll(hbaseJobConf.getCredentials());
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -495,34 +500,38 @@ public class HBaseStorageHandler extends
HBaseSerDe hBaseSerDe,
ExprNodeDesc predicate) {
ColumnMapping keyMapping = hBaseSerDe.getHBaseSerdeParam().getKeyColumnMapping();
+ ColumnMapping tsMapping = hBaseSerDe.getHBaseSerdeParam().getTimestampColumnMapping();
IndexPredicateAnalyzer analyzer = HiveHBaseTableInputFormat.newIndexPredicateAnalyzer(
- keyMapping.columnName, keyMapping.columnType, keyMapping.binaryStorage.get(0));
- List<IndexSearchCondition> searchConditions =
- new ArrayList<IndexSearchCondition>();
+ keyMapping.columnName, keyMapping.isComparable(),
+ tsMapping == null ? null : tsMapping.columnName);
+ List<IndexSearchCondition> conditions = new ArrayList<IndexSearchCondition>();
ExprNodeGenericFuncDesc residualPredicate =
- (ExprNodeGenericFuncDesc)analyzer.analyzePredicate(predicate, searchConditions);
- int scSize = searchConditions.size();
- if (scSize < 1 || 2 < scSize) {
- // Either there was nothing which could be pushed down (size = 0),
- // there were complex predicates which we don't support yet.
- // Currently supported are one of the form:
- // 1. key < 20 (size = 1)
- // 2. key = 20 (size = 1)
- // 3. key < 20 and key > 10 (size = 2)
- return null;
- }
- if (scSize == 2 &&
- (searchConditions.get(0).getComparisonOp()
- .equals("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual") ||
- searchConditions.get(1).getComparisonOp()
- .equals("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual"))) {
- // If one of the predicates is =, then any other predicate with it is illegal.
- return null;
+ (ExprNodeGenericFuncDesc)analyzer.analyzePredicate(predicate, conditions);
+
+ for (List<IndexSearchCondition> searchConditions:
+ HiveHBaseInputFormatUtil.decompose(conditions).values()) {
+ int scSize = searchConditions.size();
+ if (scSize < 1 || 2 < scSize) {
+ // Either there was nothing which could be pushed down (size = 0),
+ // there were complex predicates which we don't support yet.
+ // Currently supported are one of the form:
+ // 1. key < 20 (size = 1)
+ // 2. key = 20 (size = 1)
+ // 3. key < 20 and key > 10 (size = 2)
+ return null;
+ }
+ if (scSize == 2 &&
+ (searchConditions.get(0).getComparisonOp()
+ .equals("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual") ||
+ searchConditions.get(1).getComparisonOp()
+ .equals("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual"))) {
+ // If one of the predicates is =, then any other predicate with it is illegal.
+ return null;
+ }
}
DecomposedPredicate decomposedPredicate = new DecomposedPredicate();
- decomposedPredicate.pushedPredicate = analyzer.translateSearchConditions(
- searchConditions);
+ decomposedPredicate.pushedPredicate = analyzer.translateSearchConditions(conditions);
decomposedPredicate.residualPredicate = residualPredicate;
return decomposedPredicate;
}
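The pushdown rule itself is unchanged but is now applied per column after decomposing the conditions: a column may contribute a single predicate or a two-sided range, and a pair containing "=" is rejected. A sketch of that rule in isolation; the validator is illustrative.

import java.util.Arrays;
import java.util.List;

public class PushdownRule {
  private static final String EQ =
      "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual";

  // ops: comparison operator class names for one column's conditions
  static boolean pushable(List<String> ops) {
    if (ops.size() < 1 || 2 < ops.size()) {
      return false; // nothing pushable, or too complex
    }
    // "=" combined with any other predicate on the same column is illegal
    return !(ops.size() == 2 && (ops.get(0).equals(EQ) || ops.get(1).equals(EQ)));
  }

  public static void main(String[] args) {
    System.out.println(pushable(Arrays.asList(
        "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan",
        "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan"))); // true
    System.out.println(pushable(Arrays.asList(EQ, EQ)));                 // false
  }
}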
Modified: hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java (original)
+++ hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java Thu Oct 30 16:22:33 2014
@@ -23,13 +23,16 @@ import org.apache.hadoop.hbase.client.HT
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
+import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.mapred.JobConf;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* Util code common between HiveHBaseTableInputFormat and HiveHBaseTableSnapshotInputFormat.
@@ -74,7 +77,7 @@ class HiveHBaseInputFormatUtil {
ColumnMapping[] columnsMapping = columnMappings.getColumnsMapping();
for (int i : readColIDs) {
ColumnMapping colMap = columnsMapping[i];
- if (colMap.hbaseRowKey) {
+ if (colMap.hbaseRowKey || colMap.hbaseTimestamp) {
continue;
}
@@ -99,7 +102,7 @@ class HiveHBaseInputFormatUtil {
// tables column projection.
if (empty) {
for (ColumnMapping colMap: columnMappings) {
- if (colMap.hbaseRowKey) {
+ if (colMap.hbaseRowKey || colMap.hbaseTimestamp) {
continue;
}
@@ -153,4 +156,19 @@ class HiveHBaseInputFormatUtil {
throw new IOException("Malformed string: " + spec);
}
}
+
+ public static Map<String, List<IndexSearchCondition>> decompose(
+ List<IndexSearchCondition> searchConditions) {
+ Map<String, List<IndexSearchCondition>> result =
+ new HashMap<String, List<IndexSearchCondition>>();
+ for (IndexSearchCondition condition : searchConditions) {
+ List<IndexSearchCondition> conditions = result.get(condition.getColumnDesc().getColumn());
+ if (conditions == null) {
+ conditions = new ArrayList<IndexSearchCondition>();
+ result.put(condition.getColumnDesc().getColumn(), conditions);
+ }
+ conditions.add(condition);
+ }
+ return result;
+ }
}
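decompose() is a plain group-by on each condition's column name. The same shape, with strings standing in for IndexSearchCondition objects.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DecomposeSketch {
  public static void main(String[] args) {
    String[][] conditions = {{"key", "key < 20"}, {"ts", "ts > 10"}, {"key", "key > 10"}};
    Map<String, List<String>> byColumn = new HashMap<String, List<String>>();
    for (String[] c : conditions) {
      List<String> list = byColumn.get(c[0]);
      if (list == null) {
        byColumn.put(c[0], list = new ArrayList<String>());
      }
      list.add(c[1]);
    }
    // e.g. {ts=[ts > 10], key=[key < 20, key > 10]}
    System.out.println(byColumn);
  }
}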
Modified: hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java (original)
+++ hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java Thu Oct 30 16:22:33 2014
@@ -20,7 +20,9 @@ package org.apache.hadoop.hive.hbase;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -51,9 +53,11 @@ import org.apache.hadoop.hive.serde2.io.
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.FloatWritable;
@@ -69,6 +73,7 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
/**
* HiveHBaseTableInputFormat implements InputFormat for HBase storage handler
@@ -171,7 +176,7 @@ public class HiveHBaseTableInputFormat e
*
* @return converted table split if any
*/
- private Scan createFilterScan(JobConf jobConf, int iKey, boolean isKeyBinary)
+ private Scan createFilterScan(JobConf jobConf, int iKey, int iTimestamp, boolean isKeyBinary)
throws IOException {
// TODO: assert iKey is HBaseSerDe#HBASE_KEY_COL
@@ -193,22 +198,29 @@ public class HiveHBaseTableInputFormat e
if (filterExprSerialized == null) {
return scan;
}
+
ExprNodeGenericFuncDesc filterExpr =
Utilities.deserializeExpression(filterExprSerialized);
- String colName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iKey];
+ String keyColName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iKey];
String colType = jobConf.get(serdeConstants.LIST_COLUMN_TYPES).split(",")[iKey];
- IndexPredicateAnalyzer analyzer = newIndexPredicateAnalyzer(colName,colType, isKeyBinary);
+ boolean isKeyComparable = isKeyBinary || colType.equalsIgnoreCase("string");
+
+ String tsColName = null;
+ if (iTimestamp >= 0) {
+ tsColName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iTimestamp];
+ }
- List<IndexSearchCondition> searchConditions =
- new ArrayList<IndexSearchCondition>();
- ExprNodeDesc residualPredicate =
- analyzer.analyzePredicate(filterExpr, searchConditions);
+ IndexPredicateAnalyzer analyzer =
+ newIndexPredicateAnalyzer(keyColName, isKeyComparable, tsColName);
+
+ List<IndexSearchCondition> conditions = new ArrayList<IndexSearchCondition>();
+ ExprNodeDesc residualPredicate = analyzer.analyzePredicate(filterExpr, conditions);
// There should be no residual since we already negotiated that earlier in
// HBaseStorageHandler.decomposePredicate. However, with hive.optimize.index.filter
// OpProcFactory#pushFilterToStorageHandler pushes the original filter back down again.
- // Since pushed-down filters are not ommitted at the higher levels (and thus the
+ // Since pushed-down filters are not omitted at the higher levels (and thus the
// contract of negotiation is ignored anyway), just ignore the residuals.
// Re-assess this when negotiation is honored and the duplicate evaluation is removed.
// THIS IGNORES RESIDUAL PARSING FROM HBaseStorageHandler#decomposePredicate
@@ -216,9 +228,23 @@ public class HiveHBaseTableInputFormat e
LOG.debug("Ignoring residual predicate " + residualPredicate.getExprString());
}
+ Map<String, List<IndexSearchCondition>> split = HiveHBaseInputFormatUtil.decompose(conditions);
+ List<IndexSearchCondition> keyConditions = split.get(keyColName);
+ if (keyConditions != null && !keyConditions.isEmpty()) {
+ setupKeyRange(scan, keyConditions, isKeyBinary);
+ }
+ List<IndexSearchCondition> tsConditions = split.get(tsColName);
+ if (tsConditions != null && !tsConditions.isEmpty()) {
+ setupTimeRange(scan, tsConditions);
+ }
+ return scan;
+ }
+
+ private void setupKeyRange(Scan scan, List<IndexSearchCondition> conditions, boolean isBinary)
+ throws IOException {
// Convert the search condition into a restriction on the HBase scan
byte [] startRow = HConstants.EMPTY_START_ROW, stopRow = HConstants.EMPTY_END_ROW;
- for (IndexSearchCondition sc : searchConditions){
+ for (IndexSearchCondition sc : conditions) {
ExprNodeConstantEvaluator eval = new ExprNodeConstantEvaluator(sc.getConstantDesc());
PrimitiveObjectInspector objInspector;
@@ -234,7 +260,7 @@ public class HiveHBaseTableInputFormat e
throw new IOException(e);
}
- byte [] constantVal = getConstantVal(writable, objInspector, isKeyBinary);
+ byte[] constantVal = getConstantVal(writable, objInspector, isBinary);
String comparisonOp = sc.getComparisonOp();
if("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual".equals(comparisonOp)){
@@ -261,7 +287,52 @@ public class HiveHBaseTableInputFormat e
if (LOG.isDebugEnabled()) {
LOG.debug(Bytes.toStringBinary(startRow) + " ~ " + Bytes.toStringBinary(stopRow));
}
- return scan;
+ }
+
+ private void setupTimeRange(Scan scan, List<IndexSearchCondition> conditions)
+ throws IOException {
+ long start = 0;
+ long end = Long.MAX_VALUE;
+ for (IndexSearchCondition sc : conditions) {
+ long timestamp = getTimestampVal(sc);
+ String comparisonOp = sc.getComparisonOp();
+ if("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual".equals(comparisonOp)){
+ start = timestamp;
+ end = timestamp + 1;
+ } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan".equals(comparisonOp)){
+ end = timestamp;
+ } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan"
+ .equals(comparisonOp)) {
+ start = timestamp;
+ } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan"
+ .equals(comparisonOp)){
+ start = timestamp + 1;
+ } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan"
+ .equals(comparisonOp)){
+ end = timestamp + 1;
+ } else {
+ throw new IOException(comparisonOp + " is not a supported comparison operator");
+ }
+ }
+ scan.setTimeRange(start, end);
+ }
+
+ private long getTimestampVal(IndexSearchCondition sc) throws IOException {
+ long timestamp;
+ try {
+ ExprNodeConstantEvaluator eval = new ExprNodeConstantEvaluator(sc.getConstantDesc());
+ ObjectInspector inspector = eval.initialize(null);
+ Object value = eval.evaluate(null);
+ if (inspector instanceof LongObjectInspector) {
+ timestamp = ((LongObjectInspector)inspector).get(value);
+ } else {
+ PrimitiveObjectInspector primitive = (PrimitiveObjectInspector) inspector;
+ timestamp = PrimitiveObjectInspectorUtils.getTimestamp(value, primitive).getTime();
+ }
+ } catch (HiveException e) {
+ throw new IOException(e);
+ }
+ return timestamp;
}
private byte[] getConstantVal(Object writable, PrimitiveObjectInspector poi,
@@ -312,11 +383,6 @@ public class HiveHBaseTableInputFormat e
return next;
}
- static IndexPredicateAnalyzer newIndexPredicateAnalyzer(
- String keyColumnName, TypeInfo keyColType, boolean isKeyBinary) {
- return newIndexPredicateAnalyzer(keyColumnName, keyColType.getTypeName(), isKeyBinary);
- }
-
/**
* Instantiates a new predicate analyzer suitable for
* determining how to push a filter down into the HBase scan,
@@ -327,27 +393,35 @@ public class HiveHBaseTableInputFormat e
* @return preconfigured predicate analyzer
*/
static IndexPredicateAnalyzer newIndexPredicateAnalyzer(
- String keyColumnName, String keyColType, boolean isKeyBinary) {
+ String keyColumnName, boolean isKeyComparable, String timestampColumn) {
IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer();
// We can always do equality predicate. Just need to make sure we get appropriate
// BA representation of constant of filter condition.
- analyzer.addComparisonOp("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual");
// We can do other comparisons only if storage format in hbase is either binary
- // or we are dealing with string types since there lexographic ordering will suffice.
- if(isKeyBinary || (keyColType.equalsIgnoreCase("string"))){
- analyzer.addComparisonOp("org.apache.hadoop.hive.ql.udf.generic." +
- "GenericUDFOPEqualOrGreaterThan");
- analyzer.addComparisonOp("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan");
- analyzer.addComparisonOp("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan");
- analyzer.addComparisonOp("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan");
+ // or we are dealing with string types since there lexicographic ordering will suffice.
+ if (isKeyComparable) {
+ analyzer.addComparisonOp(keyColumnName,
+ "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual",
+ "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan",
+ "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan",
+ "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan",
+ "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan");
+ } else {
+ analyzer.addComparisonOp(keyColumnName,
+ "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual");
+ }
+
+ if (timestampColumn != null) {
+ analyzer.addComparisonOp(timestampColumn,
+ "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual",
+ "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan",
+ "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan",
+ "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan",
+ "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan");
}
- // and only on the key column
- analyzer.clearAllowedColumnNames();
- analyzer.allowColumnName(keyColumnName);
-
return analyzer;
}
@@ -355,7 +429,9 @@ public class HiveHBaseTableInputFormat e
public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
//obtain delegation tokens for the job
- TableMapReduceUtil.initCredentials(jobConf);
+ if (UserGroupInformation.getCurrentUser().hasKerberosCredentials()) {
+ TableMapReduceUtil.initCredentials(jobConf);
+ }
String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
setHTable(new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName)));
@@ -374,6 +450,7 @@ public class HiveHBaseTableInputFormat e
}
int iKey = columnMappings.getKeyIndex();
+ int iTimestamp = columnMappings.getTimestampIndex();
ColumnMapping keyMapping = columnMappings.getKeyMapping();
// Take filter pushdown into account while calculating splits; this
@@ -382,7 +459,7 @@ public class HiveHBaseTableInputFormat e
// split per region, the implementation actually takes the scan
// definition into account and excludes regions which don't satisfy
// the start/stop row conditions (HBASE-1829).
- Scan scan = createFilterScan(jobConf, iKey,
+ Scan scan = createFilterScan(jobConf, iKey, iTimestamp,
HiveHBaseInputFormatUtil.getStorageFormatOfKey(keyMapping.mappingSpec,
jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string")));
@@ -392,7 +469,7 @@ public class HiveHBaseTableInputFormat e
// REVIEW: are we supposed to be applying the getReadColumnIDs
// same as in getRecordReader?
for (ColumnMapping colMap : columnMappings) {
- if (colMap.hbaseRowKey) {
+ if (colMap.hbaseRowKey || colMap.hbaseTimestamp) {
continue;
}
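setupTimeRange() translates each operator into bounds for Scan.setTimeRange(start, end), which HBase treats as half-open [start, end); that is why "=" becomes [ts, ts+1) and "<=" bumps the end by one. A sketch of the mapping with an illustrative timestamp.

public class TimeRangeBounds {
  public static void main(String[] args) {
    long ts = 1414685000000L; // illustrative constant
    long[][] ranges = {
        {ts, ts + 1},                // ts = X  -> [X, X+1)
        {0L, ts},                    // ts < X  -> [0, X)
        {0L, ts + 1},                // ts <= X -> [0, X+1)
        {ts, Long.MAX_VALUE},        // ts >= X -> [X, MAX)
        {ts + 1, Long.MAX_VALUE},    // ts > X  -> [X+1, MAX)
    };
    for (long[] r : ranges) {
      System.out.println("[" + r[0] + ", " + r[1] + ")");
    }
  }
}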
Modified: hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java (original)
+++ hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java Thu Oct 30 16:22:33 2014
@@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.JobCo
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.security.UserGroupInformation;
/**
* HiveHBaseTableOutputFormat implements HiveOutputFormat for HBase tables.
@@ -72,7 +73,9 @@ public class HiveHBaseTableOutputFormat
public void checkOutputSpecs(FileSystem fs, JobConf jc) throws IOException {
//obtain delegation tokens for the job
- TableMapReduceUtil.initCredentials(jc);
+ if (UserGroupInformation.getCurrentUser().hasKerberosCredentials()) {
+ TableMapReduceUtil.initCredentials(jc);
+ }
String hbaseTableName = jc.get(HBaseSerDe.HBASE_TABLE_NAME);
jc.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName);
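Both the input and output formats now guard token acquisition on the current user's Kerberos credentials, so unsecured clusters skip the call entirely. A minimal sketch of the guard; the println bodies stand in for the real initCredentials() call.

import org.apache.hadoop.security.UserGroupInformation;

public class CredentialGuard {
  public static void main(String[] args) throws Exception {
    if (UserGroupInformation.getCurrentUser().hasKerberosCredentials()) {
      System.out.println("secure: would call TableMapReduceUtil.initCredentials(jobConf)");
    } else {
      System.out.println("unsecured: skipping delegation token acquisition");
    }
  }
}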
Modified: hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java (original)
+++ hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java Thu Oct 30 16:22:33 2014
@@ -22,13 +22,17 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
import org.apache.hadoop.hive.hbase.struct.HBaseValueFactory;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazyLong;
import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
+import org.apache.hadoop.hive.serde2.lazy.LazyTimestamp;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
@@ -42,43 +46,48 @@ public class LazyHBaseRow extends LazySt
* The HBase columns mapping of the row.
*/
private Result result;
- private ColumnMapping[] columnsMapping;
private ArrayList<Object> cachedList;
- private final int iKey;
private final HBaseKeyFactory keyFactory;
private final List<HBaseValueFactory> valueFactories;
+ private final ColumnMapping[] columnsMapping;
- public LazyHBaseRow(LazySimpleStructObjectInspector oi) {
- this(oi, -1, null, null);
+ @VisibleForTesting
+ LazyHBaseRow(LazySimpleStructObjectInspector oi, ColumnMappings columnMappings) {
+ super(oi);
+ this.keyFactory = DefaultHBaseKeyFactory.forTest(null, columnMappings);
+ this.valueFactories = null;
+ this.columnsMapping = columnMappings.getColumnsMapping();
}
/**
* Construct a LazyHBaseRow object with the ObjectInspector.
*/
- public LazyHBaseRow(LazySimpleStructObjectInspector oi, int iKey, HBaseKeyFactory keyFactory,
- List<HBaseValueFactory> valueFactories) {
+ public LazyHBaseRow(LazySimpleStructObjectInspector oi, HBaseSerDeParameters serdeParams) {
super(oi);
- this.iKey = iKey;
- this.keyFactory = keyFactory;
- this.valueFactories = valueFactories;
+ this.keyFactory = serdeParams.getKeyFactory();
+ this.valueFactories = serdeParams.getValueFactories();
+ this.columnsMapping = serdeParams.getColumnMappings().getColumnsMapping();
}
/**
* Set the HBase row data(a Result writable) for this LazyStruct.
* @see LazyHBaseRow#init(org.apache.hadoop.hbase.client.Result)
*/
- public void init(Result r, ColumnMappings columnsMappings) {
+ public void init(Result r) {
this.result = r;
- this.columnsMapping = columnsMappings.getColumnsMapping();
setParsed(false);
}
@Override
- protected LazyObjectBase createLazyField(int fieldID, StructField fieldRef) throws SerDeException {
- if (fieldID == iKey) {
+ protected LazyObjectBase createLazyField(final int fieldID, final StructField fieldRef)
+ throws SerDeException {
+ if (columnsMapping[fieldID].hbaseRowKey) {
return keyFactory.createKey(fieldRef.getFieldObjectInspector());
}
+ if (columnsMapping[fieldID].hbaseTimestamp) {
+ return LazyFactory.createLazyObject(fieldRef.getFieldObjectInspector());
+ }
if (valueFactories != null) {
return valueFactories.get(fieldID).createValueObject(fieldRef.getFieldObjectInspector());
@@ -121,7 +130,6 @@ public class LazyHBaseRow extends LazySt
* Get the field out of the row without checking whether parsing is needed.
* This is called by both getField and getFieldsAsList.
* @param fieldID The id of the field starting from 0.
- * @param nullSequence The sequence representing NULL value.
* @return The value of the field
*/
private Object uncheckedGetField(int fieldID) {
@@ -136,6 +144,14 @@ public class LazyHBaseRow extends LazySt
if (colMap.hbaseRowKey) {
ref = new ByteArrayRef();
ref.setData(result.getRow());
+ } else if (colMap.hbaseTimestamp) {
+ long timestamp = result.rawCells()[0].getTimestamp(); // from hbase-0.96.0
+ LazyObjectBase lz = fields[fieldID];
+ if (lz instanceof LazyTimestamp) {
+ ((LazyTimestamp) lz).getWritableObject().setTime(timestamp);
+ } else {
+ ((LazyLong) lz).getWritableObject().set(timestamp);
+ }
} else {
if (colMap.qualifierName == null) {
// it is a column family
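The cell timestamp is pushed into either a TimestampWritable or a LongWritable, depending on whether the mapped column is TIMESTAMP or BIGINT, matching the rule enforced in ColumnMappings. A standalone sketch of the two writable paths; values are illustrative.

import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.io.LongWritable;

public class TimestampFieldSketch {
  public static void main(String[] args) {
    long cellTimestamp = 1414685000000L; // e.g. result.rawCells()[0].getTimestamp()

    TimestampWritable asTimestamp = new TimestampWritable();
    asTimestamp.setTime(cellTimestamp);  // TIMESTAMP column path

    LongWritable asLong = new LongWritable();
    asLong.set(cellTimestamp);           // BIGINT column path

    System.out.println(asTimestamp + " / " + asLong);
  }
}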
Modified: hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/DefaultHBaseValueFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/DefaultHBaseValueFactory.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/DefaultHBaseValueFactory.java (original)
+++ hive/branches/spark/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/DefaultHBaseValueFactory.java Thu Oct 30 16:22:33 2014
@@ -35,7 +35,7 @@ import org.apache.hadoop.hive.serde2.typ
/**
* Default implementation of the {@link HBaseValueFactory}
* */
-public class DefaultHBaseValueFactory implements HBaseValueFactory{
+public class DefaultHBaseValueFactory implements HBaseValueFactory {
protected LazySimpleSerDe.SerDeParameters serdeParams;
protected ColumnMappings columnMappings;
@@ -49,32 +49,32 @@ public class DefaultHBaseValueFactory im
this.fieldID = fieldID;
}
- @Override
+ @Override
public void init(HBaseSerDeParameters hbaseParams, Configuration conf, Properties properties)
- throws SerDeException {
+ throws SerDeException {
this.hbaseParams = hbaseParams;
this.serdeParams = hbaseParams.getSerdeParams();
this.columnMappings = hbaseParams.getColumnMappings();
this.properties = properties;
this.conf = conf;
- }
+ }
- @Override
- public ObjectInspector createValueObjectInspector(TypeInfo type)
- throws SerDeException {
+ @Override
+ public ObjectInspector createValueObjectInspector(TypeInfo type)
+ throws SerDeException {
return LazyFactory.createLazyObjectInspector(type, serdeParams.getSeparators(),
1, serdeParams.getNullSequence(), serdeParams.isEscaped(), serdeParams.getEscapeChar());
- }
+ }
@Override
public LazyObjectBase createValueObject(ObjectInspector inspector) throws SerDeException {
return HBaseSerDeHelper.createLazyField(columnMappings.getColumnsMapping(), fieldID, inspector);
}
- @Override
- public byte[] serializeValue(Object object, StructField field)
- throws IOException {
+ @Override
+ public byte[] serializeValue(Object object, StructField field)
+ throws IOException {
// TODO Add support for serialization of values here
- return null;
- }
-}
\ No newline at end of file
+ return null;
+ }
+}
Modified: hive/branches/spark/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java (original)
+++ hive/branches/spark/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java Thu Oct 30 16:22:33 2014
@@ -459,7 +459,7 @@ public class TestLazyHBaseObject extends
List<TypeInfo> fieldTypeInfos =
TypeInfoUtils.getTypeInfosFromTypeString(
"string,int,array<string>,map<string,string>,string");
- List<String> fieldNames = Arrays.asList(new String[]{"key", "a", "b", "c", "d"});
+ List<String> fieldNames = Arrays.asList("key", "a", "b", "c", "d");
Text nullSequence = new Text("\\N");
String hbaseColsMapping = ":key,cfa:a,cfa:b,cfb:c,cfb:d";
@@ -483,7 +483,7 @@ public class TestLazyHBaseObject extends
ObjectInspector oi = LazyFactory.createLazyStructInspector(fieldNames,
fieldTypeInfos, new byte[] {' ', ':', '='},
nullSequence, false, false, (byte)0);
- LazyHBaseRow o = new LazyHBaseRow((LazySimpleStructObjectInspector) oi);
+ LazyHBaseRow o = new LazyHBaseRow((LazySimpleStructObjectInspector) oi, columnMappings);
List<KeyValue> kvs = new ArrayList<KeyValue>();
@@ -497,7 +497,7 @@ public class TestLazyHBaseObject extends
Bytes.toBytes("cfb"), Bytes.toBytes("d"), Bytes.toBytes("hi")));
Result r = new Result(kvs);
- o.init(r, columnMappings);
+ o.init(r);
assertEquals(
("{'key':'test-row','a':123,'b':['a','b','c'],"
@@ -511,7 +511,7 @@ public class TestLazyHBaseObject extends
Bytes.toBytes("cfb"), Bytes.toBytes("c"), Bytes.toBytes("d=e:f=g")));
r = new Result(kvs);
- o.init(r, columnMappings);
+ o.init(r);
assertEquals(
("{'key':'test-row','a':123,'b':null,"
@@ -527,7 +527,7 @@ public class TestLazyHBaseObject extends
Bytes.toBytes("cfb"), Bytes.toBytes("d"), Bytes.toBytes("no")));
r = new Result(kvs);
- o.init(r, columnMappings);
+ o.init(r);
assertEquals(
("{'key':'test-row','a':null,'b':['a'],"
@@ -541,7 +541,7 @@ public class TestLazyHBaseObject extends
Bytes.toBytes("cfb"), Bytes.toBytes("d"), Bytes.toBytes("no")));
r = new Result(kvs);
- o.init(r, columnMappings);
+ o.init(r);
assertEquals(
("{'key':'test-row','a':null,'b':['','a','',''],"
@@ -565,7 +565,7 @@ public class TestLazyHBaseObject extends
Bytes.toBytes("cfb"), Bytes.toBytes("d"), Bytes.toBytes("")));
r = new Result(kvs);
- o.init(r, columnMappings);
+ o.init(r);
assertEquals(
"{'key':'test-row','a':123,'b':[],'c':{},'d':''}".replace("'", "\""),
@@ -609,7 +609,7 @@ public class TestLazyHBaseObject extends
fieldTypeInfos,
new byte[] {' ', ':', '='},
nullSequence, false, false, (byte) 0);
- LazyHBaseRow o = new LazyHBaseRow((LazySimpleStructObjectInspector) oi);
+ LazyHBaseRow o = new LazyHBaseRow((LazySimpleStructObjectInspector) oi, columnMappings);
List<KeyValue> kvs = new ArrayList<KeyValue>();
kvs.add(new KeyValue(Bytes.toBytes("test-row"),
@@ -624,7 +624,7 @@ public class TestLazyHBaseObject extends
Bytes.toBytes("cfc"), Bytes.toBytes("d"), Bytes.toBytes("hi")));
Result r = new Result(kvs);
- o.init(r, columnMappings);
+ o.init(r);
assertEquals(
("{'key':'test-row','a':123,'b':['a','b','c'],"
@@ -640,7 +640,7 @@ public class TestLazyHBaseObject extends
Bytes.toBytes("cfb"), Bytes.toBytes("f"), Bytes.toBytes("g")));
r = new Result(kvs);
- o.init(r, columnMappings);
+ o.init(r);
assertEquals(
("{'key':'test-row','a':123,'b':null,"
@@ -656,7 +656,7 @@ public class TestLazyHBaseObject extends
Bytes.toBytes("cfc"), Bytes.toBytes("d"), Bytes.toBytes("no")));
r = new Result(kvs);
- o.init(r, columnMappings);
+ o.init(r);
assertEquals(
("{'key':'test-row','a':null,'b':['a'],"
@@ -670,7 +670,7 @@ public class TestLazyHBaseObject extends
Bytes.toBytes("cfc"), Bytes.toBytes("d"), Bytes.toBytes("no")));
r = new Result(kvs);
- o.init(r, columnMappings);
+ o.init(r);
assertEquals(
("{'key':'test-row','a':null,'b':['','a','',''],"
@@ -686,7 +686,7 @@ public class TestLazyHBaseObject extends
Bytes.toBytes("cfc"), Bytes.toBytes("d"), Bytes.toBytes("")));
r = new Result(kvs);
- o.init(r, columnMappings);
+ o.init(r);
assertEquals(
"{'key':'test-row','a':123,'b':[],'c':{},'d':''}".replace("'", "\""),
@@ -733,7 +733,7 @@ public class TestLazyHBaseObject extends
LazyFactory.createLazyStructInspector(fieldNames, fieldTypeInfos,
new byte [] {' ', ':', '='}, nullSequence, false, false, (byte) 0);
- LazyHBaseRow o = new LazyHBaseRow((LazySimpleStructObjectInspector) oi);
+ LazyHBaseRow o = new LazyHBaseRow((LazySimpleStructObjectInspector) oi, columnMappings);
byte [] rowKey = "row-key".getBytes();
List<KeyValue> kvs = new ArrayList<KeyValue>();
@@ -785,7 +785,7 @@ public class TestLazyHBaseObject extends
Collections.sort(kvs, KeyValue.COMPARATOR);
Result result = new Result(kvs);
- o.init(result, columnMappings);
+ o.init(result);
List<? extends StructField> fieldRefs = ((StructObjectInspector) oi).getAllStructFieldRefs();
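
The test changes above track a refactoring of LazyHBaseRow: the ColumnMappings are now bound once in the constructor instead of being passed on every init() call, so init() takes only the HBase Result. A minimal sketch of the new usage, mirroring the inspector and mappings built in these tests:

    // Mappings are supplied once, at construction time.
    LazyHBaseRow row = new LazyHBaseRow((LazySimpleStructObjectInspector) oi, columnMappings);

    // Each scan result is then attached with the single-argument init().
    Result result = new Result(kvs);
    row.init(result);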
Modified: hive/branches/spark/hbase-handler/src/test/results/positive/hbase_pushdown.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/hbase-handler/src/test/results/positive/hbase_pushdown.q.out?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hbase-handler/src/test/results/positive/hbase_pushdown.q.out (original)
+++ hive/branches/spark/hbase-handler/src/test/results/positive/hbase_pushdown.q.out Thu Oct 30 16:22:33 2014
@@ -94,7 +94,7 @@ STAGE PLANS:
predicate: (value like '%90%') (type: boolean)
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
Select Operator
- expressions: key (type: int), value (type: string)
+ expressions: 90 (type: int), value (type: string)
outputColumnNames: _col0, _col1
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
File Output Operator
@@ -142,7 +142,7 @@ STAGE PLANS:
predicate: (value like '%90%') (type: boolean)
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
Select Operator
- expressions: key (type: int), value (type: string)
+ expressions: 90 (type: int), value (type: string)
outputColumnNames: _col0, _col1
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
File Output Operator
@@ -194,7 +194,7 @@ STAGE PLANS:
predicate: ((value like '%90%') and (key = UDFToInteger(value))) (type: boolean)
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
Select Operator
- expressions: key (type: int), value (type: string)
+ expressions: 90 (type: int), value (type: string)
outputColumnNames: _col0, _col1
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
File Output Operator
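
In these plan diffs the Select Operator now emits the literal 90 instead of the key column. The key = 90 predicate is pushed down to the HBase storage handler (so it no longer appears in the Filter Operator), and constant propagation evidently substitutes the known constant for the column reference in the projection.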
Modified: hive/branches/spark/hbase-handler/src/test/results/positive/hbase_queries.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/hbase-handler/src/test/results/positive/hbase_queries.q.out?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hbase-handler/src/test/results/positive/hbase_queries.q.out (original)
+++ hive/branches/spark/hbase-handler/src/test/results/positive/hbase_queries.q.out Thu Oct 30 16:22:33 2014
@@ -271,7 +271,7 @@ STAGE PLANS:
alias: hbase_table_2
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
Filter Operator
- predicate: ((key < 120) and key is not null) (type: boolean)
+ predicate: (key < 120) (type: boolean)
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
Select Operator
expressions: key (type: int), value (type: string)
@@ -287,7 +287,7 @@ STAGE PLANS:
alias: hbase_table_1
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
Filter Operator
- predicate: ((100 < key) and key is not null) (type: boolean)
+ predicate: (100 < key) (type: boolean)
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
Select Operator
expressions: key (type: int)
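
The dropped is-not-null conjuncts are redundant: a comparison such as key < 120 or 100 < key can only evaluate to true for non-null keys, so the simplified predicates are equivalent and the join inputs are unchanged.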
Modified: hive/branches/spark/hcatalog/core/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/core/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/core/pom.xml (original)
+++ hive/branches/spark/hcatalog/core/pom.xml Thu Oct 30 16:22:33 2014
@@ -25,7 +25,7 @@
<parent>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>0.15.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
Modified: hive/branches/spark/hcatalog/core/src/main/java/org/apache/hive/hcatalog/cli/SemanticAnalysis/CreateTableHook.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/core/src/main/java/org/apache/hive/hcatalog/cli/SemanticAnalysis/CreateTableHook.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/core/src/main/java/org/apache/hive/hcatalog/cli/SemanticAnalysis/CreateTableHook.java (original)
+++ hive/branches/spark/hcatalog/core/src/main/java/org/apache/hive/hcatalog/cli/SemanticAnalysis/CreateTableHook.java Thu Oct 30 16:22:33 2014
@@ -26,7 +26,6 @@ import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.exec.DDLTask;
import org.apache.hadoop.hive.ql.exec.Task;
@@ -195,8 +194,7 @@ final class CreateTableHook extends HCat
//authorize against the table operation so that location permissions can be checked if any
- if (HiveConf.getBoolVar(context.getConf(),
- HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED)) {
+ if (HCatAuthUtil.isAuthorizationEnabled(context.getConf())) {
authorize(table, Privilege.CREATE);
}
} catch (HiveException ex) {
Modified: hive/branches/spark/hcatalog/core/src/main/java/org/apache/hive/hcatalog/cli/SemanticAnalysis/HCatSemanticAnalyzerBase.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/core/src/main/java/org/apache/hive/hcatalog/cli/SemanticAnalysis/HCatSemanticAnalyzerBase.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/core/src/main/java/org/apache/hive/hcatalog/cli/SemanticAnalysis/HCatSemanticAnalyzerBase.java (original)
+++ hive/branches/spark/hcatalog/core/src/main/java/org/apache/hive/hcatalog/cli/SemanticAnalysis/HCatSemanticAnalyzerBase.java Thu Oct 30 16:22:33 2014
@@ -22,7 +22,6 @@ package org.apache.hive.hcatalog.cli.Sem
import java.io.Serializable;
import java.util.List;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.metadata.AuthorizationException;
@@ -89,8 +88,7 @@ public class HCatSemanticAnalyzerBase ex
protected void authorizeDDL(HiveSemanticAnalyzerHookContext context,
List<Task<? extends Serializable>> rootTasks) throws SemanticException {
- if (!HiveConf.getBoolVar(context.getConf(),
- HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED)) {
+ if (!HCatAuthUtil.isAuthorizationEnabled(context.getConf())) {
return;
}
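
Both semantic-analysis hooks now delegate the authorization check to HCatAuthUtil.isAuthorizationEnabled() instead of reading HIVE_AUTHORIZATION_ENABLED from HiveConf inline. The helper itself is not part of this diff; a plausible sketch, assuming it merely centralizes the lookup the hooks used to do themselves (the real class may add further checks):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;

    // Hypothetical reconstruction of the helper called above; not shown in this diff.
    public final class HCatAuthUtil {
      public static boolean isAuthorizationEnabled(Configuration conf) {
        return HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED);
      }
    }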
Modified: hive/branches/spark/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java (original)
+++ hive/branches/spark/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java Thu Oct 30 16:22:33 2014
@@ -33,6 +33,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -120,25 +121,11 @@ public class HCatUtil {
}
public static String encodeBytes(byte[] bytes) {
- StringBuilder strBuf = new StringBuilder();
-
- for (int i = 0; i < bytes.length; i++) {
- strBuf.append((char) (((bytes[i] >> 4) & 0xF) + ('a')));
- strBuf.append((char) (((bytes[i]) & 0xF) + ('a')));
- }
-
- return strBuf.toString();
+ return new String(Base64.encodeBase64(bytes, false, false));
}
public static byte[] decodeBytes(String str) {
- byte[] bytes = new byte[str.length() / 2];
- for (int i = 0; i < str.length(); i += 2) {
- char c = str.charAt(i);
- bytes[i / 2] = (byte) ((c - 'a') << 4);
- c = str.charAt(i + 1);
- bytes[i / 2] += (c - 'a');
- }
- return bytes;
+ return Base64.decodeBase64(str.getBytes());
}
public static List<HCatFieldSchema> getHCatFieldSchemaList(
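
encodeBytes()/decodeBytes() swap a hand-rolled scheme that spent two letters per input byte for commons-codec Base64, which needs only four characters per three bytes. A round trip with the new implementation (commons-codec is assumed on the classpath, as the new import suggests):

    import org.apache.commons.codec.binary.Base64;

    public class Base64RoundTrip {
      public static void main(String[] args) {
        byte[] original = {0x01, 0x7f, (byte) 0xff};
        // encodeBase64(data, isChunked=false, urlSafe=false) mirrors the new encodeBytes()
        String encoded = new String(Base64.encodeBase64(original, false, false));
        byte[] decoded = Base64.decodeBase64(encoded.getBytes());
        assert java.util.Arrays.equals(original, decoded); // byte-for-byte round trip
      }
    }

Note that the two formats are not interchangeable: a string produced by the old letter-pair encoder will not decode back to the same bytes under the new Base64-based decodeBytes().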
Modified: hive/branches/spark/hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/schema/TestHCatSchemaUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/schema/TestHCatSchemaUtils.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/schema/TestHCatSchemaUtils.java (original)
+++ hive/branches/spark/hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/schema/TestHCatSchemaUtils.java Thu Oct 30 16:22:33 2014
@@ -35,10 +35,10 @@ public class TestHCatSchemaUtils extends
public void testSimpleOperation() throws Exception {
String typeString = "struct<name:string,studentid:int,"
- + "contact:struct<phno:string,email:string>,"
+ + "contact:struct<phNo:string,email:string>,"
+ "currently_registered_courses:array<string>,"
+ "current_grades:map<string,string>,"
- + "phnos:array<struct<phno:string,type:string>>,blah:array<int>>";
+ + "phNos:array<struct<phNo:string,type:string>>,blah:array<int>>";
TypeInfo ti = TypeInfoUtils.getTypeInfoFromTypeString(typeString);
@@ -46,8 +46,9 @@ public class TestHCatSchemaUtils extends
LOG.info("Type name : {}", ti.getTypeName());
LOG.info("HCatSchema : {}", hsch);
assertEquals(hsch.size(), 1);
- assertEquals(ti.getTypeName(), hsch.get(0).getTypeString());
- assertEquals(hsch.get(0).getTypeString(), typeString);
+ // Looks like HCatFieldSchema.getTypeString() lower-cases its results
+ assertEquals(ti.getTypeName().toLowerCase(), hsch.get(0).getTypeString());
+ assertEquals(hsch.get(0).getTypeString(), typeString.toLowerCase());
}
@SuppressWarnings("unused")
Modified: hive/branches/spark/hcatalog/hcatalog-pig-adapter/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/hcatalog-pig-adapter/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/hcatalog-pig-adapter/pom.xml (original)
+++ hive/branches/spark/hcatalog/hcatalog-pig-adapter/pom.xml Thu Oct 30 16:22:33 2014
@@ -25,7 +25,7 @@
<parent>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>0.15.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
Modified: hive/branches/spark/hcatalog/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/pom.xml (original)
+++ hive/branches/spark/hcatalog/pom.xml Thu Oct 30 16:22:33 2014
@@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>0.15.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@@ -46,6 +46,15 @@
<module>streaming</module>
</modules>
+ <dependencies>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>${mockito-all.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
<profiles>
<profile>
<id>hadoop-1</id>
Modified: hive/branches/spark/hcatalog/server-extensions/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/server-extensions/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/server-extensions/pom.xml (original)
+++ hive/branches/spark/hcatalog/server-extensions/pom.xml Thu Oct 30 16:22:33 2014
@@ -25,7 +25,7 @@
<parent>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>0.15.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
Modified: hive/branches/spark/hcatalog/src/test/e2e/templeton/README.txt
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/src/test/e2e/templeton/README.txt?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/src/test/e2e/templeton/README.txt (original)
+++ hive/branches/spark/hcatalog/src/test/e2e/templeton/README.txt Thu Oct 30 16:22:33 2014
@@ -101,7 +101,7 @@ For Hadoop 2.x you would need to upload
Also see https://cwiki.apache.org/confluence/display/Hive/WebHCat+InstallWebHCat#WebHCatInstallWebHCat-HadoopDistributedCache
for notes on additional JAR files to copy to HDFS.
-5. Make sure TEMPLETON_HOME evnironment variable is set
+5. Make sure TEMPLETON_HOME environment variable is set
6. hadoop/conf/core-site.xml should have items described in
https://cwiki.apache.org/confluence/display/Hive/WebHCat+InstallWebHCat#WebHCatInstallWebHCat-Permissions
@@ -124,6 +124,9 @@ You may also need to adjust the followin
<final>true</final>
</property>
+8. Sqoop tests require the JDBC jar for whichever DB the test is configured with to be placed on HDFS,
+for example mysql-connector-java-5.1.30-bin.jar.
+
****
**** See deployers/ for scripts that automate a lot of the set up.
****
Modified: hive/branches/spark/hcatalog/src/test/e2e/templeton/build.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/src/test/e2e/templeton/build.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/src/test/e2e/templeton/build.xml (original)
+++ hive/branches/spark/hcatalog/src/test/e2e/templeton/build.xml Thu Oct 30 16:22:33 2014
@@ -111,6 +111,7 @@
<env key="USER_NAME" value="${test.user.name}"/>
<env key="HARNESS_CONF" value="${basedir}/conf/default.conf"/>
<env key="SECURE_MODE" value="${secure.mode}"/>
+ <env key="USER_REALM" value="${user.realm}"/>
<env key="HADOOP_VERSION" value="${hadoopversion}"/>
<arg value="./test_harness.pl"/>
<arg line="${tests.to.run}"/>
@@ -148,6 +149,7 @@
<env key="OTHER_USER_NAME" value="${test.other.user.name}"/>
<env key="HARNESS_CONF" value="${basedir}/conf/default.conf"/>
<env key="SECURE_MODE" value="${secure.mode}"/>
+ <env key="USER_REALM" value="${user.realm}"/>
<env key="KEYTAB_DIR" value="${keytab.dir}"/>
<arg value="./test_harness.pl"/>
<arg line="${tests.to.run}"/>
@@ -186,7 +188,8 @@
<env key="USER_NAME" value="${test.user.name}"/>
<env key="DOAS_USER" value="${doas.user}"/>
<env key="HARNESS_CONF" value="${basedir}/conf/default.conf"/>
<env key="SECURE_MODE" value="${secure.mode}"/>
+ <env key="USER_REALM" value="${user.realm}"/>
<arg value="./test_harness.pl"/>
<arg line="${tests.to.run}"/>
<arg value="${basedir}/tests/doas.conf"/>
Modified: hive/branches/spark/hcatalog/src/test/e2e/templeton/deployers/clean_file_system.sh
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/src/test/e2e/templeton/deployers/clean_file_system.sh?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/src/test/e2e/templeton/deployers/clean_file_system.sh (original)
+++ hive/branches/spark/hcatalog/src/test/e2e/templeton/deployers/clean_file_system.sh Thu Oct 30 16:22:33 2014
@@ -28,6 +28,6 @@
echo "Deleting artifacts from HDFS..."
${HADOOP_HOME}/bin/hdfs dfs -rm -r /user/hive/ /user/${USER}/ /user/templeton /apps /tmp /sqoopoutputdir
-${HADOOP_HOME}/bin/hdfs dfs -mkdir -p /tmp/hadoop-${USER} /user/hive/warehouse /user/${USER}/ /user/templeton /apps/templeton /tmp/hadoop-yarn /tmp/templeton_test_out
+${HADOOP_HOME}/bin/hdfs dfs -mkdir -p /tmp/hadoop-${USER} /user/hive/warehouse /user/${USER}/ /user/templeton /apps/templeton/jdbc /tmp/hadoop-yarn /tmp/templeton_test_out
${HADOOP_HOME}/bin/hdfs dfs -chmod -R a+rwx /user /tmp/
${HADOOP_HOME}/bin/hdfs dfs -chmod g+rwx /user/hive/warehouse
Modified: hive/branches/spark/hcatalog/src/test/e2e/templeton/deployers/config/webhcat/webhcat-site.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/src/test/e2e/templeton/deployers/config/webhcat/webhcat-site.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/src/test/e2e/templeton/deployers/config/webhcat/webhcat-site.xml (original)
+++ hive/branches/spark/hcatalog/src/test/e2e/templeton/deployers/config/webhcat/webhcat-site.xml Thu Oct 30 16:22:33 2014
@@ -78,10 +78,22 @@
HCat, Hive query, etc.</description>
</property>
<property>
+ <name>templeton.sqoop.archive</name>
+ <value>hdfs:///apps/templeton/sqoop-1.4.5.bin__hadoop-2.0.4-alpha.tar.gz</value>
+ <description>This should point to the Sqoop tar that will be shipped to the target node executing
+ the actual sqoop command. If not set, Sqoop is expected to be installed on every node of the
+ cluster.</description>
+ </property>
+ <property>
<name>templeton.sqoop.path</name>
- <value>${env.SQOOP_HOME}/bin/sqoop</value>
+ <value>sqoop-1.4.5.bin__hadoop-2.0.4-alpha.tar.gz/sqoop-1.4.5.bin__hadoop-2.0.4-alpha/bin/sqoop</value>
<description>The path to the Sqoop executable.</description>
</property>
+ <property>
+ <name>templeton.sqoop.home</name>
+ <value>sqoop-1.4.5.bin__hadoop-2.0.4-alpha.tar.gz/sqoop-1.4.5.bin__hadoop-2.0.4-alpha</value>
+ <description>The path to the Sqoop home in the exploded archive.</description>
+ </property>
<property>
<name>templeton.controller.mr.child.opts</name>
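
Taken together, templeton.sqoop.archive, templeton.sqoop.path and templeton.sqoop.home let WebHCat ship the Sqoop tarball from HDFS to whichever node runs the controller job and resolve the sqoop binary inside the exploded archive, so Sqoop no longer has to be pre-installed on every cluster node (which is also why env.sh below drops its SQOOP_HOME export).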
Modified: hive/branches/spark/hcatalog/src/test/e2e/templeton/deployers/deploy_e2e_artifacts.sh
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/src/test/e2e/templeton/deployers/deploy_e2e_artifacts.sh?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/src/test/e2e/templeton/deployers/deploy_e2e_artifacts.sh (original)
+++ hive/branches/spark/hcatalog/src/test/e2e/templeton/deployers/deploy_e2e_artifacts.sh Thu Oct 30 16:22:33 2014
@@ -35,7 +35,6 @@ ${HADOOP_HOME}/bin/hdfs dfs -put ${HADOO
${HADOOP_HOME}/bin/hdfs dfs -put ${HADOOP_HOME}/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-${HADOOP_VERSION}.jar webhcate2e/hclient.jar
${HADOOP_HOME}/bin/hdfs dfs -put ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-${HADOOP_VERSION}.jar /user/templeton/hadoop-streaming.jar
-
#must match config/webhcat/webhcat-site.xml
${HADOOP_HOME}/bin/hdfs dfs -put ${PROJ_HOME}/packaging/target/apache-hive-${HIVE_VERSION}-bin.tar.gz /apps/templeton/apache-hive-${HIVE_VERSION}-bin.tar.gz
# To run against Hadoop2 cluster, you have to build Pig tar yourself with
@@ -47,7 +46,7 @@ ${HADOOP_HOME}/bin/hadoop fs -put ${PIG_
#${HADOOP_HOME}/bin/hadoop fs -put /Users/ekoifman/dev/data/jarsForTmplte2e/pig-0.12.0/contrib/piggybank/java/piggybank.jar webhcate2e/
-${HADOOP_HOME}/bin/hadoop fs -put ${HIVE_HOME}/lib/zookeeper-3.4.5.jar /apps/templeton/zookeeper-3.4.5.jar
-
+${HADOOP_HOME}/bin/hadoop fs -put /Users/ekoifman/dev/sqoop-1.4.5.bin__hadoop-2.0.4-alpha.tar.gz /apps/templeton/sqoop-1.4.5.bin__hadoop-2.0.4-alpha.tar.gz
+${HADOOP_HOME}/bin/hadoop fs -put /Users/ekoifman/dev/mysql-connector-java-5.1.30/mysql-connector-java-5.1.30-bin.jar /apps/templeton/jdbc/mysql-connector-java.jar
#check what got deployed
-${HADOOP_HOME}/bin/hdfs dfs -ls /apps/templeton webhcate2e /user/templeton /user/hive/warehouse
+${HADOOP_HOME}/bin/hdfs dfs -ls -R /apps/templeton webhcate2e /user/templeton /user/hive/warehouse
Modified: hive/branches/spark/hcatalog/src/test/e2e/templeton/deployers/env.sh
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/src/test/e2e/templeton/deployers/env.sh?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/src/test/e2e/templeton/deployers/env.sh (original)
+++ hive/branches/spark/hcatalog/src/test/e2e/templeton/deployers/env.sh Thu Oct 30 16:22:33 2014
@@ -23,14 +23,13 @@
# define necessary env vars here and source it in other files
export HADOOP_VERSION=2.4.1-SNAPSHOT
-export HIVE_VERSION=0.14.0-SNAPSHOT
+#export HIVE_VERSION=0.14.0-SNAPSHOT
export PIG_VERSION=0.12.2-SNAPSHOT
#Root of project source tree
export PROJ_HOME=/Users/${USER}/dev/hive
export HIVE_HOME=${PROJ_HOME}/packaging/target/apache-hive-${HIVE_VERSION}-bin/apache-hive-${HIVE_VERSION}-bin
export HADOOP_HOME=/Users/${USER}/dev/hwxhadoop/hadoop-dist/target/hadoop-${HADOOP_VERSION}
-export SQOOP_HOME=/Users/${USER}/dev/sqoop-1.4.4.bin__hadoop-2.0.4-alpha
#Make sure Pig is built for the Hadoop version you are running
export PIG_TAR_PATH=/Users/${USER}/dev/pig-${PIG_VERSION}-src/build
Modified: hive/branches/spark/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm (original)
+++ hive/branches/spark/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm Thu Oct 30 16:22:33 2014
@@ -193,6 +193,7 @@ sub globalSetup
$globalHash->{'db_password'} = $ENV{'DB_PASSWORD'};
$globalHash->{'is_secure_mode'} = $ENV{'SECURE_MODE'};
+ $globalHash->{'user_realm'} = $ENV{'USER_REALM'};
# add libexec location to the path
if (defined($ENV{'PATH'})) {
@@ -491,7 +492,14 @@ sub execCurlCmd(){
} elsif(scalar @files > 1){
die "More than one keytab file found for user $user_name in $keytab_dir";
}
- my @cmd = ('kinit', '-k', '-t', $files[0], $user_name);
+ my @cmd = ();
+ if (defined $testCmd->{'user_realm'}){
+ my $user_name_with_realm_name = $user_name.'@'.$testCmd->{'user_realm'};
+ @cmd = ('kinit', '-k', '-t', $files[0], $user_name_with_realm_name);
+ }
+ else{
+ @cmd = ('kinit', '-k', '-t', $files[0], $user_name);
+ }
print $log "Command @cmd";
IPC::Run::run(\@cmd, \undef, $log, $log) or
die "Could not kinit as $user_name using " . $files[0] . " $ERRNO";