You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kh...@apache.org on 2014/02/05 22:02:58 UTC
svn commit: r1564924 [2/2] - in /hive/trunk: ./
hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/
hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/
hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/schema/
hcatalog/core/src/...
Modified: hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatBaseStorer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatBaseStorer.java?rev=1564924&r1=1564923&r2=1564924&view=diff
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatBaseStorer.java (original)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatBaseStorer.java Wed Feb 5 21:02:57 2014
@@ -20,14 +20,27 @@
package org.apache.hive.hcatalog.pig;
import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Properties;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveChar;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordWriter;
@@ -52,6 +65,7 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
+import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,7 +89,22 @@ abstract class HCatBaseStorer extends St
private RecordWriter<WritableComparable<?>, HCatRecord> writer;
protected HCatSchema computedSchema;
protected static final String PIG_SCHEMA = "hcat.pig.store.schema";
+ /**
+ * Controls what happens when incoming Pig value is out-of-range for target Hive column
+ */
+ static final String ON_OOR_VALUE_OPT = "onOutOfRangeValue";
+ /**
+ * prop name in Configuration/context
+ */
+ static final String ON_OORA_VALUE_PROP = "hcat.pig.store.onoutofrangevalue";
+ /**
+ * valid values for ON_OOR_VALUE_OPT
+ */
+ public static enum OOR_VALUE_OPT_VALUES {Null, Throw}
protected String sign;
+ //it's key that this is a per HCatStorer instance object
+ private final DataLossLogger dataLossLogger = new DataLossLogger();
+ private final OOR_VALUE_OPT_VALUES onOutOfRange;
public HCatBaseStorer(String partSpecs, String schema) throws Exception {
@@ -95,12 +124,15 @@ abstract class HCatBaseStorer extends St
}
}
- if (schema != null) {
+ if (schema != null && !schema.trim().isEmpty()) {
pigSchema = Utils.getSchemaFromString(schema);
}
-
+ Properties udfProps = UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign});
+ onOutOfRange = OOR_VALUE_OPT_VALUES.valueOf(udfProps.getProperty(ON_OORA_VALUE_PROP, getDefaultValue().name()));
+ }
+ static OOR_VALUE_OPT_VALUES getDefaultValue() {
+ return OOR_VALUE_OPT_VALUES.Null;
}
-
@Override
public void checkSchema(ResourceSchema resourceSchema) throws IOException {
@@ -123,17 +155,26 @@ abstract class HCatBaseStorer extends St
* schema of the table in metastore.
*/
protected HCatSchema convertPigSchemaToHCatSchema(Schema pigSchema, HCatSchema tableSchema) throws FrontendException {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("convertPigSchemaToHCatSchema(pigSchema,tblSchema)=(" + pigSchema + "," + tableSchema + ")");
+ }
List<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(pigSchema.size());
for (FieldSchema fSchema : pigSchema.getFields()) {
try {
HCatFieldSchema hcatFieldSchema = getColFromSchema(fSchema.alias, tableSchema);
-
- fieldSchemas.add(getHCatFSFromPigFS(fSchema, hcatFieldSchema));
+ //if writing to a partitioned table, then pigSchema will have more columns than tableSchema
+ //partition columns are not part of tableSchema... e.g. TestHCatStorer#testPartColsInData()
+// HCatUtil.assertNotNull(hcatFieldSchema, "Nothing matching '" + fSchema.alias + "' found " +
+// "in target table schema", LOG);
+ fieldSchemas.add(getHCatFSFromPigFS(fSchema, hcatFieldSchema, pigSchema, tableSchema));
} catch (HCatException he) {
throw new FrontendException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
}
}
- return new HCatSchema(fieldSchemas);
+
+ HCatSchema s = new HCatSchema(fieldSchemas);
+ LOG.debug("convertPigSchemaToHCatSchema(computed)=(" + s + ")");
+ return s;
}
public static boolean removeTupleFromBag(HCatFieldSchema hcatFieldSchema, FieldSchema bagFieldSchema) throws HCatException {
@@ -147,42 +188,60 @@ abstract class HCatBaseStorer extends St
}
return false;
}
-
-
- private HCatFieldSchema getHCatFSFromPigFS(FieldSchema fSchema, HCatFieldSchema hcatFieldSchema) throws FrontendException, HCatException {
+ /**
+ * Here we are processing HCat table schema as derived from metastore,
+ * thus it should have information about all fields/sub-fields, but not for partition columns
+ */
+ private HCatFieldSchema getHCatFSFromPigFS(FieldSchema fSchema, HCatFieldSchema hcatFieldSchema,
+ Schema pigSchema, HCatSchema tableSchema)
+ throws FrontendException, HCatException {
+ if(hcatFieldSchema == null) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("hcatFieldSchema is null for fSchema '" + fSchema.alias + "'");
+ //throw new IllegalArgumentException("hcatFiledSchema is null; fSchema=" + fSchema + " " +
+ // "(pigSchema, tableSchema)=(" + pigSchema + "," + tableSchema + ")");
+ }
+ }
byte type = fSchema.type;
switch (type) {
case DataType.CHARARRAY:
case DataType.BIGCHARARRAY:
- return new HCatFieldSchema(fSchema.alias, Type.STRING, null);
-
+ if(hcatFieldSchema != null && hcatFieldSchema.getTypeInfo() != null) {
+ return new HCatFieldSchema(fSchema.alias, hcatFieldSchema.getTypeInfo(), null);
+ }
+ return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.stringTypeInfo, null);
case DataType.INTEGER:
if (hcatFieldSchema != null) {
if (!SUPPORTED_INTEGER_CONVERSIONS.contains(hcatFieldSchema.getType())) {
throw new FrontendException("Unsupported type: " + type + " in Pig's schema",
PigHCatUtil.PIG_EXCEPTION_CODE);
}
- return new HCatFieldSchema(fSchema.alias, hcatFieldSchema.getType(), null);
- } else {
- return new HCatFieldSchema(fSchema.alias, Type.INT, null);
+ return new HCatFieldSchema(fSchema.alias, hcatFieldSchema.getTypeInfo(), null);
}
-
+ return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.intTypeInfo, null);
case DataType.LONG:
- return new HCatFieldSchema(fSchema.alias, Type.BIGINT, null);
-
+ return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.longTypeInfo, null);
case DataType.FLOAT:
- return new HCatFieldSchema(fSchema.alias, Type.FLOAT, null);
-
+ return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.floatTypeInfo, null);
case DataType.DOUBLE:
- return new HCatFieldSchema(fSchema.alias, Type.DOUBLE, null);
-
+ return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.doubleTypeInfo, null);
case DataType.BYTEARRAY:
- return new HCatFieldSchema(fSchema.alias, Type.BINARY, null);
-
+ return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.binaryTypeInfo, null);
case DataType.BOOLEAN:
- return new HCatFieldSchema(fSchema.alias, Type.BOOLEAN, null);
-
+ return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.booleanTypeInfo, null);
+ case DataType.DATETIME:
+ //Pig DATETIME can map to DATE or TIMESTAMP (see HCatBaseStorer#validateSchema()) which
+ //is controlled by Hive target table information
+ if(hcatFieldSchema != null && hcatFieldSchema.getTypeInfo() != null) {
+ return new HCatFieldSchema(fSchema.alias, hcatFieldSchema.getTypeInfo(), null);
+ }
+ return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.timestampTypeInfo, null);
+ case DataType.BIGDECIMAL:
+ if(hcatFieldSchema != null && hcatFieldSchema.getTypeInfo() != null) {
+ return new HCatFieldSchema(fSchema.alias, hcatFieldSchema.getTypeInfo(), null);
+ }
+ return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.decimalTypeInfo, null);
case DataType.BAG:
Schema bagSchema = fSchema.schema;
List<HCatFieldSchema> arrFields = new ArrayList<HCatFieldSchema>(1);
@@ -193,21 +252,18 @@ abstract class HCatBaseStorer extends St
} else {
field = bagSchema.getField(0);
}
- arrFields.add(getHCatFSFromPigFS(field, hcatFieldSchema == null ? null : hcatFieldSchema.getArrayElementSchema().get(0)));
+ arrFields.add(getHCatFSFromPigFS(field, hcatFieldSchema == null ? null : hcatFieldSchema
+ .getArrayElementSchema().get(0), pigSchema, tableSchema));
return new HCatFieldSchema(fSchema.alias, Type.ARRAY, new HCatSchema(arrFields), "");
-
case DataType.TUPLE:
- List<String> fieldNames = new ArrayList<String>();
List<HCatFieldSchema> hcatFSs = new ArrayList<HCatFieldSchema>();
HCatSchema structSubSchema = hcatFieldSchema == null ? null : hcatFieldSchema.getStructSubSchema();
List<FieldSchema> fields = fSchema.schema.getFields();
for (int i = 0; i < fields.size(); i++) {
FieldSchema fieldSchema = fields.get(i);
- fieldNames.add(fieldSchema.alias);
- hcatFSs.add(getHCatFSFromPigFS(fieldSchema, structSubSchema == null ? null : structSubSchema.get(i)));
+ hcatFSs.add(getHCatFSFromPigFS(fieldSchema, structSubSchema == null ? null : structSubSchema.get(i), pigSchema, tableSchema));
}
return new HCatFieldSchema(fSchema.alias, Type.STRUCT, new HCatSchema(hcatFSs), "");
-
case DataType.MAP: {
// Pig's schema contain no type information about map's keys and
// values. So, if its a new column assume <string,string> if its existing
@@ -217,15 +273,18 @@ abstract class HCatBaseStorer extends St
List<HCatFieldSchema> valFSList = new ArrayList<HCatFieldSchema>(1);
if (hcatFieldSchema != null) {
- return new HCatFieldSchema(fSchema.alias, Type.MAP, Type.STRING, hcatFieldSchema.getMapValueSchema(), "");
+ return HCatFieldSchema.createMapTypeFieldSchema(fSchema.alias, hcatFieldSchema.getMapKeyTypeInfo(),
+ hcatFieldSchema.getMapValueSchema(), "");
}
// Column not found in target table. Its a new column. Its schema is map<string,string>
- valFS = new HCatFieldSchema(fSchema.alias, Type.STRING, "");
+ valFS = new HCatFieldSchema(fSchema.alias, TypeInfoFactory.stringTypeInfo, "");
valFSList.add(valFS);
- return new HCatFieldSchema(fSchema.alias, Type.MAP, Type.STRING, new HCatSchema(valFSList), "");
+ return HCatFieldSchema.createMapTypeFieldSchema(fSchema.alias,
+ TypeInfoFactory.stringTypeInfo, new HCatSchema(valFSList), "");
}
-
+ case DataType.BIGINTEGER:
+ //fall through; doesn't map to Hive/Hcat type; here for completeness
default:
throw new FrontendException("Unsupported type: " + type + " in Pig's schema", PigHCatUtil.PIG_EXCEPTION_CODE);
}
@@ -253,24 +312,22 @@ abstract class HCatBaseStorer extends St
}
}
+ /**
+ * Convert from Pig value object to Hive value object
+ * This method assumes that {@link #validateSchema(org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema, org.apache.hive.hcatalog.data.schema.HCatFieldSchema, org.apache.pig.impl.logicalLayer.schema.Schema, org.apache.hive.hcatalog.data.schema.HCatSchema, int)}
+ * which checks the types in Pig schema are compatible with target Hive table, has been called.
+ */
private Object getJavaObj(Object pigObj, HCatFieldSchema hcatFS) throws HCatException, BackendException {
try {
-
+ if(pigObj == null) return null;
// The real work-horse. Spend time and energy in this method if there is
// need to keep HCatStorer lean and go fast.
Type type = hcatFS.getType();
switch (type) {
-
case BINARY:
- if (pigObj == null) {
- return null;
- }
return ((DataByteArray) pigObj).get();
case STRUCT:
- if (pigObj == null) {
- return null;
- }
HCatSchema structSubSchema = hcatFS.getStructSubSchema();
// Unwrap the tuple.
List<Object> all = ((Tuple) pigObj).getAll();
@@ -281,9 +338,6 @@ abstract class HCatBaseStorer extends St
return converted;
case ARRAY:
- if (pigObj == null) {
- return null;
- }
// Unwrap the bag.
DataBag pigBag = (DataBag) pigObj;
HCatFieldSchema tupFS = hcatFS.getArrayElementSchema().get(0);
@@ -298,9 +352,6 @@ abstract class HCatBaseStorer extends St
}
return bagContents;
case MAP:
- if (pigObj == null) {
- return null;
- }
Map<?, ?> pigMap = (Map<?, ?>) pigObj;
Map<Object, Object> typeMap = new HashMap<Object, Object>();
for (Entry<?, ?> entry : pigMap.entrySet()) {
@@ -318,29 +369,18 @@ abstract class HCatBaseStorer extends St
case DOUBLE:
return pigObj;
case SMALLINT:
- if (pigObj == null) {
- return null;
- }
if ((Integer) pigObj < Short.MIN_VALUE || (Integer) pigObj > Short.MAX_VALUE) {
- throw new BackendException("Value " + pigObj + " is outside the bounds of column " +
- hcatFS.getName() + " with type " + hcatFS.getType(), PigHCatUtil.PIG_EXCEPTION_CODE);
+ handleOutOfRangeValue(pigObj, hcatFS);
+ return null;
}
return ((Integer) pigObj).shortValue();
case TINYINT:
- if (pigObj == null) {
- return null;
- }
if ((Integer) pigObj < Byte.MIN_VALUE || (Integer) pigObj > Byte.MAX_VALUE) {
- throw new BackendException("Value " + pigObj + " is outside the bounds of column " +
- hcatFS.getName() + " with type " + hcatFS.getType(), PigHCatUtil.PIG_EXCEPTION_CODE);
+ handleOutOfRangeValue(pigObj, hcatFS);
+ return null;
}
return ((Integer) pigObj).byteValue();
case BOOLEAN:
- if (pigObj == null) {
- LOG.debug( "HCatBaseStorer.getJavaObj(BOOLEAN): obj null, bailing early" );
- return null;
- }
-
if( pigObj instanceof String ) {
if( ((String)pigObj).trim().compareTo("0") == 0 ) {
return Boolean.FALSE;
@@ -348,24 +388,86 @@ abstract class HCatBaseStorer extends St
if( ((String)pigObj).trim().compareTo("1") == 0 ) {
return Boolean.TRUE;
}
-
- throw new BackendException(
- "Unexpected type " + type + " for value " + pigObj
- + (pigObj == null ? "" : " of class "
- + pigObj.getClass().getName()), PigHCatUtil.PIG_EXCEPTION_CODE);
+ throw new BackendException("Unexpected type " + type + " for value " + pigObj
+ + " of class " + pigObj.getClass().getName(), PigHCatUtil.PIG_EXCEPTION_CODE);
}
-
return Boolean.parseBoolean( pigObj.toString() );
+ case DECIMAL:
+ BigDecimal bd = (BigDecimal)pigObj;
+ DecimalTypeInfo dti = (DecimalTypeInfo)hcatFS.getTypeInfo();
+ if(bd.precision() > dti.precision() || bd.scale() > dti.scale()) {
+ handleOutOfRangeValue(pigObj, hcatFS);
+ return null;
+ }
+ return HiveDecimal.create(bd);
+ case CHAR:
+ String charVal = (String)pigObj;
+ CharTypeInfo cti = (CharTypeInfo)hcatFS.getTypeInfo();
+ if(charVal.length() > cti.getLength()) {
+ handleOutOfRangeValue(pigObj, hcatFS);
+ return null;
+ }
+ return new HiveChar(charVal, cti.getLength());
+ case VARCHAR:
+ String varcharVal = (String)pigObj;
+ VarcharTypeInfo vti = (VarcharTypeInfo)hcatFS.getTypeInfo();
+ if(varcharVal.length() > vti.getLength()) {
+ handleOutOfRangeValue(pigObj, hcatFS);
+ return null;
+ }
+ return new HiveVarchar(varcharVal, vti.getLength());
+ case TIMESTAMP:
+ DateTime dt = (DateTime)pigObj;
+ return new Timestamp(dt.getMillis());//getMillis() returns UTC time regardless of TZ
+ case DATE:
+ /**
+ * We ignore any TZ setting on Pig value since java.sql.Date doesn't have it (in any
+ * meaningful way). So the assumption is that if Pig value has 0 time component (midnight)
+ * we assume it reasonably 'fits' into a Hive DATE. If time part is not 0, it's considered
+ * out of range for target type.
+ */
+ DateTime dateTime = ((DateTime)pigObj);
+ if(dateTime.getMillisOfDay() != 0) {
+ handleOutOfRangeValue(pigObj, hcatFS, "Time component must be 0 (midnight) in local timezone; Local TZ val='" + pigObj + "'");
+ return null;
+ }
+ /*java.sql.Date is a poorly defined API. Some (all?) SerDes call toString() on it
+ [e.g. LazySimpleSerDe, uses LazyUtils.writePrimitiveUTF8()], which automatically adjusts
+ for local timezone. Date.valueOf() also uses local timezone (as does Date(int,int,int)).
+ Also see PigHCatUtil#extractPigObject() for corresponding read op. This way a DATETIME from Pig,
+ when stored into Hive and read back comes back with the same value.*/
+ return new Date(dateTime.getYear() - 1900, dateTime.getMonthOfYear() - 1, dateTime.getDayOfMonth());
default:
- throw new BackendException("Unexpected type " + type + " for value " + pigObj
- + (pigObj == null ? "" : " of class "
- + pigObj.getClass().getName()), PigHCatUtil.PIG_EXCEPTION_CODE);
+ throw new BackendException("Unexpected HCat type " + type + " for value " + pigObj
+ + " of class " + pigObj.getClass().getName(), PigHCatUtil.PIG_EXCEPTION_CODE);
}
} catch (BackendException e) {
// provide the path to the field in the error message
throw new BackendException(
- (hcatFS.getName() == null ? " " : hcatFS.getName() + ".") + e.getMessage(),
- e.getCause() == null ? e : e.getCause());
+ (hcatFS.getName() == null ? " " : hcatFS.getName() + ".") + e.getMessage(), e);
+ }
+ }
+
+ private void handleOutOfRangeValue(Object pigObj, HCatFieldSchema hcatFS) throws BackendException {
+ handleOutOfRangeValue(pigObj, hcatFS, null);
+ }
+ /**
+ * depending on user config, throws an exception or logs a msg if the incoming Pig value is
+ * out-of-range for target type.
+ * @param additionalMsg may be {@code null}
+ */
+ private void handleOutOfRangeValue(Object pigObj, HCatFieldSchema hcatFS, String additionalMsg) throws BackendException {
+ String msg = "Pig value '" + pigObj + "' is outside the bounds of column " + hcatFS.getName() +
+ " with type " + (hcatFS.getTypeInfo() == null ? hcatFS.getType() : hcatFS.getTypeInfo().getTypeName()) +
+ (additionalMsg == null ? "" : "[" + additionalMsg + "]");
+ switch (onOutOfRange) {
+ case Throw:
+ throw new BackendException(msg, PigHCatUtil.PIG_EXCEPTION_CODE);
+ case Null:
+ dataLossLogger.logDataLossMsg(hcatFS, pigObj, msg);
+ break;
+ default:
+ throw new BackendException("Unexpected " + ON_OOR_VALUE_OPT + " value: '" + onOutOfRange + "'");
}
}
@@ -387,10 +489,10 @@ abstract class HCatBaseStorer extends St
// Iterate through all the elements in Pig Schema and do validations as
// dictated by semantics, consult HCatSchema of table when need be.
-
+ int columnPos = 0;//helps with debug messages
for (FieldSchema pigField : pigSchema.getFields()) {
HCatFieldSchema hcatField = getColFromSchema(pigField.alias, tblSchema);
- validateSchema(pigField, hcatField);
+ validateSchema(pigField, hcatField, pigSchema, tblSchema, columnPos++);
}
try {
@@ -400,8 +502,14 @@ abstract class HCatBaseStorer extends St
}
}
-
- private void validateSchema(FieldSchema pigField, HCatFieldSchema hcatField)
+ /**
+ * This method encodes which Pig type can map to (i.e. be stored in) which HCat type.
+ * @throws HCatException
+ * @throws FrontendException
+ */
+ private void validateSchema(FieldSchema pigField, HCatFieldSchema hcatField,
+ Schema topLevelPigSchema, HCatSchema topLevelHCatSchema,
+ int columnPos)
throws HCatException, FrontendException {
validateAlias(pigField.alias);
byte type = pigField.type;
@@ -420,14 +528,16 @@ abstract class HCatBaseStorer extends St
case DataType.BAG:
HCatSchema arrayElementSchema = hcatField == null ? null : hcatField.getArrayElementSchema();
for (FieldSchema innerField : pigField.schema.getField(0).schema.getFields()) {
- validateSchema(innerField, getColFromSchema(pigField.alias, arrayElementSchema));
+ validateSchema(innerField, getColFromSchema(pigField.alias, arrayElementSchema),
+ topLevelPigSchema, topLevelHCatSchema, columnPos);
}
break;
case DataType.TUPLE:
HCatSchema structSubSchema = hcatField == null ? null : hcatField.getStructSubSchema();
for (FieldSchema innerField : pigField.schema.getFields()) {
- validateSchema(innerField, getColFromSchema(pigField.alias, structSubSchema));
+ validateSchema(innerField, getColFromSchema(pigField.alias, structSubSchema),
+ topLevelPigSchema, topLevelHCatSchema, columnPos);
}
break;
@@ -435,6 +545,66 @@ abstract class HCatBaseStorer extends St
throw new FrontendException("Internal Error.", PigHCatUtil.PIG_EXCEPTION_CODE);
}
}
+ else if(hcatField != null) {
+ //primitive types can only be validated when the target field's type info (hcatField) is known
+ switch (type) {
+ case DataType.BIGDECIMAL:
+ throwTypeMismatchException(type, Lists.newArrayList(Type.DECIMAL), hcatField, columnPos);
+ break;
+ case DataType.DATETIME:
+ throwTypeMismatchException(type, Lists.newArrayList(Type.TIMESTAMP, Type.DATE), hcatField, columnPos);
+ break;
+ case DataType.BYTEARRAY:
+ throwTypeMismatchException(type, Lists.newArrayList(Type.BINARY), hcatField, columnPos);
+ break;
+ case DataType.BIGINTEGER:
+ throwTypeMismatchException(type, Collections.<Type>emptyList(), hcatField, columnPos);
+ break;
+ case DataType.BOOLEAN:
+ throwTypeMismatchException(type, Lists.newArrayList(Type.BOOLEAN), hcatField, columnPos);
+ break;
+ case DataType.CHARARRAY:
+ throwTypeMismatchException(type, Lists.newArrayList(Type.STRING, Type.CHAR, Type.VARCHAR),
+ hcatField, columnPos);
+ break;
+ case DataType.DOUBLE:
+ throwTypeMismatchException(type, Lists.newArrayList(Type.DOUBLE), hcatField, columnPos);
+ break;
+ case DataType.FLOAT:
+ throwTypeMismatchException(type, Lists.newArrayList(Type.FLOAT), hcatField, columnPos);
+ break;
+ case DataType.INTEGER:
+ throwTypeMismatchException(type, Lists.newArrayList(Type.INT, Type.BIGINT,
+ Type.TINYINT, Type.SMALLINT), hcatField, columnPos);
+ break;
+ case DataType.LONG:
+ throwTypeMismatchException(type, Lists.newArrayList(Type.BIGINT), hcatField, columnPos);
+ break;
+ default:
+ throw new FrontendException("'" + type +
+ "' Pig datatype in column " + columnPos + "(0-based) is not supported by HCat",
+ PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ }
+ else {
+ if(false) {
+ //see HIVE-6194
+ throw new FrontendException("(pigSch,hcatSchema)=(" + pigField + "," +
+ "" + hcatField + ") (topPig, topHcat)=(" + topLevelPigSchema + "," +
+ "" + topLevelHCatSchema + ")");
+ }
+ }
+ }
+ private static void throwTypeMismatchException(byte pigDataType,
+ List<Type> hcatRequiredType, HCatFieldSchema hcatActualField,
+ int columnPos) throws FrontendException {
+ if(!hcatRequiredType.contains(hcatActualField.getType())) {
+ throw new FrontendException(
+ "Pig '" + DataType.findTypeName(pigDataType) + "' type in column " +
+ columnPos + "(0-based) cannot map to HCat '" +
+ hcatActualField.getType() + "'type. Target filed must be of HCat type {" +
+ StringUtils.join(hcatRequiredType, " or ") + "}");
+ }
}
private void validateAlias(String alias) throws FrontendException {
@@ -467,4 +637,23 @@ abstract class HCatBaseStorer extends St
@Override
public void storeStatistics(ResourceStatistics stats, String arg1, Job job) throws IOException {
}
+
+ /**
+ * todo: when job is complete, should print the msgCount table to log
+ */
+ private static final class DataLossLogger {
+ private static final Map<String, Integer> msgCount = new HashMap<String, Integer>();
+ private static String getColumnTypeKey(HCatFieldSchema fieldSchema) {
+ return fieldSchema.getName() + "_" + (fieldSchema.getTypeInfo() == null ?
+ fieldSchema.getType() : fieldSchema.getTypeInfo());
+ }
+ private void logDataLossMsg(HCatFieldSchema fieldSchema, Object pigOjb, String msg) {
+ String key = getColumnTypeKey(fieldSchema);
+ if(!msgCount.containsKey(key)) {
+ msgCount.put(key, 0);
+ LOG.warn(msg + " " + "Will write NULL instead. Only 1 such message per type/column is emitted.");
+ }
+ msgCount.put(key, msgCount.get(key) + 1);
+ }
+ }
}
Modified: hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java?rev=1564924&r1=1564923&r2=1564924&view=diff
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java (original)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java Wed Feb 5 21:02:57 2014
@@ -47,6 +47,8 @@ import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.impl.util.UDFContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Pig {@link org.apache.pig.LoadFunc} to read data from HCat
@@ -54,6 +56,7 @@ import org.apache.pig.impl.util.UDFConte
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class HCatLoader extends HCatBaseLoader {
+ private static final Logger LOG = LoggerFactory.getLogger(HCatLoader.class);
private static final String PARTITION_FILTER = "partition.filter"; // for future use
@@ -171,6 +174,9 @@ public class HCatLoader extends HCatBase
}
}
}
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("outputSchema=" + outputSchema);
+ }
}
Modified: hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatStorer.java?rev=1564924&r1=1564923&r2=1564924&view=diff
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatStorer.java (original)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatStorer.java Wed Feb 5 21:02:57 2014
@@ -26,6 +26,12 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.common.classification.InterfaceStability;
@@ -45,6 +51,8 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* HCatStorer.
@@ -53,6 +61,7 @@ import org.apache.pig.impl.util.UDFConte
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class HCatStorer extends HCatBaseStorer {
+ private static final Logger LOG = LoggerFactory.getLogger(HCatStorer.class);
// Signature for wrapped storer, see comments in LoadFuncBasedInputDriver.initialize
final public static String INNER_SIGNATURE = "hcatstorer.inner.signature";
@@ -60,18 +69,50 @@ public class HCatStorer extends HCatBase
// A hash map which stores job credentials. The key is a signature passed by Pig, which is
//unique to the store func and out file name (table, in our case).
private static Map<String, Credentials> jobCredentials = new HashMap<String, Credentials>();
-
-
- public HCatStorer(String partSpecs, String schema) throws Exception {
- super(partSpecs, schema);
+ private final static Options validOptions = new Options();
+ static {
+ try {
+ populateValidOptions();
+ }
+ catch(Throwable t) {
+ LOG.error("Failed to build option list: ", t);
+ throw new RuntimeException(t);
+ }
}
+ private final static CommandLineParser parser = new GnuParser();
+ /**
+ * @param optString may be an empty string (but not null), in which case it is a no-op
+ */
+ public HCatStorer(String partSpecs, String pigSchema, String optString) throws Exception {
+ super(partSpecs, pigSchema);
+ String[] optsArr = optString.split(" ");
+ CommandLine configuredOptions;
+ try {
+ configuredOptions = parser.parse(validOptions, optsArr);
+ } catch (ParseException e) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp( "[-" + ON_OOR_VALUE_OPT + "]", validOptions );
+ throw e;
+ }
+ Properties udfProps = UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign});
+ //NOTE: the default applied below comes from getDefaultValue(), which returns 'Null' (not 'Throw')
+ //downstream code expects it to be set to a valid value
+ udfProps.put(ON_OORA_VALUE_PROP, configuredOptions.getOptionValue(ON_OOR_VALUE_OPT, getDefaultValue().name()));
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("setting " + configuredOptions.getOptionValue(ON_OOR_VALUE_OPT));
+ }
+ isValidOOROption((String)udfProps.get(ON_OORA_VALUE_PROP));
+ }
+ public HCatStorer(String partSpecs, String pigSchema) throws Exception {
+ this(partSpecs, pigSchema, "");
+ }
public HCatStorer(String partSpecs) throws Exception {
- this(partSpecs, null);
+ this(partSpecs, null, "");
}
public HCatStorer() throws Exception {
- this(null, null);
+ this(null, null, "");
}
@Override
@@ -79,6 +120,33 @@ public class HCatStorer extends HCatBase
return new HCatOutputFormat();
}
+ /**
+ * makes a list of all options that HCatStorer understands
+ */
+ private static void populateValidOptions() {
+ validOptions.addOption(ON_OOR_VALUE_OPT, true,
+ "Controls how store operation handles Pig values which are out of range for the target column" +
+ "in Hive table. Default is to throw an exception.");
+ }
+ /**
+ * check that onOutOfRangeValue handling is configured properly
+ * @throws FrontendException
+ */
+ private static void isValidOOROption(String optVal) throws FrontendException {
+ boolean found = false;
+ for(OOR_VALUE_OPT_VALUES v : OOR_VALUE_OPT_VALUES.values()) {
+ if(v.name().equalsIgnoreCase(optVal)) {
+ found = true;
+ break;
+ }
+ }
+ if(!found) {
+ throw new FrontendException("Unexpected value for '" + ON_OOR_VALUE_OPT + "' found: " + optVal);
+ }
+ }
+ /**
+ * @param location databaseName.tableName
+ */
@Override
public void setStoreLocation(String location, Job job) throws IOException {
HCatContext.INSTANCE.setConf(job.getConfiguration()).getConf().get()
Modified: hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/PigHCatUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/PigHCatUtil.java?rev=1564924&r1=1564923&r2=1564924&view=diff
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/PigHCatUtil.java (original)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/PigHCatUtil.java Wed Feb 5 21:02:57 2014
@@ -20,8 +20,11 @@ package org.apache.hive.hcatalog.pig;
import java.io.IOException;
+import java.sql.Date;
+import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -29,6 +32,9 @@ import java.util.Map.Entry;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveChar;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveVarchar;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -56,6 +62,8 @@ import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -206,7 +214,7 @@ class PigHCatUtil {
rfSchemaList.add(rfSchema);
}
ResourceSchema rSchema = new ResourceSchema();
- rSchema.setFields(rfSchemaList.toArray(new ResourceFieldSchema[0]));
+ rSchema.setFields(rfSchemaList.toArray(new ResourceFieldSchema[rfSchemaList.size()]));
return rSchema;
}
@@ -266,7 +274,7 @@ class PigHCatUtil {
} else if (arrayElementFieldSchema.getType() == Type.ARRAY) {
ResourceSchema s = new ResourceSchema();
List<ResourceFieldSchema> lrfs = Arrays.asList(getResourceSchemaFromFieldSchema(arrayElementFieldSchema));
- s.setFields(lrfs.toArray(new ResourceFieldSchema[0]));
+ s.setFields(lrfs.toArray(new ResourceFieldSchema[lrfs.size()]));
bagSubFieldSchemas[0].setSchema(s);
} else {
ResourceFieldSchema[] innerTupleFieldSchemas = new ResourceFieldSchema[1];
@@ -276,8 +284,7 @@ class PigHCatUtil {
.setSchema(null); // the element type is not a tuple - so no subschema
bagSubFieldSchemas[0].setSchema(new ResourceSchema().setFields(innerTupleFieldSchemas));
}
- ResourceSchema s = new ResourceSchema().setFields(bagSubFieldSchemas);
- return s;
+ return new ResourceSchema().setFields(bagSubFieldSchemas);
}
@@ -288,7 +295,7 @@ class PigHCatUtil {
for (HCatFieldSchema subField : hfs.getStructSubSchema().getFields()) {
lrfs.add(getResourceSchemaFromFieldSchema(subField));
}
- s.setFields(lrfs.toArray(new ResourceFieldSchema[0]));
+ s.setFields(lrfs.toArray(new ResourceFieldSchema[lrfs.size()]));
return s;
}
@@ -300,9 +307,16 @@ class PigHCatUtil {
static public byte getPigType(HCatFieldSchema hfs) throws IOException {
return getPigType(hfs.getType());
}
-
+ /**
+ * Defines a mapping of HCatalog type to Pig type; not every mapping is exact,
+ * see {@link #extractPigObject(Object, org.apache.hive.hcatalog.data.schema.HCatFieldSchema)}
+ * See http://pig.apache.org/docs/r0.12.0/basic.html#data-types
+ * See {@link org.apache.hive.hcatalog.pig.HCatBaseStorer#validateSchema(org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema, org.apache.hive.hcatalog.data.schema.HCatFieldSchema, org.apache.pig.impl.logicalLayer.schema.Schema, org.apache.hive.hcatalog.data.schema.HCatSchema, int)}
+ * for Pig->Hive type mapping.
+ */
static public byte getPigType(Type type) throws IOException {
- if (type == Type.STRING) {
+ if (type == Type.STRING || type == Type.CHAR || type == Type.VARCHAR) {
+ //CHARARRAY is unbounded so Hive->Pig is lossless
return DataType.CHARARRAY;
}
@@ -341,6 +355,14 @@ class PigHCatUtil {
if (type == Type.BOOLEAN && pigHasBooleanSupport) {
return DataType.BOOLEAN;
}
+ if(type == Type.DECIMAL) {
+ //Hive is more restrictive, so Hive->Pig works
+ return DataType.BIGDECIMAL;
+ }
+ if(type == Type.DATE || type == Type.TIMESTAMP) {
+ //Hive DATE and TIMESTAMP are both representable as Pig DATETIME
+ return DataType.DATETIME;
+ }
throw new PigException("HCatalog column type '" + type.toString()
+ "' is not supported in Pig as a column type", PIG_EXCEPTION_CODE);
@@ -353,22 +375,54 @@ class PigHCatUtil {
return transformToTuple(hr.getAll(), hs);
}
- @SuppressWarnings("unchecked")
+ /**
+ * Converts object from Hive's value system to Pig's value system
+ * see HCatBaseStorer#getJavaObj() for Pig->Hive conversion
+ * @param o object from Hive value system
+ * @return object in Pig value system
+ */
public static Object extractPigObject(Object o, HCatFieldSchema hfs) throws Exception {
+ if(o == null) {
+ return null;
+ }
Object result;
Type itemType = hfs.getType();
switch (itemType) {
case BINARY:
- result = (o == null) ? null : new DataByteArray((byte[]) o);
+ result = new DataByteArray((byte[]) o);
break;
case STRUCT:
- result = transformToTuple((List<Object>) o, hfs);
+ result = transformToTuple((List<?>) o, hfs);
break;
case ARRAY:
- result = transformToBag((List<? extends Object>) o, hfs);
+ result = transformToBag((List<?>) o, hfs);
break;
case MAP:
- result = transformToPigMap((Map<Object, Object>) o, hfs);
+ result = transformToPigMap((Map<?,?>) o, hfs);
+ break;
+ case DECIMAL:
+ result = ((HiveDecimal)o).bigDecimalValue();
+ break;
+ case CHAR:
+ result = ((HiveChar)o).getValue();
+ break;
+ case VARCHAR:
+ result = ((HiveVarchar)o).getValue();
+ break;
+ case DATE:
+ /*java.sql.Date is weird. It automatically adjusts its millis value to be in the local TZ
+ * e.g. d = new java.sql.Date(System.currentTimeMillis()).toString() so if you do this just after
+ * midnight in Palo Alto, you'll get yesterday's date printed out.*/
+ Date d = (Date)o;
+ result = new DateTime(d.getYear() + 1900, d.getMonth() + 1, d.getDate(), 0, 0);//uses local TZ
+ break;
+ case TIMESTAMP:
+ /*DATA TRUNCATION!!!
+ Timestamp may have nanos; we'll strip those away and create a Joda DateTime
+ object in local TZ; This is arbitrary, since Hive value doesn't have any TZ notion, but
+ we need to set something for TZ.
+ Timestamp is consistently in GMT (unless you call toString() on it) so we use millis*/
+ result = new DateTime(((Timestamp)o).getTime());//uses local TZ
break;
default:
result = o;
@@ -377,7 +431,7 @@ class PigHCatUtil {
return result;
}
- private static Tuple transformToTuple(List<? extends Object> objList, HCatFieldSchema hfs) throws Exception {
+ private static Tuple transformToTuple(List<?> objList, HCatFieldSchema hfs) throws Exception {
try {
return transformToTuple(objList, hfs.getStructSubSchema());
} catch (Exception e) {
@@ -389,7 +443,7 @@ class PigHCatUtil {
}
}
- private static Tuple transformToTuple(List<? extends Object> objList, HCatSchema hs) throws Exception {
+ private static Tuple transformToTuple(List<?> objList, HCatSchema hs) throws Exception {
if (objList == null) {
return null;
}
@@ -401,21 +455,20 @@ class PigHCatUtil {
return t;
}
- private static Map<String, Object> transformToPigMap(Map<Object, Object> map, HCatFieldSchema hfs) throws Exception {
+ private static Map<String, Object> transformToPigMap(Map<?, ?> map, HCatFieldSchema hfs) throws Exception {
if (map == null) {
return null;
}
Map<String, Object> result = new HashMap<String, Object>();
- for (Entry<Object, Object> entry : map.entrySet()) {
+ for (Entry<?, ?> entry : map.entrySet()) {
// since map key for Pig has to be Strings
result.put(entry.getKey().toString(), extractPigObject(entry.getValue(), hfs.getMapValueSchema().get(0)));
}
return result;
}
- @SuppressWarnings("unchecked")
- private static DataBag transformToBag(List<? extends Object> list, HCatFieldSchema hfs) throws Exception {
+ private static DataBag transformToBag(List<?> list, HCatFieldSchema hfs) throws Exception {
if (list == null) {
return null;
}
@@ -425,7 +478,7 @@ class PigHCatUtil {
for (Object o : list) {
Tuple tuple;
if (elementSubFieldSchema.getType() == Type.STRUCT) {
- tuple = transformToTuple((List<Object>) o, elementSubFieldSchema);
+ tuple = transformToTuple((List<?>) o, elementSubFieldSchema);
} else {
// bags always contain tuples
tuple = tupFac.newTuple(extractPigObject(o, elementSubFieldSchema));
Modified: hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java?rev=1564924&r1=1564923&r2=1564924&view=diff
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java (original)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java Wed Feb 5 21:02:57 2014
@@ -18,14 +18,11 @@
*/
package org.apache.hive.hcatalog.pig;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.sql.Date;
+import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -40,12 +37,14 @@ import org.apache.hadoop.hive.cli.CliSes
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.HcatTestUtils;
import org.apache.hive.hcatalog.common.HCatUtil;
import org.apache.hive.hcatalog.common.HCatConstants;
import org.apache.hive.hcatalog.data.Pair;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.ResourceStatistics;
@@ -53,11 +52,17 @@ import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
public class TestHCatLoader {
+ private static final Logger LOG = LoggerFactory.getLogger(TestHCatLoader.class);
private static final String TEST_DATA_DIR = HCatUtil.makePathASafeFileName(System.getProperty("java.io.tmpdir") +
File.separator + TestHCatLoader.class.getCanonicalName() + "-" + System.currentTimeMillis());
private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
@@ -78,26 +83,46 @@ public class TestHCatLoader {
}
private void dropTable(String tablename) throws IOException, CommandNeedRetryException {
+ dropTable(tablename, driver);
+ }
+ static void dropTable(String tablename, Driver driver) throws IOException, CommandNeedRetryException {
driver.run("drop table " + tablename);
}
private void createTable(String tablename, String schema, String partitionedBy) throws IOException, CommandNeedRetryException {
+ createTable(tablename, schema, partitionedBy, driver, storageFormat());
+ }
+ static void createTable(String tablename, String schema, String partitionedBy, Driver driver, String storageFormat)
+ throws IOException, CommandNeedRetryException {
String createTable;
createTable = "create table " + tablename + "(" + schema + ") ";
if ((partitionedBy != null) && (!partitionedBy.trim().isEmpty())) {
createTable = createTable + "partitioned by (" + partitionedBy + ") ";
}
- createTable = createTable + "stored as " +storageFormat();
- int retCode = driver.run(createTable).getResponseCode();
- if (retCode != 0) {
- throw new IOException("Failed to create table. [" + createTable + "], return code from hive driver : [" + retCode + "]");
- }
+ createTable = createTable + "stored as " +storageFormat;
+ executeStatementOnDriver(createTable, driver);
}
private void createTable(String tablename, String schema) throws IOException, CommandNeedRetryException {
createTable(tablename, schema, null);
}
-
+ /**
+ * Execute Hive CLI statement
+ * @param cmd arbitrary statement to execute
+ */
+ static void executeStatementOnDriver(String cmd, Driver driver) throws IOException, CommandNeedRetryException {
+ LOG.debug("Executing: " + cmd);
+ CommandProcessorResponse cpr = driver.run(cmd);
+ if(cpr.getResponseCode() != 0) {
+ throw new IOException("Failed to execute \"" + cmd + "\". Driver returned " + cpr.getResponseCode() + " Error: " + cpr.getErrorMessage());
+ }
+ }
+ private static void checkProjection(FieldSchema fs, String expectedName, byte expectedPigType) {
+ assertEquals(fs.alias, expectedName);
+ assertEquals("Expected " + DataType.findTypeName(expectedPigType) + "; got " +
+ DataType.findTypeName(fs.type), expectedPigType, fs.type);
+ }
+
@Before
public void setup() throws Exception {
@@ -127,6 +152,7 @@ public class TestHCatLoader {
createTable(PARTITIONED_TABLE, "a int, b string", "bkt string");
createTable(SPECIFIC_SIZE_TABLE, "a int, b string");
+ AllTypesTable.setupAllTypesTable(driver);
int LOOP_SIZE = 3;
String[] input = new String[LOOP_SIZE * LOOP_SIZE];
@@ -148,23 +174,23 @@ public class TestHCatLoader {
//"Edward Hyde\t1337\t(415-253-6367,anonymous@b44chan.org)\t{(CREATIVE_WRITING),(COPYRIGHT_LAW)},[CREATIVE_WRITING#A+,COPYRIGHT_LAW#D],{(415-253-6367,cell),(408-253-6367,landline)}",
}
);
-
PigServer server = new PigServer(ExecType.LOCAL);
server.setBatchOn();
- server.registerQuery("A = load '" + BASIC_FILE_NAME + "' as (a:int, b:chararray);");
+ int i = 0;
+ server.registerQuery("A = load '" + BASIC_FILE_NAME + "' as (a:int, b:chararray);", ++i);
- server.registerQuery("store A into '" + BASIC_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer();");
- server.registerQuery("store A into '" + SPECIFIC_SIZE_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer();");
- server.registerQuery("B = foreach A generate a,b;");
- server.registerQuery("B2 = filter B by a < 2;");
- server.registerQuery("store B2 into '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer('bkt=0');");
-
- server.registerQuery("C = foreach A generate a,b;");
- server.registerQuery("C2 = filter C by a >= 2;");
- server.registerQuery("store C2 into '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer('bkt=1');");
+ server.registerQuery("store A into '" + BASIC_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer();", ++i);
+ server.registerQuery("store A into '" + SPECIFIC_SIZE_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer();", ++i);
+ server.registerQuery("B = foreach A generate a,b;", ++i);
+ server.registerQuery("B2 = filter B by a < 2;", ++i);
+ server.registerQuery("store B2 into '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer('bkt=0');", ++i);
+
+ server.registerQuery("C = foreach A generate a,b;", ++i);
+ server.registerQuery("C2 = filter C by a >= 2;", ++i);
+ server.registerQuery("store C2 into '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer('bkt=1');", ++i);
- server.registerQuery("D = load '" + COMPLEX_FILE_NAME + "' as (name:chararray, studentid:int, contact:tuple(phno:chararray,email:chararray), currently_registered_courses:bag{innertup:tuple(course:chararray)}, current_grades:map[ ] , phnos :bag{innertup:tuple(phno:chararray,type:chararray)});");
- server.registerQuery("store D into '" + COMPLEX_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer();");
+ server.registerQuery("D = load '" + COMPLEX_FILE_NAME + "' as (name:chararray, studentid:int, contact:tuple(phno:chararray,email:chararray), currently_registered_courses:bag{innertup:tuple(course:chararray)}, current_grades:map[ ] , phnos :bag{innertup:tuple(phno:chararray,type:chararray)});", ++i);
+ server.registerQuery("store D into '" + COMPLEX_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer();", ++i);
server.executeBatch();
}
@@ -176,6 +202,7 @@ public class TestHCatLoader {
dropTable(COMPLEX_TABLE);
dropTable(PARTITIONED_TABLE);
dropTable(SPECIFIC_SIZE_TABLE);
+ dropTable(AllTypesTable.ALL_PRIMITIVE_TYPES_TABLE);
} finally {
FileUtils.deleteDirectory(new File(TEST_DATA_DIR));
}
@@ -197,6 +224,20 @@ public class TestHCatLoader {
assertTrue(Xfields.get(1).type == DataType.CHARARRAY);
}
+ /**
+ * Test that we properly translate data types in Hive/HCat table schema into Pig schema
+ */
+ @Test
+ public void testSchemaLoadPrimitiveTypes() throws IOException {
+ AllTypesTable.testSchemaLoadPrimitiveTypes();
+ }
+ /**
+ * Test that values from the Hive table are read properly in Pig
+ */
+ @Test
+ public void testReadDataPrimitiveTypes() throws Exception {
+ AllTypesTable.testReadDataPrimitiveTypes();
+ }
@Test
public void testReadDataBasic() throws IOException {
@@ -450,4 +491,114 @@ public class TestHCatLoader {
assertEquals(0, t.get(1));
assertFalse(iterator.hasNext());
}
+
+ /**
+ * basic tests that cover each scalar type
+ * https://issues.apache.org/jira/browse/HIVE-5814
+ */
+ private static final class AllTypesTable {
+ private static final String ALL_TYPES_FILE_NAME = TEST_DATA_DIR + "/alltypes.input.data";
+ private static final String ALL_PRIMITIVE_TYPES_TABLE = "junit_unparted_alltypes";
+ private static final String ALL_TYPES_SCHEMA = "( c_boolean boolean, " + //0
+ "c_tinyint tinyint, " + //1
+ "c_smallint smallint, " + //2
+ "c_int int, " + //3
+ "c_bigint bigint, " + //4
+ "c_float float, " + //5
+ "c_double double, " + //6
+ "c_decimal decimal(5,2), " +//7
+ "c_string string, " + //8
+ "c_char char(10), " + //9
+ "c_varchar varchar(20), " + //10
+ "c_binary binary, " + //11
+ "c_date date, " + //12
+ "c_timestamp timestamp)"; //13
+ /**
+ * raw data for #ALL_PRIMITIVE_TYPES_TABLE
+ * All the values are within range of target data type (column)
+ */
+ private static final Object[][] primitiveRows = new Object[][] {
+ {Boolean.TRUE,Byte.MAX_VALUE,Short.MAX_VALUE, Integer.MAX_VALUE,Long.MAX_VALUE,Float.MAX_VALUE,Double.MAX_VALUE,555.22,"Kyiv","char(10)xx","varchar(20)","blah".getBytes(),Date.valueOf("2014-01-13"),Timestamp.valueOf("2014-01-13 19:26:25.0123")},
+ {Boolean.FALSE,Byte.MIN_VALUE,Short.MIN_VALUE, Integer.MIN_VALUE,Long.MIN_VALUE,Float.MIN_VALUE,Double.MIN_VALUE,-555.22,"Saint Petersburg","char(xx)00","varchar(yy)","doh".getBytes(),Date.valueOf("2014-01-14"), Timestamp.valueOf("2014-01-14 19:26:25.0123")}
+ };
+ /**
+ * Test that we properly translate data types in Hive/HCat table schema into Pig schema
+ */
+ private static void testSchemaLoadPrimitiveTypes() throws IOException {
+ PigServer server = new PigServer(ExecType.LOCAL);
+ server.registerQuery("X = load '" + ALL_PRIMITIVE_TYPES_TABLE + "' using " + HCatLoader.class.getName() + "();");
+ Schema dumpedXSchema = server.dumpSchema("X");
+ List<FieldSchema> Xfields = dumpedXSchema.getFields();
+ assertEquals("Expected " + HCatFieldSchema.Type.numPrimitiveTypes() + " fields, found " +
+ Xfields.size(), HCatFieldSchema.Type.numPrimitiveTypes(), Xfields.size());
+ checkProjection(Xfields.get(0), "c_boolean", DataType.BOOLEAN);
+ checkProjection(Xfields.get(1), "c_tinyint", DataType.INTEGER);
+ checkProjection(Xfields.get(2), "c_smallint", DataType.INTEGER);
+ checkProjection(Xfields.get(3), "c_int", DataType.INTEGER);
+ checkProjection(Xfields.get(4), "c_bigint", DataType.LONG);
+ checkProjection(Xfields.get(5), "c_float", DataType.FLOAT);
+ checkProjection(Xfields.get(6), "c_double", DataType.DOUBLE);
+ checkProjection(Xfields.get(7), "c_decimal", DataType.BIGDECIMAL);
+ checkProjection(Xfields.get(8), "c_string", DataType.CHARARRAY);
+ checkProjection(Xfields.get(9), "c_char", DataType.CHARARRAY);
+ checkProjection(Xfields.get(10), "c_varchar", DataType.CHARARRAY);
+ checkProjection(Xfields.get(11), "c_binary", DataType.BYTEARRAY);
+ checkProjection(Xfields.get(12), "c_date", DataType.DATETIME);
+ checkProjection(Xfields.get(13), "c_timestamp", DataType.DATETIME);
+ }
+ /**
+ * Test that values from the Hive table are read properly in Pig
+ */
+ private static void testReadDataPrimitiveTypes() throws Exception {
+ PigServer server = new PigServer(ExecType.LOCAL);
+ server.registerQuery("X = load '" + ALL_PRIMITIVE_TYPES_TABLE + "' using " + HCatLoader.class.getName() + "();");
+ Iterator<Tuple> XIter = server.openIterator("X");
+ int numTuplesRead = 0;
+ while (XIter.hasNext()) {
+ Tuple t = XIter.next();
+ assertEquals(HCatFieldSchema.Type.numPrimitiveTypes(), t.size());
+ int colPos = 0;
+ for(Object referenceData : primitiveRows[numTuplesRead]) {
+ if(referenceData == null) {
+ assertTrue("rowNum=" + numTuplesRead + " colNum=" + colPos + " Reference data is null; actual " +
+ t.get(colPos), t.get(colPos) == null);
+ }
+ else if(referenceData instanceof java.util.Date) {
+ assertTrue("rowNum=" + numTuplesRead + " colNum=" + colPos + " Reference data=" + ((java.util.Date)referenceData).getTime() + " actual=" +
+ ((DateTime)t.get(colPos)).getMillis() + "; types=(" + referenceData.getClass() + "," + t.get(colPos).getClass() + ")",
+ ((java.util.Date)referenceData).getTime()== ((DateTime)t.get(colPos)).getMillis());
+ //note that here we ignore nanos part of Hive Timestamp since nanos are dropped when reading Hive from Pig by design
+ }
+ else {
+ assertTrue("rowNum=" + numTuplesRead + " colNum=" + colPos + " Reference data=" + referenceData + " actual=" +
+ t.get(colPos) + "; types=(" + referenceData.getClass() + "," + t.get(colPos).getClass() + ")",
+ referenceData.toString().equals(t.get(colPos).toString()));
+ //doing String comparisons here as value objects in Hive and Pig are different so equals() doesn't work
+ }
+ colPos++;
+ }
+ numTuplesRead++;
+ }
+ assertTrue("Expected " + primitiveRows.length + "; found " + numTuplesRead, numTuplesRead == primitiveRows.length);
+ }
+ private static void setupAllTypesTable(Driver driver) throws Exception {
+ String[] primitiveData = new String[primitiveRows.length];
+ for(int i = 0; i < primitiveRows.length; i++) {
+ Object[] rowData = primitiveRows[i];
+ StringBuilder row = new StringBuilder();
+ for(Object cell : rowData) {
+ row.append(row.length() == 0 ? "" : "\t").append(cell == null ? null : cell);
+ }
+ primitiveData[i] = row.toString();
+ }
+ HcatTestUtils.createTestDataFile(ALL_TYPES_FILE_NAME, primitiveData);
+ String cmd = "create table " + ALL_PRIMITIVE_TYPES_TABLE + ALL_TYPES_SCHEMA +
+ "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'" +
+ " STORED AS TEXTFILE";
+ executeStatementOnDriver(cmd, driver);
+ cmd = "load data local inpath '" + HCatUtil.makePathASafeFileName(ALL_TYPES_FILE_NAME) +
+ "' into table " + ALL_PRIMITIVE_TYPES_TABLE;
+ executeStatementOnDriver(cmd, driver);
+ }
+ }
}
Modified: hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderStorer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderStorer.java?rev=1564924&r1=1564923&r2=1564924&view=diff
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderStorer.java (original)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderStorer.java Wed Feb 5 21:02:57 2014
@@ -134,7 +134,7 @@ public class TestHCatLoaderStorer extend
server.registerQuery("data = load '" + data +
"' using PigStorage('\t') as (my_small_int:int, my_tiny_int:int);");
server.registerQuery(
- "store data into 'test_tbl' using org.apache.hive.hcatalog.pig.HCatStorer();");
+ "store data into 'test_tbl' using org.apache.hive.hcatalog.pig.HCatStorer('','','-onOutOfRangeValue Throw');");
List<ExecJob> jobs = server.executeBatch();
Assert.assertEquals(expectedStatus, jobs.get(0).getStatus());
}
Modified: hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorer.java?rev=1564924&r1=1564923&r2=1564924&view=diff
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorer.java (original)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorer.java Wed Feb 5 21:02:57 2014
@@ -18,13 +18,19 @@
*/
package org.apache.hive.hcatalog.pig;
+import java.io.BufferedReader;
import java.io.File;
+import java.io.FileReader;
import java.io.IOException;
+import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hive.hcatalog.HcatTestUtils;
import org.apache.hive.hcatalog.mapreduce.HCatBaseTest;
import org.apache.pig.EvalFunc;
@@ -35,13 +41,333 @@ import org.apache.pig.data.DataByteArray
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.LogUtils;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
import org.junit.Assert;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TestHCatStorer extends HCatBaseTest {
+ private static final Logger LOG = LoggerFactory.getLogger(TestHCatStorer.class);
private static final String INPUT_FILE_NAME = TEST_DATA_DIR + "/input.data";
+ //Start: tests that check values from Pig that are out of range for target column
+ @Test
+ public void testWriteTinyint() throws Exception {
+ pigValueRangeTest("junitTypeTest1", "tinyint", "int", null, Integer.toString(1), Integer.toString(1));//in-range value: read back unchanged
+ pigValueRangeTestOverflow("junitTypeTest1", "tinyint", "int", null, Integer.toString(300));//no option passed to HCatStorer: NULL expected on read
+ pigValueRangeTestOverflow("junitTypeTest2", "tinyint", "int", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null,
+ Integer.toString(300));//explicit Null option: NULL expected on read
+ pigValueRangeTestOverflow("junitTypeTest3", "tinyint", "int", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw,
+ Integer.toString(300));//Throw option: the store itself is expected to fail
+ }
+ @Test
+ public void testWriteSmallint() throws Exception {
+ pigValueRangeTest("junitTypeTest1", "smallint", "int", null, Integer.toString(Short.MIN_VALUE),
+ Integer.toString(Short.MIN_VALUE));//lowest value a short can hold: read back unchanged
+ pigValueRangeTestOverflow("junitTypeTest2", "smallint", "int", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null,
+ Integer.toString(Short.MAX_VALUE + 1));//one past the largest short: NULL expected on read
+ pigValueRangeTestOverflow("junitTypeTest3", "smallint", "int", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw,
+ Integer.toString(Short.MAX_VALUE + 1));//one past the largest short: the store is expected to fail
+ }
+ @Test
+ public void testWriteChar() throws Exception {
+ pigValueRangeTest("junitTypeTest1", "char(5)", "chararray", null, "xxx", "xxx ");//NOTE(review): expected value differs from input by trailing blanks; presumably padded to 5 chars in the real source and collapsed by the mail archive - confirm
+ pigValueRangeTestOverflow("junitTypeTest1", "char(5)", "chararray", null, "too_long");//8 chars into char(5), no option: NULL expected on read
+ pigValueRangeTestOverflow("junitTypeTest2", "char(5)", "chararray", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null,
+ "too_long");//explicit Null option: NULL expected on read
+ pigValueRangeTestOverflow("junitTypeTest3", "char(5)", "chararray", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw,
+ "too_long2");//Throw option: the store is expected to fail
+ }
+ @Test
+ public void testWriteVarchar() throws Exception {
+ pigValueRangeTest("junitTypeTest1", "varchar(5)", "chararray", null, "xxx", "xxx");//fits in varchar(5): read back unchanged
+ pigValueRangeTestOverflow("junitTypeTest1", "varchar(5)", "chararray", null, "too_long");//8 chars into varchar(5), no option: NULL expected on read
+ pigValueRangeTestOverflow("junitTypeTest2", "varchar(5)", "chararray", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null,
+ "too_long");//explicit Null option: NULL expected on read
+ pigValueRangeTestOverflow("junitTypeTest3", "varchar(5)", "chararray", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw,
+ "too_long2");//Throw option: the store is expected to fail
+ }
+ @Test
+ public void testWriteDecimalXY() throws Exception {
+ pigValueRangeTest("junitTypeTest1", "decimal(5,2)", "bigdecimal", null, BigDecimal.valueOf(1.2).toString(),
+ BigDecimal.valueOf(1.2).toString());//fits decimal(5,2): read back unchanged
+ pigValueRangeTestOverflow("junitTypeTest1", "decimal(5,2)", "bigdecimal", null, BigDecimal.valueOf(12345.12).toString());//7 digits into precision 5, no option: NULL expected on read
+ pigValueRangeTestOverflow("junitTypeTest2", "decimal(5,2)", "bigdecimal", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null,
+ BigDecimal.valueOf(500.123).toString());//3 fractional digits into scale 2: the test expects this to be handled as out of range (NULL)
+ pigValueRangeTestOverflow("junitTypeTest3", "decimal(5,2)", "bigdecimal", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw,
+ BigDecimal.valueOf(500.123).toString());//same value with Throw option: the store is expected to fail
+ }
+ @Test
+ public void testWriteDecimalX() throws Exception {
+ //note: decimal(2) is treated as decimal(2,0), i.e. scale defaults to 0
+ pigValueRangeTest("junitTypeTest1", "decimal(2)", "bigdecimal", null, BigDecimal.valueOf(12).toString(),
+ BigDecimal.valueOf(12).toString());//two integer digits: read back unchanged
+ pigValueRangeTestOverflow("junitTypeTest2", "decimal(2)", "bigdecimal", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null,
+ BigDecimal.valueOf(50.123).toString());//has fractional digits: the test expects NULL on read
+ pigValueRangeTestOverflow("junitTypeTest3", "decimal(2)", "bigdecimal", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw,
+ BigDecimal.valueOf(50.123).toString());//same value with Throw option: the store is expected to fail
+ }
+ @Test
+ public void testWriteDecimal() throws Exception {
+ //a bare "decimal" column is treated as decimal(10,0)
+ pigValueRangeTest("junitTypeTest1", "decimal", "bigdecimal", null, BigDecimal.valueOf(1234567890).toString(),
+ BigDecimal.valueOf(1234567890).toString());//10 digits: read back unchanged
+ pigValueRangeTestOverflow("junitTypeTest2", "decimal", "bigdecimal", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null,
+ BigDecimal.valueOf(12345678900L).toString());//11 digits: NULL expected on read
+ pigValueRangeTestOverflow("junitTypeTest3", "decimal", "bigdecimal", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw,
+ BigDecimal.valueOf(12345678900L).toString());//11 digits with Throw option: the store is expected to fail
+ }
+ /**
+ * Format used when comparing DATE values: it leaves out the time zone that DateTime.toString()
+ * includes, and keeps the time-of-day so the tests can check that it is 0.
+ */
+ private static final String FORMAT_4_DATE = "yyyy-MM-dd HH:mm:ss";
+ @Test
+ public void testWriteDate() throws Exception {
+ DateTime d = new DateTime(1991,10,11,0,0);//midnight, no zone argument given
+ pigValueRangeTest("junitTypeTest1", "date", "datetime", null, d.toString(),
+ d.toString(FORMAT_4_DATE), FORMAT_4_DATE);//time-of-day is 0: read back with the same date
+ pigValueRangeTestOverflow("junitTypeTest2", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null,
+ d.plusHours(2).toString(), FORMAT_4_DATE);//time != 0, so NULL is expected on read
+ pigValueRangeTestOverflow("junitTypeTest3", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw,
+ d.plusMinutes(1).toString(), FORMAT_4_DATE);//time != 0, so the store is expected to fail
+ d = new DateTime(1991,10,11,0,0,DateTimeZone.forOffsetHours(-11));//same three checks with an explicit UTC-11 offset
+ pigValueRangeTest("junitTypeTest4", "date", "datetime", null, d.toString(),
+ d.toString(FORMAT_4_DATE), FORMAT_4_DATE);
+ pigValueRangeTestOverflow("junitTypeTest5", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null,
+ d.plusHours(2).toString(), FORMAT_4_DATE);//date out of range due to time != 0
+ pigValueRangeTestOverflow("junitTypeTest6", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw,
+ d.plusMinutes(1).toString(), FORMAT_4_DATE);//date out of range due to time != 0
+ }
+ /**
+  * Writes datetime values whose time-of-day is not 0 into a DATE column, using a UTC-11 offset.
+  * Every value is expected to be handled as out of range: NULL on read for the Null option,
+  * a failed store for the Throw option.
+  */
+ @Test
+ public void testWriteDate3() throws Exception {
+   DateTime d = new DateTime(1991,10,11,23,10,DateTimeZone.forOffsetHours(-11));
+   //expect to fail since the time component is not 0
+   pigValueRangeTestOverflow("junitTypeTest4", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw,
+     d.toString(), FORMAT_4_DATE);
+   pigValueRangeTestOverflow("junitTypeTest5", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null,
+     d.plusHours(2).toString(), FORMAT_4_DATE);
+   pigValueRangeTestOverflow("junitTypeTest6", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw,
+     d.plusMinutes(1).toString(), FORMAT_4_DATE);
+ }
+ @Test
+ public void testWriteDate2() throws Exception {
+ DateTime d = new DateTime(1991,11,12,0,0, DateTimeZone.forID("US/Eastern"));//midnight in a named zone
+ pigValueRangeTest("junitTypeTest1", "date", "datetime", null, d.toString(),
+ d.toString(FORMAT_4_DATE), FORMAT_4_DATE);//time-of-day is 0: read back with the same date
+ pigValueRangeTestOverflow("junitTypeTest2", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null,
+ d.plusHours(2).toString(), FORMAT_4_DATE);//table name is reused below; the helper drops and recreates the table on every call
+ pigValueRangeTestOverflow("junitTypeTest2", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null,
+ d.plusMillis(20).toString(), FORMAT_4_DATE);//even a few millis past midnight is expected to yield NULL
+ pigValueRangeTestOverflow("junitTypeTest2", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw,
+ d.plusMillis(12).toString(), FORMAT_4_DATE);//Throw option: the store is expected to fail
+ pigValueRangeTestOverflow("junitTypeTest3", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw,
+ d.plusMinutes(1).toString(), FORMAT_4_DATE);//Throw option: the store is expected to fail
+ }
+ /**
+ * Note that the value that comes back from Hive will have local TZ on it. Using local is
+ * arbitrary but DateTime needs TZ (or will assume default) and Hive does not have TZ.
+ * So if you start with Pig value in TZ=x and write to Hive, when you read it back the TZ may
+ * be different. The millis value should match, of course.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testWriteTimestamp() throws Exception {
+ DateTime d = new DateTime(1991,10,11,14,23,30, 10);//uses default TZ
+ pigValueRangeTest("junitTypeTest1", "timestamp", "datetime", null, d.toString(),
+ d.toDateTime(DateTimeZone.getDefault()).toString());//expected value is the same instant rendered in the default TZ
+ d = d.plusHours(2);
+ pigValueRangeTest("junitTypeTest2", "timestamp", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null,
+ d.toString(), d.toDateTime(DateTimeZone.getDefault()).toString());//Null option given, but a non-null round trip is expected
+ d = d.toDateTime(DateTimeZone.UTC);//same instant, written with a UTC zone
+ pigValueRangeTest("junitTypeTest3", "timestamp", "datetime", null, d.toString(),
+ d.toDateTime(DateTimeZone.getDefault()).toString());
+
+ d = new DateTime(1991,10,11,23,24,25, 26);//second instant, late in the day; table names are reused (helper drops the table first)
+ pigValueRangeTest("junitTypeTest1", "timestamp", "datetime", null, d.toString(),
+ d.toDateTime(DateTimeZone.getDefault()).toString());
+ d = d.toDateTime(DateTimeZone.UTC);//same instant, written with a UTC zone
+ pigValueRangeTest("junitTypeTest3", "timestamp", "datetime", null, d.toString(),
+ d.toDateTime(DateTimeZone.getDefault()).toString());
+ }
+ //End: tests that check values from Pig that are out of range for target column
+
+
+ private void pigValueRangeTestOverflow(String tblName, String hiveType, String pigType,
+ HCatBaseStorer.OOR_VALUE_OPT_VALUES goal, String inputValue, String format) throws Exception {
+ pigValueRangeTest(tblName, hiveType, pigType, goal, inputValue, null, format);//overflow shortcut: expected value is always null; format is used for DATE comparison
+ }
+ private void pigValueRangeTestOverflow(String tblName, String hiveType, String pigType,
+ HCatBaseStorer.OOR_VALUE_OPT_VALUES goal, String inputValue) throws Exception {
+ pigValueRangeTest(tblName, hiveType, pigType, goal, inputValue, null, null);//overflow shortcut: expected value is null and no date format is supplied
+ }
+ private void pigValueRangeTest(String tblName, String hiveType, String pigType,
+ HCatBaseStorer.OOR_VALUE_OPT_VALUES goal, String inputValue,
+ String expectedValue) throws Exception {
+ pigValueRangeTest(tblName, hiveType, pigType, goal, inputValue, expectedValue, null);//convenience overload for non-DATE columns: no date format is supplied
+ }
+ /**
+  * This is used to test how Pig values of various data types which are out of range for Hive target
+  * column are handled. Currently the options are to raise an error or write NULL.
+  * 1. create a data file with 1 column, 1 row
+  * 2. load into pig
+  * 3. use pig to store into Hive table
+  * 4. read from Hive table using Pig
+  * 5. check that read value is what is expected
+  * When {@code goal} is {@code Throw} the method only checks that the store fails with the
+  * expected {@link FrontendException} and returns after step 3.
+  * @param tblName Hive table name to create; any existing table of that name is dropped first
+  * @param hiveType datatype to use for the single column in table
+  * @param pigType corresponding Pig type when loading file into Pig
+  * @param goal how out-of-range values from Pig are handled by HCat, may be {@code null} in which
+  *             case HCatStorer is invoked without any options
+  * @param inputValue written to file which is read by Pig, thus must be something Pig can read
+  *                   (e.g. DateTime.toString(), rather than java.sql.Date)
+  * @param expectedValue what Pig should see when reading Hive table; {@code null} means a NULL column
+  * @param format date format to use for comparison of values since default DateTime.toString()
+  *               includes TZ which is meaningless for Hive DATE type; required when
+  *               {@code hiveType} is "date"
+  * @throws Exception if any step fails unexpectedly
+  */
+ private void pigValueRangeTest(String tblName, String hiveType, String pigType,
+   HCatBaseStorer.OOR_VALUE_OPT_VALUES goal, String inputValue, String expectedValue, String format)
+   throws Exception {
+   TestHCatLoader.dropTable(tblName, driver);
+   final String field = "f1";
+   TestHCatLoader.createTable(tblName, field + " " + hiveType, null, driver, "RCFILE");
+   HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, new String[] {inputValue});
+   LOG.debug("File=" + INPUT_FILE_NAME);
+   dumpFile(INPUT_FILE_NAME);
+   PigServer server = createPigServer(true);
+   int queryNumber = 1;
+   logAndRegister(server,
+     "A = load '" + INPUT_FILE_NAME + "' as (" + field + ":" + pigType + ");", queryNumber++);
+   //the returned iterator was never read by the original test; the call itself is kept
+   server.openIterator("A");
+   if(goal == null) {
+     logAndRegister(server,
+       "store A into '" + tblName + "' using " + HCatStorer.class.getName() + "();", queryNumber++);
+   }
+   else {
+     FrontendException fe = null;
+     try {
+       logAndRegister(server,
+         "store A into '" + tblName + "' using " + HCatStorer.class.getName() + "('','','-" +
+         HCatStorer.ON_OOR_VALUE_OPT + " " + goal + "');",
+         queryNumber++);
+     }
+     catch(FrontendException e) {
+       fe = e;
+     }
+     switch (goal) {
+       case Null:
+         if(fe != null) {
+           //a failed store is not expected here; surface it instead of failing later on row count
+           throw fe;
+         }
+         //fall out of the switch and verify the data
+         break;
+       case Throw:
+         Assert.assertNotNull("Expected a FrontendException", fe);
+         Assert.assertEquals("Expected a different FrontendException.", "Unable to store alias A", fe.getMessage());
+         return;//this test is done
+       default:
+         Assert.fail("Unexpected goal: " + goal);
+     }
+   }
+   logAndRegister(server, "B = load '" + tblName + "' using " + HCatLoader.class.getName() + "();", queryNumber);
+   CommandProcessorResponse cpr = driver.run("select * from " + tblName);
+   LOG.debug("cpr.respCode=" + cpr.getResponseCode() + " cpr.errMsg=" + cpr.getErrorMessage() +
+     " for table " + tblName);
+   List l = new ArrayList();
+   driver.getResults(l);
+   /* The rows fetched via SQL are only logged, not asserted on: Timestamp.toString() adjusts the
+    * value for local TZ and each row 't' is a String, so the timestamp in 't' doesn't match the
+    * raw input data.*/
+   LOG.debug("Dumping rows via SQL from " + tblName);
+   for(Object t : l) {
+     LOG.debug(t == null ? null : t.toString() + " t.class=" + t.getClass());
+   }
+   Iterator<Tuple> itr = server.openIterator("B");
+   int numRowsRead = 0;
+   while(itr.hasNext()) {
+     Tuple t = itr.next();
+     if("date".equals(hiveType)) {
+       DateTime dateTime = (DateTime)t.get(0);
+       Assert.assertNotNull("A date format is required to compare values of a date column", format);
+       Assert.assertEquals("Comparing Pig to Raw data for table " + tblName, expectedValue,
+         dateTime == null ? null : dateTime.toString(format));
+     }
+     else {
+       Assert.assertEquals("Comparing Pig to Raw data for table " + tblName, expectedValue,
+         t.isNull(0) ? null : t.get(0).toString());
+     }
+     //see comment at "Dumping rows via SQL..." for why this doesn't work
+     //Assert.assertEquals("Comparing Pig to Hive", t.get(0), l.get(0));
+     numRowsRead++;
+   }
+   Assert.assertEquals("Expected " + 1 + " rows; got " + numRowsRead + " file=" + INPUT_FILE_NAME + "; table " +
+     tblName, 1, numRowsRead);
+ }
+ /**
+ * Create a data file with datatypes added in 0.13. Read it with Pig and use
+ * Pig + HCatStorer to write to a Hive table. Then read it using Pig and Hive
+ * and make sure results match.
+ */
+ @Test
+ public void testDateCharTypes() throws Exception {
+ final String tblName = "junit_date_char";
+ TestHCatLoader.dropTable(tblName, driver);
+ TestHCatLoader.createTable(tblName,
+ "id int, char5 char(5), varchar10 varchar(10), dec52 decimal(5,2)", null, driver, "RCFILE");
+ int NUM_ROWS = 5;
+ String[] rows = new String[NUM_ROWS];
+ for(int i = 0; i < NUM_ROWS; i++) {
+ //since the file is read by Pig, we need to make sure the values are in format that Pig understands
+ //otherwise it will turn the value to NULL on read
+ rows[i] = i + "\txxxxx\tyyy\t" + 5.2;
+ }
+ HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, rows);
+ LOG.debug("File=" + INPUT_FILE_NAME);
+// dumpFile(INPUT_FILE_NAME);
+ PigServer server = createPigServer(true);
+ int queryNumber = 1;
+ logAndRegister(server,
+ "A = load '" + INPUT_FILE_NAME + "' as (id:int, char5:chararray, varchar10:chararray, dec52:bigdecimal);",
+ queryNumber++);
+ logAndRegister(server,
+ "store A into '" + tblName + "' using " + HCatStorer.class.getName() + "();", queryNumber++);
+ logAndRegister(server, "B = load '" + tblName + "' using " + HCatLoader.class.getName() + "();",
+ queryNumber);
+ CommandProcessorResponse cpr = driver.run("select * from " + tblName);
+ LOG.debug("cpr.respCode=" + cpr.getResponseCode() + " cpr.errMsg=" + cpr.getErrorMessage());
+ List l = new ArrayList();
+ driver.getResults(l);
+ LOG.debug("Dumping rows via SQL from " + tblName);
+ /*Unfortunately Timestamp.toString() adjusts the value for local TZ and 't' is a String
+ * thus the timestamp in 't' doesn't match rawData*/
+ for(Object t : l) {
+ LOG.debug(t == null ? null : t.toString());
+ }
+ Iterator<Tuple> itr = server.openIterator("B");
+ int numRowsRead = 0;
+ while (itr.hasNext()) {
+ Tuple t = itr.next();
+ StringBuilder rowFromPig = new StringBuilder();
+ for(int i = 0; i < t.size(); i++) {
+ rowFromPig.append(t.get(i)).append("\t");
+ }
+ rowFromPig.setLength(rowFromPig.length() - 1);
+ Assert.assertEquals("Comparing Pig to Raw data", rowFromPig.toString(), rows[numRowsRead]);
+ //see comment at "Dumping rows via SQL..." for why this doesn't work (for all types)
+ //Assert.assertEquals("Comparing Pig to Hive", rowFromPig.toString(), l.get(numRowsRead));
+ numRowsRead++;
+ }
+ Assert.assertEquals("Expected " + NUM_ROWS + " rows; got " + numRowsRead + " file=" + INPUT_FILE_NAME, NUM_ROWS, numRowsRead);
+ }
+ /**
+  * Logs every line of the given text file at debug level.
+  * @param fileName path of the file to dump
+  * @throws Exception if the file cannot be opened or read
+  */
+ private static void dumpFile(String fileName) throws Exception {
+   File file = new File(fileName);
+   BufferedReader reader = new BufferedReader(new FileReader(file));
+   try {
+     LOG.debug("Dumping raw file: " + fileName);
+     String line;
+     while((line = reader.readLine()) != null) {
+       LOG.debug(line);
+     }
+   }
+   finally {
+     //close even if readLine() throws, so a failing test does not leak the file handle
+     reader.close();
+   }
+ }
@Test
public void testPartColsInData() throws IOException, CommandNeedRetryException {
@@ -74,7 +400,7 @@ public class TestHCatStorer extends HCat
}
Assert.assertFalse(itr.hasNext());
- Assert.assertEquals(11, i);
+ Assert.assertEquals(LOOP_SIZE, i);
}
@Test
@@ -597,7 +923,7 @@ public class TestHCatStorer extends HCat
Assert.assertEquals(0, results.size());
driver.run("drop table employee");
}
-
+ @Test
public void testPartitionPublish()
throws IOException, CommandNeedRetryException {
@@ -619,9 +945,9 @@ public class TestHCatStorer extends HCat
server.registerQuery("A = load '" + INPUT_FILE_NAME
+ "' as (a:int, c:chararray);");
server.registerQuery("B = filter A by " + FailEvalFunc.class.getName()
- + "($0);");
+ + "($0);");
server.registerQuery("store B into 'ptn_fail' using "
- + HCatStorer.class.getName() + "('b=math');");
+ + HCatStorer.class.getName() + "('b=math');");
server.executeBatch();
String query = "show partitions ptn_fail";
@@ -639,7 +965,7 @@ public class TestHCatStorer extends HCat
// Make sure the partitions directory is not in hdfs.
Assert.assertTrue((new File(TEST_WAREHOUSE_DIR + "/ptn_fail")).exists());
Assert.assertFalse((new File(TEST_WAREHOUSE_DIR + "/ptn_fail/b=math"))
- .exists());
+ .exists());
}
static public class FailEvalFunc extends EvalFunc<Boolean> {
Modified: hive/trunk/itests/hcatalog-unit/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hcatalog-unit/pom.xml?rev=1564924&r1=1564923&r2=1564924&view=diff
==============================================================================
--- hive/trunk/itests/hcatalog-unit/pom.xml (original)
+++ hive/trunk/itests/hcatalog-unit/pom.xml Wed Feb 5 21:02:57 2014
@@ -192,6 +192,14 @@
<version>${pig.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <!--this should be automatically brought in by Pig, it's not in Pig 0.12 due to a bug
+ in Pig which requires it. This is fixed in Pig's pom file in ASF trunk (pig 13)-->
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>2.2</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</profile>
<profile>
@@ -332,6 +340,14 @@
<classifier>h2</classifier>
<scope>test</scope>
</dependency>
+ <dependency>
+ <!--this should be automatically brought in by Pig, it's not in Pig 0.12 due to a bug
+ in Pig which requires it. This is fixed in Pig's pom file in ASF trunk (pig 13)-->
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>2.2</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</profile>
</profiles>
Modified: hive/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/pom.xml?rev=1564924&r1=1564923&r2=1564924&view=diff
==============================================================================
--- hive/trunk/pom.xml (original)
+++ hive/trunk/pom.xml Wed Feb 5 21:02:57 2014
@@ -127,7 +127,7 @@
requires netty < 3.6.0 we force hadoops version
-->
<netty.version>3.4.0.Final</netty.version>
- <pig.version>0.10.1</pig.version>
+ <pig.version>0.12.0</pig.version>
<protobuf.version>2.5.0</protobuf.version>
<stax.version>1.0.1</stax.version>
<slf4j.version>1.7.5</slf4j.version>