You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/07/11 02:04:59 UTC
[27/32] hive git commit: HIVE-18545: Add UDF to parse complex types
from json (Zoltan Haindrich reviewed by Ashutosh Chauhan)
http://git-wip-us.apache.org/repos/asf/hive/blob/1105ef39/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java
index 2bb4a0f..f1c8477 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java
@@ -20,30 +20,21 @@ package org.apache.hadoop.hive.serde2;
import java.io.ByteArrayInputStream;
import java.io.IOException;
-import java.nio.charset.CharacterCodingException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.type.Date;
-import org.apache.hadoop.hive.common.type.HiveChar;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.common.type.HiveVarchar;
import org.apache.hadoop.hive.common.type.Timestamp;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.json.HiveJsonStructReader;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
@@ -61,10 +52,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspect
import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -73,42 +60,32 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hive.common.util.HiveStringUtils;
import org.apache.hive.common.util.TimestampParser;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SerDeSpec(schemaProps = {serdeConstants.LIST_COLUMNS,
- serdeConstants.LIST_COLUMN_TYPES,
- serdeConstants.TIMESTAMP_FORMATS})
+ serdeConstants.LIST_COLUMN_TYPES,
+ serdeConstants.TIMESTAMP_FORMATS })
-// FIXME: move TestJsonSerDe from hcat to serde2
public class JsonSerDe extends AbstractSerDe {
private static final Logger LOG = LoggerFactory.getLogger(JsonSerDe.class);
private List<String> columnNames;
- private StructTypeInfo schema;
- private JsonFactory jsonFactory = null;
-
- private StandardStructObjectInspector cachedObjectInspector;
- private TimestampParser tsParser;
+ private HiveJsonStructReader structReader;
+ private StructTypeInfo rowTypeInfo;
@Override
public void initialize(Configuration conf, Properties tbl)
throws SerDeException {
List<TypeInfo> columnTypes;
- StructTypeInfo rowTypeInfo;
-
LOG.debug("Initializing JsonSerDe: {}", tbl.entrySet());
- // Get column names and types
+ // Get column names
String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
- String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
- .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
+ .getProperty(serdeConstants.COLUMN_NAME_DELIMITER)
+ : String.valueOf(SerDeUtils.COMMA);
// all table column names
if (columnNameProperty.isEmpty()) {
columnNames = Collections.emptyList();
@@ -117,6 +94,7 @@ public class JsonSerDe extends AbstractSerDe {
}
// all column types
+ String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
if (columnTypeProperty.isEmpty()) {
columnTypes = Collections.emptyList();
} else {
@@ -129,298 +107,34 @@ public class JsonSerDe extends AbstractSerDe {
assert (columnNames.size() == columnTypes.size());
rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
- schema = rowTypeInfo;
- LOG.debug("schema : {}", schema);
- cachedObjectInspector = (StandardStructObjectInspector) TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);
- jsonFactory = new JsonFactory();
- tsParser = new TimestampParser(
- HiveStringUtils.splitAndUnEscape(tbl.getProperty(serdeConstants.TIMESTAMP_FORMATS)));
+ TimestampParser tsParser = new TimestampParser(
+ HiveStringUtils.splitAndUnEscape(tbl.getProperty(serdeConstants.TIMESTAMP_FORMATS)));
+ structReader = new HiveJsonStructReader(rowTypeInfo, tsParser);
+ structReader.setIgnoreUnknownFields(true);
+ structReader.enableHiveColIndexParsing(true);
+ structReader.setWritablesUsage(true);
}
/**
* Takes JSON string in Text form, and has to return an object representation above
* it that's readable by the corresponding object inspector.
+ *
* For this implementation, since we're using the jackson parser, we can construct
* our own object implementation, and we use HCatRecord for it
*/
@Override
public Object deserialize(Writable blob) throws SerDeException {
+ Object row;
Text t = (Text) blob;
- JsonParser p;
- List<Object> r = new ArrayList<>(Collections.nCopies(columnNames.size(), null));
try {
- p = jsonFactory.createJsonParser(new ByteArrayInputStream((t.getBytes())));
- if (p.nextToken() != JsonToken.START_OBJECT) {
- throw new IOException("Start token not found where expected");
- }
- JsonToken token;
- while (((token = p.nextToken()) != JsonToken.END_OBJECT) && (token != null)) {
- // iterate through each token, and create appropriate object here.
- populateRecord(r, token, p, schema);
- }
- } catch (JsonParseException e) {
- LOG.warn("Error [{}] parsing json text [{}].", e, t);
- throw new SerDeException(e);
- } catch (IOException e) {
+ row = structReader.parseStruct(new ByteArrayInputStream((t.getBytes()), 0, t.getLength()));
+ return row;
+ } catch (Exception e) {
LOG.warn("Error [{}] parsing json text [{}].", e, t);
throw new SerDeException(e);
}
-
- return r;
- }
-
- private void populateRecord(List<Object> r, JsonToken token, JsonParser p, StructTypeInfo s) throws IOException {
- if (token != JsonToken.FIELD_NAME) {
- throw new IOException("Field name expected");
- }
- String fieldName = p.getText().toLowerCase();
- int fpos = s.getAllStructFieldNames().indexOf(fieldName);
- if (fpos == -1) {
- fpos = getPositionFromHiveInternalColumnName(fieldName);
- LOG.debug("NPE finding position for field [{}] in schema [{}],"
- + " attempting to check if it is an internal column name like _col0", fieldName, s);
- if (fpos == -1) {
- skipValue(p);
- return; // unknown field, we return. We'll continue from the next field onwards.
- }
- // If we get past this, then the column name did match the hive pattern for an internal
- // column name, such as _col0, etc, so it *MUST* match the schema for the appropriate column.
- // This means people can't use arbitrary column names such as _col0, and expect us to ignore it
- // if we find it.
- if (!fieldName.equalsIgnoreCase(getHiveInternalColumnName(fpos))) {
- LOG.error("Hive internal column name {} and position "
- + "encoding {} for the column name are at odds", fieldName, fpos);
- throw new IOException("Hive internal column name (" + fieldName
- + ") and position encoding (" + fpos
- + ") for the column name are at odds");
- }
- // If we reached here, then we were successful at finding an alternate internal
- // column mapping, and we're about to proceed.
- }
- Object currField = extractCurrentField(p, s.getStructFieldTypeInfo(fieldName), false);
- r.set(fpos, currField);
- }
-
- public String getHiveInternalColumnName(int fpos) {
- return HiveConf.getColumnInternalName(fpos);
- }
-
- public int getPositionFromHiveInternalColumnName(String internalName) {
- // return HiveConf.getPositionFromInternalName(fieldName);
- // The above line should have been all the implementation that
- // we need, but due to a bug in that impl which recognizes
- // only single-digit columns, we need another impl here.
- Pattern internalPattern = Pattern.compile("_col([0-9]+)");
- Matcher m = internalPattern.matcher(internalName);
- if (!m.matches()) {
- return -1;
- } else {
- return Integer.parseInt(m.group(1));
- }
- }
-
- /**
- * Utility method to extract (and forget) the next value token from the JsonParser,
- * as a whole. The reason this function gets called is to yank out the next value altogether,
- * because it corresponds to a field name that we do not recognize, and thus, do not have
- * a schema/type for. Thus, this field is to be ignored.
- *
- * @throws IOException
- * @throws JsonParseException
- */
- private void skipValue(JsonParser p) throws JsonParseException, IOException {
- JsonToken valueToken = p.nextToken();
-
- if ((valueToken == JsonToken.START_ARRAY) || (valueToken == JsonToken.START_OBJECT)) {
- // if the currently read token is a beginning of an array or object, move stream forward
- // skipping any child tokens till we're at the corresponding END_ARRAY or END_OBJECT token
- p.skipChildren();
- }
- // At the end of this function, the stream should be pointing to the last token that
- // corresponds to the value being skipped. This way, the next call to nextToken
- // will advance it to the next field name.
- }
-
- /**
- * Utility method to extract current expected field from given JsonParser
- * isTokenCurrent is a boolean variable also passed in, which determines
- * if the JsonParser is already at the token we expect to read next, or
- * needs advancing to the next before we read.
- */
- private Object extractCurrentField(JsonParser p, TypeInfo fieldTypeInfo,
- boolean isTokenCurrent) throws IOException {
- Object val = null;
- JsonToken valueToken;
- if (isTokenCurrent) {
- valueToken = p.getCurrentToken();
- } else {
- valueToken = p.nextToken();
- }
-
- switch (fieldTypeInfo.getCategory()) {
- case PRIMITIVE:
- PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = PrimitiveObjectInspector.PrimitiveCategory.UNKNOWN;
- if (fieldTypeInfo instanceof PrimitiveTypeInfo) {
- primitiveCategory = ((PrimitiveTypeInfo) fieldTypeInfo).getPrimitiveCategory();
- }
- switch (primitiveCategory) {
- case INT:
- val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getIntValue();
- break;
- case BYTE:
- val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getByteValue();
- break;
- case SHORT:
- val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getShortValue();
- break;
- case LONG:
- val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getLongValue();
- break;
- case BOOLEAN:
- String bval = (valueToken == JsonToken.VALUE_NULL) ? null : p.getText();
- if (bval != null) {
- val = Boolean.valueOf(bval);
- } else {
- val = null;
- }
- break;
- case FLOAT:
- val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getFloatValue();
- break;
- case DOUBLE:
- val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getDoubleValue();
- break;
- case STRING:
- val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getText();
- break;
- case BINARY:
- String b = (valueToken == JsonToken.VALUE_NULL) ? null : p.getText();
- if (b != null) {
- try {
- String t = Text.decode(b.getBytes(), 0, b.getBytes().length);
- return t.getBytes();
- } catch (CharacterCodingException e) {
- LOG.warn("Error generating json binary type from object.", e);
- return null;
- }
- } else {
- val = null;
- }
- break;
- case DATE:
- val = (valueToken == JsonToken.VALUE_NULL) ? null : Date.valueOf(p.getText());
- break;
- case TIMESTAMP:
- val = (valueToken == JsonToken.VALUE_NULL) ? null : tsParser.parseTimestamp(p.getText());
- break;
- case DECIMAL:
- val = (valueToken == JsonToken.VALUE_NULL) ? null : HiveDecimal.create(p.getText());
- break;
- case VARCHAR:
- int vLen = ((BaseCharTypeInfo) fieldTypeInfo).getLength();
- val = (valueToken == JsonToken.VALUE_NULL) ? null : new HiveVarchar(p.getText(), vLen);
- break;
- case CHAR:
- int cLen = ((BaseCharTypeInfo) fieldTypeInfo).getLength();
- val = (valueToken == JsonToken.VALUE_NULL) ? null : new HiveChar(p.getText(), cLen);
- break;
- }
- break;
- case LIST:
- if (valueToken == JsonToken.VALUE_NULL) {
- val = null;
- break;
- }
- if (valueToken != JsonToken.START_ARRAY) {
- throw new IOException("Start of Array expected");
- }
- List<Object> arr = new ArrayList<Object>();
- while ((valueToken = p.nextToken()) != JsonToken.END_ARRAY) {
- arr.add(extractCurrentField(p, ((ListTypeInfo)fieldTypeInfo).getListElementTypeInfo(), true));
- }
- val = arr;
- break;
- case MAP:
- if (valueToken == JsonToken.VALUE_NULL) {
- val = null;
- break;
- }
- if (valueToken != JsonToken.START_OBJECT) {
- throw new IOException("Start of Object expected");
- }
- Map<Object, Object> map = new LinkedHashMap<Object, Object>();
- while ((valueToken = p.nextToken()) != JsonToken.END_OBJECT) {
- Object k = getObjectOfCorrespondingPrimitiveType(p.getCurrentName(),
- (PrimitiveTypeInfo) ((MapTypeInfo)fieldTypeInfo).getMapKeyTypeInfo());
- Object v = extractCurrentField(p, ((MapTypeInfo) fieldTypeInfo).getMapValueTypeInfo(), false);
- map.put(k, v);
- }
- val = map;
- break;
- case STRUCT:
- if (valueToken == JsonToken.VALUE_NULL) {
- val = null;
- break;
- }
- if (valueToken != JsonToken.START_OBJECT) {
- throw new IOException("Start of Object expected");
- }
- ArrayList<TypeInfo> subSchema = ((StructTypeInfo)fieldTypeInfo).getAllStructFieldTypeInfos();
- int sz = subSchema.size();
- List<Object> struct = new ArrayList<Object>(Collections.nCopies(sz, null));
- while ((valueToken = p.nextToken()) != JsonToken.END_OBJECT) {
- populateRecord(struct, valueToken, p, ((StructTypeInfo) fieldTypeInfo));
- }
- val = struct;
- break;
- default:
- LOG.error("Unknown type found: " + fieldTypeInfo);
- return null;
- }
- return val;
- }
-
- private Object getObjectOfCorrespondingPrimitiveType(String s, PrimitiveTypeInfo mapKeyType)
- throws IOException {
- switch (mapKeyType.getPrimitiveCategory()) {
- case INT:
- return Integer.valueOf(s);
- case BYTE:
- return Byte.valueOf(s);
- case SHORT:
- return Short.valueOf(s);
- case LONG:
- return Long.valueOf(s);
- case BOOLEAN:
- return (s.equalsIgnoreCase("true"));
- case FLOAT:
- return Float.valueOf(s);
- case DOUBLE:
- return Double.valueOf(s);
- case STRING:
- return s;
- case BINARY:
- try {
- String t = Text.decode(s.getBytes(), 0, s.getBytes().length);
- return t.getBytes();
- } catch (CharacterCodingException e) {
- LOG.warn("Error generating json binary type from object.", e);
- return null;
- }
- case DATE:
- return Date.valueOf(s);
- case TIMESTAMP:
- return Timestamp.valueOf(s);
- case DECIMAL:
- return HiveDecimal.create(s);
- case VARCHAR:
- return new HiveVarchar(s, ((BaseCharTypeInfo) mapKeyType).getLength());
- case CHAR:
- return new HiveChar(s, ((BaseCharTypeInfo) mapKeyType).getLength());
- }
- throw new IOException("Could not convert from string to map type " + mapKeyType.getTypeName());
}
/**
@@ -462,7 +176,6 @@ public class JsonSerDe extends AbstractSerDe {
private static StringBuilder appendWithQuotes(StringBuilder sb, String value) {
return sb == null ? null : sb.append(SerDeUtils.QUOTE).append(value).append(SerDeUtils.QUOTE);
}
-
// TODO : code section copied over from SerDeUtils because of non-standard json production there
// should use quotes for all field names. We should fix this there, and then remove this copy.
// See http://jackson.codehaus.org/1.7.3/javadoc/org/codehaus/jackson/JsonParser.Feature.html#ALLOW_UNQUOTED_FIELD_NAMES
@@ -472,184 +185,184 @@ public class JsonSerDe extends AbstractSerDe {
private static void buildJSONString(StringBuilder sb, Object o, ObjectInspector oi) throws IOException {
switch (oi.getCategory()) {
- case PRIMITIVE: {
- PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
- if (o == null) {
- sb.append("null");
- } else {
- switch (poi.getPrimitiveCategory()) {
- case BOOLEAN: {
- boolean b = ((BooleanObjectInspector) poi).get(o);
- sb.append(b ? "true" : "false");
- break;
- }
- case BYTE: {
- sb.append(((ByteObjectInspector) poi).get(o));
- break;
- }
- case SHORT: {
- sb.append(((ShortObjectInspector) poi).get(o));
- break;
- }
- case INT: {
- sb.append(((IntObjectInspector) poi).get(o));
- break;
- }
- case LONG: {
- sb.append(((LongObjectInspector) poi).get(o));
- break;
- }
- case FLOAT: {
- sb.append(((FloatObjectInspector) poi).get(o));
- break;
- }
- case DOUBLE: {
- sb.append(((DoubleObjectInspector) poi).get(o));
- break;
- }
- case STRING: {
- String s =
- SerDeUtils.escapeString(((StringObjectInspector) poi).getPrimitiveJavaObject(o));
- appendWithQuotes(sb, s);
- break;
- }
- case BINARY:
- byte[] b = ((BinaryObjectInspector) oi).getPrimitiveJavaObject(o);
- Text txt = new Text();
- txt.set(b, 0, b.length);
- appendWithQuotes(sb, SerDeUtils.escapeString(txt.toString()));
- break;
- case DATE:
- Date d = ((DateObjectInspector) poi).getPrimitiveJavaObject(o);
- appendWithQuotes(sb, d.toString());
- break;
- case TIMESTAMP: {
- Timestamp t = ((TimestampObjectInspector) poi).getPrimitiveJavaObject(o);
- appendWithQuotes(sb, t.toString());
- break;
- }
- case DECIMAL:
- sb.append(((HiveDecimalObjectInspector) poi).getPrimitiveJavaObject(o));
- break;
- case VARCHAR: {
- String s = SerDeUtils.escapeString(
- ((HiveVarcharObjectInspector) poi).getPrimitiveJavaObject(o).toString());
- appendWithQuotes(sb, s);
- break;
- }
- case CHAR: {
- //this should use HiveChar.getPaddedValue() but it's protected; currently (v0.13)
- // HiveChar.toString() returns getPaddedValue()
- String s = SerDeUtils.escapeString(
- ((HiveCharObjectInspector) poi).getPrimitiveJavaObject(o).toString());
- appendWithQuotes(sb, s);
- break;
- }
- default:
- throw new RuntimeException("Unknown primitive type: " + poi.getPrimitiveCategory());
- }
+ case PRIMITIVE: {
+ PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
+ if (o == null) {
+ sb.append("null");
+ } else {
+ switch (poi.getPrimitiveCategory()) {
+ case BOOLEAN: {
+ boolean b = ((BooleanObjectInspector) poi).get(o);
+ sb.append(b ? "true" : "false");
+ break;
+ }
+ case BYTE: {
+ sb.append(((ByteObjectInspector) poi).get(o));
+ break;
+ }
+ case SHORT: {
+ sb.append(((ShortObjectInspector) poi).get(o));
+ break;
+ }
+ case INT: {
+ sb.append(((IntObjectInspector) poi).get(o));
+ break;
+ }
+ case LONG: {
+ sb.append(((LongObjectInspector) poi).get(o));
+ break;
+ }
+ case FLOAT: {
+ sb.append(((FloatObjectInspector) poi).get(o));
+ break;
+ }
+ case DOUBLE: {
+ sb.append(((DoubleObjectInspector) poi).get(o));
+ break;
+ }
+ case STRING: {
+ String s =
+ SerDeUtils.escapeString(((StringObjectInspector) poi).getPrimitiveJavaObject(o));
+ appendWithQuotes(sb, s);
+ break;
+ }
+ case BINARY:
+ byte[] b = ((BinaryObjectInspector) oi).getPrimitiveJavaObject(o);
+ Text txt = new Text();
+ txt.set(b, 0, b.length);
+ appendWithQuotes(sb, SerDeUtils.escapeString(txt.toString()));
+ break;
+ case DATE:
+ Date d = ((DateObjectInspector) poi).getPrimitiveJavaObject(o);
+ appendWithQuotes(sb, d.toString());
+ break;
+ case TIMESTAMP: {
+ Timestamp t = ((TimestampObjectInspector) poi).getPrimitiveJavaObject(o);
+ appendWithQuotes(sb, t.toString());
+ break;
+ }
+ case DECIMAL:
+ sb.append(((HiveDecimalObjectInspector) poi).getPrimitiveJavaObject(o));
+ break;
+ case VARCHAR: {
+ String s = SerDeUtils.escapeString(
+ ((HiveVarcharObjectInspector) poi).getPrimitiveJavaObject(o).toString());
+ appendWithQuotes(sb, s);
+ break;
+ }
+ case CHAR: {
+ //this should use HiveChar.getPaddedValue() but it's protected; currently (v0.13)
+ // HiveChar.toString() returns getPaddedValue()
+ String s = SerDeUtils.escapeString(
+ ((HiveCharObjectInspector) poi).getPrimitiveJavaObject(o).toString());
+ appendWithQuotes(sb, s);
+ break;
+ }
+ default:
+ throw new RuntimeException("Unknown primitive type: " + poi.getPrimitiveCategory());
}
- break;
}
- case LIST: {
- ListObjectInspector loi = (ListObjectInspector) oi;
- ObjectInspector listElementObjectInspector = loi
+ break;
+ }
+ case LIST: {
+ ListObjectInspector loi = (ListObjectInspector) oi;
+ ObjectInspector listElementObjectInspector = loi
.getListElementObjectInspector();
- List<?> olist = loi.getList(o);
- if (olist == null) {
- sb.append("null");
- } else {
- sb.append(SerDeUtils.LBRACKET);
- for (int i = 0; i < olist.size(); i++) {
- if (i > 0) {
- sb.append(SerDeUtils.COMMA);
- }
- buildJSONString(sb, olist.get(i), listElementObjectInspector);
+ List<?> olist = loi.getList(o);
+ if (olist == null) {
+ sb.append("null");
+ } else {
+ sb.append(SerDeUtils.LBRACKET);
+ for (int i = 0; i < olist.size(); i++) {
+ if (i > 0) {
+ sb.append(SerDeUtils.COMMA);
}
- sb.append(SerDeUtils.RBRACKET);
+ buildJSONString(sb, olist.get(i), listElementObjectInspector);
}
- break;
+ sb.append(SerDeUtils.RBRACKET);
}
- case MAP: {
- MapObjectInspector moi = (MapObjectInspector) oi;
- ObjectInspector mapKeyObjectInspector = moi.getMapKeyObjectInspector();
- ObjectInspector mapValueObjectInspector = moi
+ break;
+ }
+ case MAP: {
+ MapObjectInspector moi = (MapObjectInspector) oi;
+ ObjectInspector mapKeyObjectInspector = moi.getMapKeyObjectInspector();
+ ObjectInspector mapValueObjectInspector = moi
.getMapValueObjectInspector();
- Map<?, ?> omap = moi.getMap(o);
- if (omap == null) {
- sb.append("null");
- } else {
- sb.append(SerDeUtils.LBRACE);
- boolean first = true;
- for (Object entry : omap.entrySet()) {
- if (first) {
- first = false;
- } else {
- sb.append(SerDeUtils.COMMA);
- }
- Map.Entry<?, ?> e = (Map.Entry<?, ?>) entry;
- StringBuilder keyBuilder = new StringBuilder();
- buildJSONString(keyBuilder, e.getKey(), mapKeyObjectInspector);
- String keyString = keyBuilder.toString().trim();
- if ((!keyString.isEmpty()) && (keyString.charAt(0) != SerDeUtils.QUOTE)) {
- appendWithQuotes(sb, keyString);
- } else {
- sb.append(keyString);
- }
- sb.append(SerDeUtils.COLON);
- buildJSONString(sb, e.getValue(), mapValueObjectInspector);
+ Map<?, ?> omap = moi.getMap(o);
+ if (omap == null) {
+ sb.append("null");
+ } else {
+ sb.append(SerDeUtils.LBRACE);
+ boolean first = true;
+ for (Object entry : omap.entrySet()) {
+ if (first) {
+ first = false;
+ } else {
+ sb.append(SerDeUtils.COMMA);
+ }
+ Map.Entry<?, ?> e = (Map.Entry<?, ?>) entry;
+ StringBuilder keyBuilder = new StringBuilder();
+ buildJSONString(keyBuilder, e.getKey(), mapKeyObjectInspector);
+ String keyString = keyBuilder.toString().trim();
+ if ((!keyString.isEmpty()) && (keyString.charAt(0) != SerDeUtils.QUOTE)) {
+ appendWithQuotes(sb, keyString);
+ } else {
+ sb.append(keyString);
}
- sb.append(SerDeUtils.RBRACE);
+ sb.append(SerDeUtils.COLON);
+ buildJSONString(sb, e.getValue(), mapValueObjectInspector);
}
- break;
+ sb.append(SerDeUtils.RBRACE);
}
- case STRUCT: {
- StructObjectInspector soi = (StructObjectInspector) oi;
- List<? extends StructField> structFields = soi.getAllStructFieldRefs();
- if (o == null) {
- sb.append("null");
- } else {
- sb.append(SerDeUtils.LBRACE);
- for (int i = 0; i < structFields.size(); i++) {
- if (i > 0) {
- sb.append(SerDeUtils.COMMA);
- }
- appendWithQuotes(sb, structFields.get(i).getFieldName());
- sb.append(SerDeUtils.COLON);
- buildJSONString(sb, soi.getStructFieldData(o, structFields.get(i)),
- structFields.get(i).getFieldObjectInspector());
+ break;
+ }
+ case STRUCT: {
+ StructObjectInspector soi = (StructObjectInspector) oi;
+ List<? extends StructField> structFields = soi.getAllStructFieldRefs();
+ if (o == null) {
+ sb.append("null");
+ } else {
+ sb.append(SerDeUtils.LBRACE);
+ for (int i = 0; i < structFields.size(); i++) {
+ if (i > 0) {
+ sb.append(SerDeUtils.COMMA);
}
- sb.append(SerDeUtils.RBRACE);
+ appendWithQuotes(sb, structFields.get(i).getFieldName());
+ sb.append(SerDeUtils.COLON);
+ buildJSONString(sb, soi.getStructFieldData(o, structFields.get(i)),
+ structFields.get(i).getFieldObjectInspector());
}
- break;
+ sb.append(SerDeUtils.RBRACE);
}
- case UNION: {
- UnionObjectInspector uoi = (UnionObjectInspector) oi;
- if (o == null) {
- sb.append("null");
- } else {
- sb.append(SerDeUtils.LBRACE);
- sb.append(uoi.getTag(o));
- sb.append(SerDeUtils.COLON);
- buildJSONString(sb, uoi.getField(o),
+ break;
+ }
+ case UNION: {
+ UnionObjectInspector uoi = (UnionObjectInspector) oi;
+ if (o == null) {
+ sb.append("null");
+ } else {
+ sb.append(SerDeUtils.LBRACE);
+ sb.append(uoi.getTag(o));
+ sb.append(SerDeUtils.COLON);
+ buildJSONString(sb, uoi.getField(o),
uoi.getObjectInspectors().get(uoi.getTag(o)));
- sb.append(SerDeUtils.RBRACE);
- }
- break;
+ sb.append(SerDeUtils.RBRACE);
}
- default:
- throw new RuntimeException("Unknown type in ObjectInspector!");
+ break;
+ }
+ default:
+ throw new RuntimeException("Unknown type in ObjectInspector!");
}
}
/**
- * Returns an object inspector for the specified schema that
- * is capable of reading in the object representation of the JSON string
+ * Returns an object inspector for the specified schema that
+ * is capable of reading in the object representation of the JSON string
*/
@Override
public ObjectInspector getObjectInspector() throws SerDeException {
- return cachedObjectInspector;
+ return structReader.getObjectInspector();
}
@Override
@@ -663,4 +376,12 @@ public class JsonSerDe extends AbstractSerDe {
return null;
}
+ public StructTypeInfo getTypeInfo() {
+ return rowTypeInfo;
+ }
+
+ public void setWriteablesUsage(boolean b) {
+ structReader.setWritablesUsage(b);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1105ef39/serde/src/java/org/apache/hadoop/hive/serde2/json/HiveJsonStructReader.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/json/HiveJsonStructReader.java b/serde/src/java/org/apache/hadoop/hive/serde2/json/HiveJsonStructReader.java
new file mode 100644
index 0000000..ec4efad
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/json/HiveJsonStructReader.java
@@ -0,0 +1,402 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.json;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.hive.common.type.Date;
+import org.apache.hadoop.hive.common.type.HiveChar;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hive.common.util.TimestampParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+
+public class HiveJsonStructReader {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HiveJsonStructReader.class);
+
+  /** Hive internal column name such as {@code _col12}; group 1 captures the position. */
+  private static final Pattern INTERNAL_COL_PATTERN = Pattern.compile("^_col([0-9]+)$");
+
+  private final ObjectInspector oi;
+  private final JsonFactory factory;
+  private final TimestampParser tsParser;
+
+  /** Unknown field names already logged, so each one is warned about only once. */
+  final Set<String> reportedUnknownFieldNames = new HashSet<>();
+
+  // Per-instance configuration. These must not be static: a setter call on one reader
+  // would otherwise silently reconfigure every other reader in the JVM.
+  private boolean ignoreUnknownFields;
+  private boolean hiveColIndexParsing;
+  private boolean writeablePrimitives;
+
+  /**
+   * Creates a reader for JSON values of the given Hive type, using a default
+   * {@link TimestampParser}.
+   *
+   * @param t the Hive type of the top-level JSON value
+   */
+  public HiveJsonStructReader(TypeInfo t) {
+    this(t, new TimestampParser());
+  }
+
+  /**
+   * Creates a reader for JSON values of the given Hive type.
+   *
+   * @param t the Hive type of the top-level JSON value
+   * @param tsParser parser for TIMESTAMP values, used when writable primitives are disabled
+   */
+  public HiveJsonStructReader(TypeInfo t, TimestampParser tsParser) {
+    this.tsParser = tsParser;
+    this.oi = TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(t);
+    this.factory = new JsonFactory();
+  }
+
+  /**
+   * Parses one JSON value from a string.
+   *
+   * @param text the JSON text
+   * @return {@code Object[]} for a struct, {@code List} for an array, {@code Map} for a map,
+   *     a primitive value otherwise; {@code null} for JSON {@code null} or empty input
+   * @throws SerDeException if the text is malformed or does not match the reader's type
+   */
+  public Object parseStruct(String text) throws JsonParseException, IOException, SerDeException {
+    // A String source owns no external resource, so the parser can always be closed here.
+    try (JsonParser parser = factory.createParser(text)) {
+      return parseInternal(parser);
+    }
+  }
+
+  /**
+   * Parses one JSON value from a stream; see {@link #parseStruct(String)} for the result shape.
+   *
+   * @param is the stream to read the JSON text from
+   * @throws SerDeException if the input is malformed or does not match the reader's type
+   */
+  public Object parseStruct(InputStream is) throws JsonParseException, IOException, SerDeException {
+    // Not closed here: with Jackson's default AUTO_CLOSE_SOURCE setting, closing the parser
+    // would also close the caller-owned stream.
+    JsonParser parser = factory.createParser(is);
+    return parseInternal(parser);
+  }
+
+  /**
+   * Advances to the first token and parses the top-level value, wrapping any failure in a
+   * {@link SerDeException} that carries the parser's line/column location.
+   */
+  private Object parseInternal(JsonParser parser) throws SerDeException {
+    try {
+      parser.nextToken();
+      return parseDispatcher(parser, oi);
+    } catch (Exception e) {
+      String locationStr = parser.getCurrentLocation().getLineNr() + ","
+          + parser.getCurrentLocation().getColumnNr();
+      throw new SerDeException("at[" + locationStr + "]: " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Parses the value at the parser's current token according to the inspector's category.
+   * On return the parser is positioned on the token following the value. Categories other
+   * than PRIMITIVE, LIST, STRUCT and MAP are rejected.
+   */
+  private Object parseDispatcher(JsonParser parser, ObjectInspector oi)
+      throws JsonParseException, IOException, SerDeException {
+    switch (oi.getCategory()) {
+      case PRIMITIVE:
+        return parsePrimitive(parser, (PrimitiveObjectInspector) oi);
+      case LIST:
+        return parseList(parser, (ListObjectInspector) oi);
+      case STRUCT:
+        return parseStruct(parser, (StructObjectInspector) oi);
+      case MAP:
+        return parseMap(parser, (MapObjectInspector) oi);
+      default:
+        throw new SerDeException("parsing of: " + oi.getCategory() + " is not handled");
+    }
+  }
+
+  /**
+   * Parses a JSON object into a {@link LinkedHashMap} (preserving key order); JSON null
+   * yields {@code null}. Keys are converted from the field-name text to the key type.
+   */
+  private Object parseMap(JsonParser parser, MapObjectInspector oi) throws IOException, SerDeException {
+    if (parser.getCurrentToken() == JsonToken.VALUE_NULL) {
+      parser.nextToken();
+      return null;
+    }
+    if (parser.getCurrentToken() != JsonToken.START_OBJECT) {
+      throw new SerDeException("map expected");
+    }
+    if (!(oi.getMapKeyObjectInspector() instanceof PrimitiveObjectInspector)) {
+      throw new SerDeException("map key must be a primitive");
+    }
+    PrimitiveObjectInspector keyOI = (PrimitiveObjectInspector) oi.getMapKeyObjectInspector();
+    ObjectInspector valOI = oi.getMapValueObjectInspector();
+
+    Map<Object, Object> ret = new LinkedHashMap<>();
+    JsonToken currentToken = parser.nextToken();
+    while (currentToken != null && currentToken != JsonToken.END_OBJECT) {
+      if (currentToken != JsonToken.FIELD_NAME) {
+        throw new SerDeException("unexpected token: " + currentToken);
+      }
+      Object key = parseMapKey(parser, keyOI);
+      ret.put(key, parseDispatcher(parser, valOI));
+      currentToken = parser.getCurrentToken();
+    }
+    if (currentToken != null) {
+      // Step past END_OBJECT.
+      parser.nextToken();
+    }
+    return ret;
+  }
+
+  /**
+   * Parses a JSON object into an {@code Object[]} indexed by struct field ID; JSON null
+   * yields {@code null}. Struct fields absent from the JSON stay {@code null}.
+   */
+  private Object parseStruct(JsonParser parser, StructObjectInspector oi)
+      throws JsonParseException, IOException, SerDeException {
+    if (parser.getCurrentToken() == JsonToken.VALUE_NULL) {
+      parser.nextToken();
+      return null;
+    }
+    if (parser.getCurrentToken() != JsonToken.START_OBJECT) {
+      throw new SerDeException("struct expected");
+    }
+    Object[] ret = new Object[oi.getAllStructFieldRefs().size()];
+    JsonToken currentToken = parser.nextToken();
+    while (currentToken != null && currentToken != JsonToken.END_OBJECT) {
+      if (currentToken != JsonToken.FIELD_NAME) {
+        throw new SerDeException("unexpected token: " + currentToken);
+      }
+      parseStructField(parser, oi, ret);
+      currentToken = parser.getCurrentToken();
+    }
+    if (currentToken != null) {
+      // Step past END_OBJECT.
+      parser.nextToken();
+    }
+    return ret;
+  }
+
+  /**
+   * Parses one {@code "name": value} pair into {@code ret}. The parser must be on the
+   * FIELD_NAME token; on return it is on the token after the value. Unknown names are
+   * skipped (and logged once) when {@link #setIgnoreUnknownFields} is enabled, otherwise
+   * they fail. Any failure is rethrown with the field name as context.
+   */
+  private void parseStructField(JsonParser parser, StructObjectInspector oi, Object[] ret)
+      throws SerDeException {
+    String name = parser.getCurrentName();
+    try {
+      StructField field = getStructField(oi, name);
+      if (field == null) {
+        if (!ignoreUnknownFields) {
+          throw new SerDeException("undeclared field");
+        }
+        if (reportedUnknownFieldNames.add(name)) {
+          LOG.warn("ignoring field:" + name);
+        }
+        parser.nextToken();
+        skipValue(parser);
+        return;
+      }
+      parser.nextToken();
+      ret[field.getFieldID()] = parseDispatcher(parser, field.getFieldObjectInspector());
+    } catch (Exception e) {
+      throw new SerDeException("struct field " + name + ": " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Resolves a JSON key to a struct field, or returns {@code null} if the struct has no
+   * such field. With Hive column-index parsing enabled, {@code _col<N>} names resolve by
+   * position (out-of-range positions count as unknown).
+   */
+  private StructField getStructField(StructObjectInspector oi, String name) {
+    List<? extends StructField> fields = oi.getAllStructFieldRefs();
+    if (hiveColIndexParsing) {
+      int colIndex = getColIndex(name);
+      if (colIndex >= 0) {
+        return colIndex < fields.size() ? fields.get(colIndex) : null;
+      }
+    }
+    try {
+      // getStructFieldRef() can report an unknown name by throwing a RuntimeException
+      // instead of returning null.
+      // FIXME: getStructFieldRef() scans linearly; a name->field map would avoid that.
+      return oi.getStructFieldRef(name);
+    } catch (RuntimeException e) {
+      return null;
+    }
+  }
+
+  /**
+   * Returns the position encoded in a Hive internal column name ({@code _col<N>}), or -1
+   * if the name is not of that form. Parsed here with a regex so that multi-digit positions
+   * such as {@code _col12} are recognized.
+   */
+  private int getColIndex(String internalName) {
+    Matcher m = INTERNAL_COL_PATTERN.matcher(internalName);
+    if (!m.matches()) {
+      return -1;
+    }
+    try {
+      return Integer.parseInt(m.group(1));
+    } catch (NumberFormatException e) {
+      // Too many digits to fit an int, so it cannot be a valid position.
+      return -1;
+    }
+  }
+
+  /**
+   * Skips the value the parser is positioned on, including any nested content, and advances
+   * to the token that follows it.
+   */
+  private static void skipValue(JsonParser parser) throws JsonParseException, IOException {
+    // No-op for scalars; for START_OBJECT/START_ARRAY moves to the matching END_* token.
+    parser.skipChildren();
+    parser.nextToken();
+  }
+
+  /** Parses a JSON array into an {@link ArrayList}; JSON null yields {@code null}. */
+  private Object parseList(JsonParser parser, ListObjectInspector oi)
+      throws JsonParseException, IOException, SerDeException {
+    if (parser.getCurrentToken() == JsonToken.VALUE_NULL) {
+      parser.nextToken();
+      return null;
+    }
+    if (parser.getCurrentToken() != JsonToken.START_ARRAY) {
+      throw new SerDeException("array expected");
+    }
+    ObjectInspector eOI = oi.getListElementObjectInspector();
+    List<Object> ret = new ArrayList<>();
+    JsonToken currentToken = parser.nextToken();
+    try {
+      while (currentToken != null && currentToken != JsonToken.END_ARRAY) {
+        ret.add(parseDispatcher(parser, eOI));
+        currentToken = parser.getCurrentToken();
+      }
+    } catch (Exception e) {
+      throw new SerDeException("array: " + e.getMessage(), e);
+    }
+    // Step past END_ARRAY.
+    parser.nextToken();
+    return ret;
+  }
+
+  /**
+   * Converts a JSON scalar to the inspector's primitive type; JSON null yields {@code null}.
+   * The parser is always advanced past the token, even on failure.
+   */
+  private Object parsePrimitive(JsonParser parser, PrimitiveObjectInspector oi)
+      throws SerDeException, IOException {
+    JsonToken currentToken = parser.getCurrentToken();
+    if (currentToken == null) {
+      return null;
+    }
+    try {
+      switch (currentToken) {
+        case VALUE_FALSE:
+        case VALUE_TRUE:
+        case VALUE_NUMBER_INT:
+        case VALUE_NUMBER_FLOAT:
+        case VALUE_STRING:
+          return getObjectOfCorrespondingPrimitiveType(parser.getValueAsString(), oi);
+        case VALUE_NULL:
+          return null;
+        default:
+          throw new SerDeException("unexpected token type: " + currentToken);
+      }
+    } finally {
+      parser.nextToken();
+    }
+  }
+
+  /**
+   * Converts the textual form of a JSON scalar or map key to the inspector's primitive type.
+   *
+   * @throws IOException if the primitive category is not supported
+   */
+  private Object getObjectOfCorrespondingPrimitiveType(String s, PrimitiveObjectInspector oi)
+      throws IOException {
+    PrimitiveTypeInfo typeInfo = oi.getTypeInfo();
+    if (writeablePrimitives) {
+      Converter c = ObjectInspectorConverters.getConverter(
+          PrimitiveObjectInspectorFactory.javaStringObjectInspector, oi);
+      return c.convert(s);
+    }
+
+    // NOTE(review): the branches below return plain Java objects (Integer, String,
+    // HiveDecimal, ...) although getObjectInspector() is a *writable* inspector; confirm
+    // that callers reading results through that inspector enable writable primitives.
+    switch (typeInfo.getPrimitiveCategory()) {
+      case INT:
+        return Integer.valueOf(s);
+      case BYTE:
+        return Byte.valueOf(s);
+      case SHORT:
+        return Short.valueOf(s);
+      case LONG:
+        return Long.valueOf(s);
+      case BOOLEAN:
+        return s.equalsIgnoreCase("true");
+      case FLOAT:
+        return Float.valueOf(s);
+      case DOUBLE:
+        return Double.valueOf(s);
+      case STRING:
+        return s;
+      case BINARY:
+        try {
+          // Text.decode expects UTF-8, so encode explicitly rather than with the platform
+          // default charset (which made the result platform-dependent).
+          byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
+          String t = Text.decode(utf8, 0, utf8.length);
+          return t.getBytes(StandardCharsets.UTF_8);
+        } catch (CharacterCodingException e) {
+          LOG.warn("Error generating json binary type from object.", e);
+          return null;
+        }
+      case DATE:
+        return Date.valueOf(s);
+      case TIMESTAMP:
+        return tsParser.parseTimestamp(s);
+      case DECIMAL:
+        return HiveDecimal.create(s);
+      case VARCHAR:
+        return new HiveVarchar(s, ((BaseCharTypeInfo) typeInfo).getLength());
+      case CHAR:
+        return new HiveChar(s, ((BaseCharTypeInfo) typeInfo).getLength());
+      default:
+        break;
+    }
+    throw new IOException("Could not convert from string to primitive type " + typeInfo.getTypeName());
+  }
+
+  /**
+   * Converts the current FIELD_NAME token to a map key of the inspector's primitive type and
+   * advances the parser to the value token.
+   */
+  private Object parseMapKey(JsonParser parser, PrimitiveObjectInspector oi) throws SerDeException, IOException {
+    JsonToken currentToken = parser.getCurrentToken();
+    if (currentToken == null) {
+      return null;
+    }
+    try {
+      switch (currentToken) {
+        case FIELD_NAME:
+          return getObjectOfCorrespondingPrimitiveType(parser.getCurrentName(), oi);
+        case VALUE_NULL:
+          return null;
+        default:
+          throw new SerDeException("unexpected token type: " + currentToken);
+      }
+    } finally {
+      parser.nextToken();
+    }
+  }
+
+  /** When enabled, JSON keys with no matching struct field are skipped instead of failing. */
+  public void setIgnoreUnknownFields(boolean b) {
+    ignoreUnknownFields = b;
+  }
+
+  /**
+   * When enabled, struct fields may also be addressed positionally by Hive internal column
+   * names ({@code _col0}, {@code _col1}, ...).
+   */
+  public void enableHiveColIndexParsing(boolean b) {
+    hiveColIndexParsing = b;
+  }
+
+  /**
+   * When enabled, primitive values are converted from their JSON text by a converter
+   * targeting {@link #getObjectInspector()}'s primitive inspectors; otherwise plain Java
+   * objects are produced.
+   */
+  public void setWritablesUsage(boolean b) {
+    writeablePrimitives = b;
+  }
+
+  /** Returns the standard writable object inspector built from this reader's type. */
+  public ObjectInspector getObjectInspector() {
+    return oi;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/1105ef39/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
index cabb64c..416bd67 100644
--- a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
@@ -30,7 +30,7 @@ import com.google.common.base.Joiner;
/**
* Streaming Writer handles utf8 encoded Json (Strict syntax).
- * Uses org.apache.hadoop.hive.serde2.JsonSerDe to process Json input
+ * Uses {@link JsonSerDe} to process Json input
*
* NOTE: This record writer is NOT thread-safe. Use one record writer per streaming connection.
*/