Posted to commits@hive.apache.org by se...@apache.org on 2018/07/11 02:04:59 UTC

[27/32] hive git commit: HIVE-18545: Add UDF to parse complex types from json (Zoltan Haindrich reviewed by Ashutosh Chauhan)

http://git-wip-us.apache.org/repos/asf/hive/blob/1105ef39/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java
index 2bb4a0f..f1c8477 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java
@@ -20,30 +20,21 @@ package org.apache.hadoop.hive.serde2;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.nio.charset.CharacterCodingException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.type.Date;
-import org.apache.hadoop.hive.common.type.HiveChar;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.common.type.HiveVarchar;
 import org.apache.hadoop.hive.common.type.Timestamp;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.json.HiveJsonStructReader;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
@@ -61,10 +52,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspect
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -73,42 +60,32 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.hive.common.util.TimestampParser;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SerDeSpec(schemaProps = {serdeConstants.LIST_COLUMNS,
-  serdeConstants.LIST_COLUMN_TYPES,
-  serdeConstants.TIMESTAMP_FORMATS})
+    serdeConstants.LIST_COLUMN_TYPES,
+    serdeConstants.TIMESTAMP_FORMATS })
 
-// FIXME: move TestJsonSerDe from hcat to serde2
 public class JsonSerDe extends AbstractSerDe {
 
   private static final Logger LOG = LoggerFactory.getLogger(JsonSerDe.class);
   private List<String> columnNames;
-  private StructTypeInfo schema;
 
-  private JsonFactory jsonFactory = null;
-
-  private StandardStructObjectInspector cachedObjectInspector;
-  private TimestampParser tsParser;
+  private HiveJsonStructReader structReader;
+  private StructTypeInfo rowTypeInfo;
 
   @Override
   public void initialize(Configuration conf, Properties tbl)
     throws SerDeException {
     List<TypeInfo> columnTypes;
-    StructTypeInfo rowTypeInfo;
-
     LOG.debug("Initializing JsonSerDe: {}", tbl.entrySet());
 
-    // Get column names and types
+    // Get column names
     String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
-    String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
     final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
-      .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
+        .getProperty(serdeConstants.COLUMN_NAME_DELIMITER)
+      : String.valueOf(SerDeUtils.COMMA);
     // all table column names
     if (columnNameProperty.isEmpty()) {
       columnNames = Collections.emptyList();
@@ -117,6 +94,7 @@ public class JsonSerDe extends AbstractSerDe {
     }
 
     // all column types
+    String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
     if (columnTypeProperty.isEmpty()) {
       columnTypes = Collections.emptyList();
     } else {
@@ -129,298 +107,34 @@ public class JsonSerDe extends AbstractSerDe {
     assert (columnNames.size() == columnTypes.size());
 
     rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
-    schema = rowTypeInfo;
-    LOG.debug("schema : {}", schema);
-    cachedObjectInspector = (StandardStructObjectInspector) TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);
 
-    jsonFactory = new JsonFactory();
-    tsParser = new TimestampParser(
-      HiveStringUtils.splitAndUnEscape(tbl.getProperty(serdeConstants.TIMESTAMP_FORMATS)));
+    TimestampParser tsParser = new TimestampParser(
+        HiveStringUtils.splitAndUnEscape(tbl.getProperty(serdeConstants.TIMESTAMP_FORMATS)));
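+    // Configure the shared JSON reader: skip fields that are not in the schema,
+    // accept Hive internal column names (_col0, _col1, ...) as aliases for the
+    // schema columns, and emit writable objects for downstream operators.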
+    structReader = new HiveJsonStructReader(rowTypeInfo, tsParser);
+    structReader.setIgnoreUnknownFields(true);
+    structReader.enableHiveColIndexParsing(true);
+    structReader.setWritablesUsage(true);
   }
 
   /**
    * Takes JSON string in Text form, and has to return an object representation above
    * it that's readable by the corresponding object inspector.
+   *
-   * For this implementation, since we're using the jackson parser, we can construct
-   * our own object implementation, and we use HCatRecord for it
+   * For this implementation, parsing is delegated to {@link HiveJsonStructReader},
+   * which walks the Jackson token stream and builds the row object directly.
    */
   @Override
   public Object deserialize(Writable blob) throws SerDeException {
 
     Text t = (Text) blob;
-    JsonParser p;
-    List<Object> r = new ArrayList<>(Collections.nCopies(columnNames.size(), null));
     try {
-      p = jsonFactory.createJsonParser(new ByteArrayInputStream((t.getBytes())));
-      if (p.nextToken() != JsonToken.START_OBJECT) {
-        throw new IOException("Start token not found where expected");
-      }
-      JsonToken token;
-      while (((token = p.nextToken()) != JsonToken.END_OBJECT) && (token != null)) {
-        // iterate through each token, and create appropriate object here.
-        populateRecord(r, token, p, schema);
-      }
-    } catch (JsonParseException e) {
-      LOG.warn("Error [{}] parsing json text [{}].", e, t);
-      throw new SerDeException(e);
-    } catch (IOException e) {
+      return structReader.parseStruct(new ByteArrayInputStream(t.getBytes(), 0, t.getLength()));
+    } catch (Exception e) {
       LOG.warn("Error [{}] parsing json text [{}].", e, t);
       throw new SerDeException(e);
     }
-
-    return r;
-  }
-
-  private void populateRecord(List<Object> r, JsonToken token, JsonParser p, StructTypeInfo s) throws IOException {
-    if (token != JsonToken.FIELD_NAME) {
-      throw new IOException("Field name expected");
-    }
-    String fieldName = p.getText().toLowerCase();
-    int fpos = s.getAllStructFieldNames().indexOf(fieldName);
-    if (fpos == -1) {
-      fpos = getPositionFromHiveInternalColumnName(fieldName);
-      LOG.debug("NPE finding position for field [{}] in schema [{}],"
-        + " attempting to check if it is an internal column name like _col0", fieldName, s);
-      if (fpos == -1) {
-        skipValue(p);
-        return; // unknown field, we return. We'll continue from the next field onwards.
-      }
-      // If we get past this, then the column name did match the hive pattern for an internal
-      // column name, such as _col0, etc, so it *MUST* match the schema for the appropriate column.
-      // This means people can't use arbitrary column names such as _col0, and expect us to ignore it
-      // if we find it.
-      if (!fieldName.equalsIgnoreCase(getHiveInternalColumnName(fpos))) {
-        LOG.error("Hive internal column name {} and position "
-          + "encoding {} for the column name are at odds", fieldName, fpos);
-        throw new IOException("Hive internal column name (" + fieldName
-          + ") and position encoding (" + fpos
-          + ") for the column name are at odds");
-      }
-      // If we reached here, then we were successful at finding an alternate internal
-      // column mapping, and we're about to proceed.
-    }
-    Object currField = extractCurrentField(p, s.getStructFieldTypeInfo(fieldName), false);
-    r.set(fpos, currField);
-  }
-
-  public String getHiveInternalColumnName(int fpos) {
-    return HiveConf.getColumnInternalName(fpos);
-  }
-
-  public int getPositionFromHiveInternalColumnName(String internalName) {
-    //    return HiveConf.getPositionFromInternalName(fieldName);
-    // The above line should have been all the implementation that
-    // we need, but due to a bug in that impl which recognizes
-    // only single-digit columns, we need another impl here.
-    Pattern internalPattern = Pattern.compile("_col([0-9]+)");
-    Matcher m = internalPattern.matcher(internalName);
-    if (!m.matches()) {
-      return -1;
-    } else {
-      return Integer.parseInt(m.group(1));
-    }
-  }
-
-  /**
-   * Utility method to extract (and forget) the next value token from the JsonParser,
-   * as a whole. The reason this function gets called is to yank out the next value altogether,
-   * because it corresponds to a field name that we do not recognize, and thus, do not have
-   * a schema/type for. Thus, this field is to be ignored.
-   *
-   * @throws IOException
-   * @throws JsonParseException
-   */
-  private void skipValue(JsonParser p) throws JsonParseException, IOException {
-    JsonToken valueToken = p.nextToken();
-
-    if ((valueToken == JsonToken.START_ARRAY) || (valueToken == JsonToken.START_OBJECT)) {
-      // if the currently read token is a beginning of an array or object, move stream forward
-      // skipping any child tokens till we're at the corresponding END_ARRAY or END_OBJECT token
-      p.skipChildren();
-    }
-    // At the end of this function, the stream should be pointing to the last token that
-    // corresponds to the value being skipped. This way, the next call to nextToken
-    // will advance it to the next field name.
-  }
-
-  /**
-   * Utility method to extract current expected field from given JsonParser
-   * isTokenCurrent is a boolean variable also passed in, which determines
-   * if the JsonParser is already at the token we expect to read next, or
-   * needs advancing to the next before we read.
-   */
-  private Object extractCurrentField(JsonParser p, TypeInfo fieldTypeInfo,
-    boolean isTokenCurrent) throws IOException {
-    Object val = null;
-    JsonToken valueToken;
-    if (isTokenCurrent) {
-      valueToken = p.getCurrentToken();
-    } else {
-      valueToken = p.nextToken();
-    }
-
-    switch (fieldTypeInfo.getCategory()) {
-      case PRIMITIVE:
-        PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = PrimitiveObjectInspector.PrimitiveCategory.UNKNOWN;
-        if (fieldTypeInfo instanceof PrimitiveTypeInfo) {
-          primitiveCategory = ((PrimitiveTypeInfo) fieldTypeInfo).getPrimitiveCategory();
-        }
-        switch (primitiveCategory) {
-          case INT:
-            val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getIntValue();
-            break;
-          case BYTE:
-            val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getByteValue();
-            break;
-          case SHORT:
-            val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getShortValue();
-            break;
-          case LONG:
-            val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getLongValue();
-            break;
-          case BOOLEAN:
-            String bval = (valueToken == JsonToken.VALUE_NULL) ? null : p.getText();
-            if (bval != null) {
-              val = Boolean.valueOf(bval);
-            } else {
-              val = null;
-            }
-            break;
-          case FLOAT:
-            val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getFloatValue();
-            break;
-          case DOUBLE:
-            val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getDoubleValue();
-            break;
-          case STRING:
-            val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getText();
-            break;
-          case BINARY:
-            String b = (valueToken == JsonToken.VALUE_NULL) ? null : p.getText();
-            if (b != null) {
-              try {
-                String t = Text.decode(b.getBytes(), 0, b.getBytes().length);
-                return t.getBytes();
-              } catch (CharacterCodingException e) {
-                LOG.warn("Error generating json binary type from object.", e);
-                return null;
-              }
-            } else {
-              val = null;
-            }
-            break;
-          case DATE:
-            val = (valueToken == JsonToken.VALUE_NULL) ? null : Date.valueOf(p.getText());
-            break;
-          case TIMESTAMP:
-            val = (valueToken == JsonToken.VALUE_NULL) ? null : tsParser.parseTimestamp(p.getText());
-            break;
-          case DECIMAL:
-            val = (valueToken == JsonToken.VALUE_NULL) ? null : HiveDecimal.create(p.getText());
-            break;
-          case VARCHAR:
-            int vLen = ((BaseCharTypeInfo) fieldTypeInfo).getLength();
-            val = (valueToken == JsonToken.VALUE_NULL) ? null : new HiveVarchar(p.getText(), vLen);
-            break;
-          case CHAR:
-            int cLen = ((BaseCharTypeInfo) fieldTypeInfo).getLength();
-            val = (valueToken == JsonToken.VALUE_NULL) ? null : new HiveChar(p.getText(), cLen);
-            break;
-        }
-        break;
-      case LIST:
-        if (valueToken == JsonToken.VALUE_NULL) {
-          val = null;
-          break;
-        }
-        if (valueToken != JsonToken.START_ARRAY) {
-          throw new IOException("Start of Array expected");
-        }
-        List<Object> arr = new ArrayList<Object>();
-        while ((valueToken = p.nextToken()) != JsonToken.END_ARRAY) {
-          arr.add(extractCurrentField(p, ((ListTypeInfo)fieldTypeInfo).getListElementTypeInfo(), true));
-        }
-        val = arr;
-        break;
-      case MAP:
-        if (valueToken == JsonToken.VALUE_NULL) {
-          val = null;
-          break;
-        }
-        if (valueToken != JsonToken.START_OBJECT) {
-          throw new IOException("Start of Object expected");
-        }
-        Map<Object, Object> map = new LinkedHashMap<Object, Object>();
-        while ((valueToken = p.nextToken()) != JsonToken.END_OBJECT) {
-          Object k = getObjectOfCorrespondingPrimitiveType(p.getCurrentName(),
-            (PrimitiveTypeInfo) ((MapTypeInfo)fieldTypeInfo).getMapKeyTypeInfo());
-          Object v = extractCurrentField(p, ((MapTypeInfo) fieldTypeInfo).getMapValueTypeInfo(), false);
-          map.put(k, v);
-        }
-        val = map;
-        break;
-      case STRUCT:
-        if (valueToken == JsonToken.VALUE_NULL) {
-          val = null;
-          break;
-        }
-        if (valueToken != JsonToken.START_OBJECT) {
-          throw new IOException("Start of Object expected");
-        }
-        ArrayList<TypeInfo> subSchema = ((StructTypeInfo)fieldTypeInfo).getAllStructFieldTypeInfos();
-        int sz = subSchema.size();
-        List<Object> struct = new ArrayList<Object>(Collections.nCopies(sz, null));
-        while ((valueToken = p.nextToken()) != JsonToken.END_OBJECT) {
-          populateRecord(struct, valueToken, p, ((StructTypeInfo) fieldTypeInfo));
-        }
-        val = struct;
-        break;
-      default:
-        LOG.error("Unknown type found: " + fieldTypeInfo);
-        return null;
-    }
-    return val;
-  }
-
-  private Object getObjectOfCorrespondingPrimitiveType(String s, PrimitiveTypeInfo mapKeyType)
-    throws IOException {
-    switch (mapKeyType.getPrimitiveCategory()) {
-      case INT:
-        return Integer.valueOf(s);
-      case BYTE:
-        return Byte.valueOf(s);
-      case SHORT:
-        return Short.valueOf(s);
-      case LONG:
-        return Long.valueOf(s);
-      case BOOLEAN:
-        return (s.equalsIgnoreCase("true"));
-      case FLOAT:
-        return Float.valueOf(s);
-      case DOUBLE:
-        return Double.valueOf(s);
-      case STRING:
-        return s;
-      case BINARY:
-        try {
-          String t = Text.decode(s.getBytes(), 0, s.getBytes().length);
-          return t.getBytes();
-        } catch (CharacterCodingException e) {
-          LOG.warn("Error generating json binary type from object.", e);
-          return null;
-        }
-      case DATE:
-        return Date.valueOf(s);
-      case TIMESTAMP:
-        return Timestamp.valueOf(s);
-      case DECIMAL:
-        return HiveDecimal.create(s);
-      case VARCHAR:
-        return new HiveVarchar(s, ((BaseCharTypeInfo) mapKeyType).getLength());
-      case CHAR:
-        return new HiveChar(s, ((BaseCharTypeInfo) mapKeyType).getLength());
-    }
-    throw new IOException("Could not convert from string to map type " + mapKeyType.getTypeName());
   }
 
   /**
@@ -462,7 +176,6 @@ public class JsonSerDe extends AbstractSerDe {
   private static StringBuilder appendWithQuotes(StringBuilder sb, String value) {
     return sb == null ? null : sb.append(SerDeUtils.QUOTE).append(value).append(SerDeUtils.QUOTE);
   }
-
   // TODO : code section copied over from SerDeUtils because of non-standard json production there
   // should use quotes for all field names. We should fix this there, and then remove this copy.
   // See http://jackson.codehaus.org/1.7.3/javadoc/org/codehaus/jackson/JsonParser.Feature.html#ALLOW_UNQUOTED_FIELD_NAMES
@@ -472,184 +185,184 @@ public class JsonSerDe extends AbstractSerDe {
   private static void buildJSONString(StringBuilder sb, Object o, ObjectInspector oi) throws IOException {
 
     switch (oi.getCategory()) {
-      case PRIMITIVE: {
-        PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
-        if (o == null) {
-          sb.append("null");
-        } else {
-          switch (poi.getPrimitiveCategory()) {
-            case BOOLEAN: {
-              boolean b = ((BooleanObjectInspector) poi).get(o);
-              sb.append(b ? "true" : "false");
-              break;
-            }
-            case BYTE: {
-              sb.append(((ByteObjectInspector) poi).get(o));
-              break;
-            }
-            case SHORT: {
-              sb.append(((ShortObjectInspector) poi).get(o));
-              break;
-            }
-            case INT: {
-              sb.append(((IntObjectInspector) poi).get(o));
-              break;
-            }
-            case LONG: {
-              sb.append(((LongObjectInspector) poi).get(o));
-              break;
-            }
-            case FLOAT: {
-              sb.append(((FloatObjectInspector) poi).get(o));
-              break;
-            }
-            case DOUBLE: {
-              sb.append(((DoubleObjectInspector) poi).get(o));
-              break;
-            }
-            case STRING: {
-              String s =
-                SerDeUtils.escapeString(((StringObjectInspector) poi).getPrimitiveJavaObject(o));
-              appendWithQuotes(sb, s);
-              break;
-            }
-            case BINARY:
-              byte[] b = ((BinaryObjectInspector) oi).getPrimitiveJavaObject(o);
-              Text txt = new Text();
-              txt.set(b, 0, b.length);
-              appendWithQuotes(sb, SerDeUtils.escapeString(txt.toString()));
-              break;
-            case DATE:
-              Date d = ((DateObjectInspector) poi).getPrimitiveJavaObject(o);
-              appendWithQuotes(sb, d.toString());
-              break;
-            case TIMESTAMP: {
-              Timestamp t = ((TimestampObjectInspector) poi).getPrimitiveJavaObject(o);
-              appendWithQuotes(sb, t.toString());
-              break;
-            }
-            case DECIMAL:
-              sb.append(((HiveDecimalObjectInspector) poi).getPrimitiveJavaObject(o));
-              break;
-            case VARCHAR: {
-              String s = SerDeUtils.escapeString(
-                ((HiveVarcharObjectInspector) poi).getPrimitiveJavaObject(o).toString());
-              appendWithQuotes(sb, s);
-              break;
-            }
-            case CHAR: {
-              //this should use HiveChar.getPaddedValue() but it's protected; currently (v0.13)
-              // HiveChar.toString() returns getPaddedValue()
-              String s = SerDeUtils.escapeString(
-                ((HiveCharObjectInspector) poi).getPrimitiveJavaObject(o).toString());
-              appendWithQuotes(sb, s);
-              break;
-            }
-            default:
-              throw new RuntimeException("Unknown primitive type: " + poi.getPrimitiveCategory());
-          }
+    case PRIMITIVE: {
+      PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
+      if (o == null) {
+        sb.append("null");
+      } else {
+        switch (poi.getPrimitiveCategory()) {
+        case BOOLEAN: {
+          boolean b = ((BooleanObjectInspector) poi).get(o);
+          sb.append(b ? "true" : "false");
+          break;
+        }
+        case BYTE: {
+          sb.append(((ByteObjectInspector) poi).get(o));
+          break;
+        }
+        case SHORT: {
+          sb.append(((ShortObjectInspector) poi).get(o));
+          break;
+        }
+        case INT: {
+          sb.append(((IntObjectInspector) poi).get(o));
+          break;
+        }
+        case LONG: {
+          sb.append(((LongObjectInspector) poi).get(o));
+          break;
+        }
+        case FLOAT: {
+          sb.append(((FloatObjectInspector) poi).get(o));
+          break;
+        }
+        case DOUBLE: {
+          sb.append(((DoubleObjectInspector) poi).get(o));
+          break;
+        }
+        case STRING: {
+          String s =
+              SerDeUtils.escapeString(((StringObjectInspector) poi).getPrimitiveJavaObject(o));
+          appendWithQuotes(sb, s);
+          break;
+        }
+        case BINARY:
+          byte[] b = ((BinaryObjectInspector) oi).getPrimitiveJavaObject(o);
+          Text txt = new Text();
+          txt.set(b, 0, b.length);
+          appendWithQuotes(sb, SerDeUtils.escapeString(txt.toString()));
+          break;
+        case DATE:
+          Date d = ((DateObjectInspector) poi).getPrimitiveJavaObject(o);
+          appendWithQuotes(sb, d.toString());
+          break;
+        case TIMESTAMP: {
+          Timestamp t = ((TimestampObjectInspector) poi).getPrimitiveJavaObject(o);
+          appendWithQuotes(sb, t.toString());
+          break;
+        }
+        case DECIMAL:
+          sb.append(((HiveDecimalObjectInspector) poi).getPrimitiveJavaObject(o));
+          break;
+        case VARCHAR: {
+          String s = SerDeUtils.escapeString(
+              ((HiveVarcharObjectInspector) poi).getPrimitiveJavaObject(o).toString());
+          appendWithQuotes(sb, s);
+          break;
+        }
+        case CHAR: {
+          //this should use HiveChar.getPaddedValue() but it's protected; currently (v0.13)
+          // HiveChar.toString() returns getPaddedValue()
+          String s = SerDeUtils.escapeString(
+              ((HiveCharObjectInspector) poi).getPrimitiveJavaObject(o).toString());
+          appendWithQuotes(sb, s);
+          break;
+        }
+        default:
+          throw new RuntimeException("Unknown primitive type: " + poi.getPrimitiveCategory());
         }
-        break;
       }
-      case LIST: {
-        ListObjectInspector loi = (ListObjectInspector) oi;
-        ObjectInspector listElementObjectInspector = loi
+      break;
+    }
+    case LIST: {
+      ListObjectInspector loi = (ListObjectInspector) oi;
+      ObjectInspector listElementObjectInspector = loi
           .getListElementObjectInspector();
-        List<?> olist = loi.getList(o);
-        if (olist == null) {
-          sb.append("null");
-        } else {
-          sb.append(SerDeUtils.LBRACKET);
-          for (int i = 0; i < olist.size(); i++) {
-            if (i > 0) {
-              sb.append(SerDeUtils.COMMA);
-            }
-            buildJSONString(sb, olist.get(i), listElementObjectInspector);
+      List<?> olist = loi.getList(o);
+      if (olist == null) {
+        sb.append("null");
+      } else {
+        sb.append(SerDeUtils.LBRACKET);
+        for (int i = 0; i < olist.size(); i++) {
+          if (i > 0) {
+            sb.append(SerDeUtils.COMMA);
           }
-          sb.append(SerDeUtils.RBRACKET);
+          buildJSONString(sb, olist.get(i), listElementObjectInspector);
         }
-        break;
+        sb.append(SerDeUtils.RBRACKET);
       }
-      case MAP: {
-        MapObjectInspector moi = (MapObjectInspector) oi;
-        ObjectInspector mapKeyObjectInspector = moi.getMapKeyObjectInspector();
-        ObjectInspector mapValueObjectInspector = moi
+      break;
+    }
+    case MAP: {
+      MapObjectInspector moi = (MapObjectInspector) oi;
+      ObjectInspector mapKeyObjectInspector = moi.getMapKeyObjectInspector();
+      ObjectInspector mapValueObjectInspector = moi
           .getMapValueObjectInspector();
-        Map<?, ?> omap = moi.getMap(o);
-        if (omap == null) {
-          sb.append("null");
-        } else {
-          sb.append(SerDeUtils.LBRACE);
-          boolean first = true;
-          for (Object entry : omap.entrySet()) {
-            if (first) {
-              first = false;
-            } else {
-              sb.append(SerDeUtils.COMMA);
-            }
-            Map.Entry<?, ?> e = (Map.Entry<?, ?>) entry;
-            StringBuilder keyBuilder = new StringBuilder();
-            buildJSONString(keyBuilder, e.getKey(), mapKeyObjectInspector);
-            String keyString = keyBuilder.toString().trim();
-            if ((!keyString.isEmpty()) && (keyString.charAt(0) != SerDeUtils.QUOTE)) {
-              appendWithQuotes(sb, keyString);
-            } else {
-              sb.append(keyString);
-            }
-            sb.append(SerDeUtils.COLON);
-            buildJSONString(sb, e.getValue(), mapValueObjectInspector);
+      Map<?, ?> omap = moi.getMap(o);
+      if (omap == null) {
+        sb.append("null");
+      } else {
+        sb.append(SerDeUtils.LBRACE);
+        boolean first = true;
+        for (Object entry : omap.entrySet()) {
+          if (first) {
+            first = false;
+          } else {
+            sb.append(SerDeUtils.COMMA);
+          }
+          Map.Entry<?, ?> e = (Map.Entry<?, ?>) entry;
+          StringBuilder keyBuilder = new StringBuilder();
+          buildJSONString(keyBuilder, e.getKey(), mapKeyObjectInspector);
+          String keyString = keyBuilder.toString().trim();
+          if ((!keyString.isEmpty()) && (keyString.charAt(0) != SerDeUtils.QUOTE)) {
+            appendWithQuotes(sb, keyString);
+          } else {
+            sb.append(keyString);
           }
-          sb.append(SerDeUtils.RBRACE);
+          sb.append(SerDeUtils.COLON);
+          buildJSONString(sb, e.getValue(), mapValueObjectInspector);
         }
-        break;
+        sb.append(SerDeUtils.RBRACE);
       }
-      case STRUCT: {
-        StructObjectInspector soi = (StructObjectInspector) oi;
-        List<? extends StructField> structFields = soi.getAllStructFieldRefs();
-        if (o == null) {
-          sb.append("null");
-        } else {
-          sb.append(SerDeUtils.LBRACE);
-          for (int i = 0; i < structFields.size(); i++) {
-            if (i > 0) {
-              sb.append(SerDeUtils.COMMA);
-            }
-            appendWithQuotes(sb, structFields.get(i).getFieldName());
-            sb.append(SerDeUtils.COLON);
-            buildJSONString(sb, soi.getStructFieldData(o, structFields.get(i)),
-              structFields.get(i).getFieldObjectInspector());
+      break;
+    }
+    case STRUCT: {
+      StructObjectInspector soi = (StructObjectInspector) oi;
+      List<? extends StructField> structFields = soi.getAllStructFieldRefs();
+      if (o == null) {
+        sb.append("null");
+      } else {
+        sb.append(SerDeUtils.LBRACE);
+        for (int i = 0; i < structFields.size(); i++) {
+          if (i > 0) {
+            sb.append(SerDeUtils.COMMA);
           }
-          sb.append(SerDeUtils.RBRACE);
+          appendWithQuotes(sb, structFields.get(i).getFieldName());
+          sb.append(SerDeUtils.COLON);
+          buildJSONString(sb, soi.getStructFieldData(o, structFields.get(i)),
+              structFields.get(i).getFieldObjectInspector());
         }
-        break;
+        sb.append(SerDeUtils.RBRACE);
       }
-      case UNION: {
-        UnionObjectInspector uoi = (UnionObjectInspector) oi;
-        if (o == null) {
-          sb.append("null");
-        } else {
-          sb.append(SerDeUtils.LBRACE);
-          sb.append(uoi.getTag(o));
-          sb.append(SerDeUtils.COLON);
-          buildJSONString(sb, uoi.getField(o),
+      break;
+    }
+    case UNION: {
+      UnionObjectInspector uoi = (UnionObjectInspector) oi;
+      if (o == null) {
+        sb.append("null");
+      } else {
+        sb.append(SerDeUtils.LBRACE);
+        sb.append(uoi.getTag(o));
+        sb.append(SerDeUtils.COLON);
+        buildJSONString(sb, uoi.getField(o),
             uoi.getObjectInspectors().get(uoi.getTag(o)));
-          sb.append(SerDeUtils.RBRACE);
-        }
-        break;
+        sb.append(SerDeUtils.RBRACE);
       }
-      default:
-        throw new RuntimeException("Unknown type in ObjectInspector!");
+      break;
+    }
+    default:
+      throw new RuntimeException("Unknown type in ObjectInspector!");
     }
   }
 
 
   /**
-   * Returns an object inspector for the specified schema that
-   * is capable of reading in the object representation of the JSON string
+   *  Returns an object inspector for the specified schema that
+   *  is capable of reading in the object representation of the JSON string
    */
   @Override
   public ObjectInspector getObjectInspector() throws SerDeException {
-    return cachedObjectInspector;
+    return structReader.getObjectInspector();
   }
 
   @Override
@@ -663,4 +376,12 @@ public class JsonSerDe extends AbstractSerDe {
     return null;
   }
 
+  public StructTypeInfo getTypeInfo() {
+    return rowTypeInfo;
+  }
+
+  public void setWriteablesUsage(boolean b) {
+    structReader.setWritablesUsage(b);
+  }
+
 }
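
For orientation, a minimal sketch (not part of this patch) of how the reworked
JsonSerDe is driven end to end; the schema, table properties and JSON literal
below are illustrative assumptions, not taken from the commit:

    import java.util.Properties;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.serde.serdeConstants;
    import org.apache.hadoop.hive.serde2.JsonSerDe;
    import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.io.Text;

    public class JsonSerDeSketch {
      public static void main(String[] args) throws Exception {
        // hypothetical three-column schema; columns.types is colon-delimited
        Properties tbl = new Properties();
        tbl.setProperty(serdeConstants.LIST_COLUMNS, "id,name,tags");
        tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, "int:string:array<string>");

        JsonSerDe serde = new JsonSerDe();
        serde.initialize(new Configuration(), tbl);

        // one JSON document per row, handed over as Text
        Object row = serde.deserialize(
            new Text("{\"id\":1,\"name\":\"a\",\"tags\":[\"x\",\"y\"]}"));

        // read the row back through the serde's own object inspector
        StructObjectInspector soi = (StructObjectInspector) serde.getObjectInspector();
        for (StructField f : soi.getAllStructFieldRefs()) {
          System.out.println(f.getFieldName() + " = " + soi.getStructFieldData(row, f));
        }
      }
    }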

http://git-wip-us.apache.org/repos/asf/hive/blob/1105ef39/serde/src/java/org/apache/hadoop/hive/serde2/json/HiveJsonStructReader.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/json/HiveJsonStructReader.java b/serde/src/java/org/apache/hadoop/hive/serde2/json/HiveJsonStructReader.java
new file mode 100644
index 0000000..ec4efad
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/json/HiveJsonStructReader.java
@@ -0,0 +1,402 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.json;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.CharacterCodingException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.hive.common.type.Date;
+import org.apache.hadoop.hive.common.type.HiveChar;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hive.common.util.TimestampParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+
+public class HiveJsonStructReader {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HiveJsonStructReader.class);
+
+  private ObjectInspector oi;
+  private JsonFactory factory;
+
+  private final Set<String> reportedUnknownFieldNames = new HashSet<>();
+
+  private boolean ignoreUnknownFields;
+  private boolean hiveColIndexParsing;
+  private boolean writeablePrimitives;
+
+  private TimestampParser tsParser;
+
+  public HiveJsonStructReader(TypeInfo t) {
+    this(t, new TimestampParser());
+  }
+
+  public HiveJsonStructReader(TypeInfo t, TimestampParser tsParser) {
+    this.tsParser = tsParser;
+    oi = TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(t);
+    factory = new JsonFactory();
+  }
+
+  public Object parseStruct(String text) throws JsonParseException, IOException, SerDeException {
+    JsonParser parser = factory.createParser(text);
+    return parseInternal(parser);
+  }
+
+  public Object parseStruct(InputStream is) throws JsonParseException, IOException, SerDeException {
+    JsonParser parser = factory.createParser(is);
+    return parseInternal(parser);
+  }
+
+  private Object parseInternal(JsonParser parser) throws SerDeException {
+    try {
+      parser.nextToken();
+      Object res = parseDispatcher(parser, oi);
+      return res;
+    } catch (Exception e) {
+      String locationStr = parser.getCurrentLocation().getLineNr() + "," + parser.getCurrentLocation().getColumnNr();
+      throw new SerDeException("at[" + locationStr + "]: " + e.getMessage(), e);
+    }
+  }
+
+  private Object parseDispatcher(JsonParser parser, ObjectInspector oi)
+      throws JsonParseException, IOException, SerDeException {
+
+    switch (oi.getCategory()) {
+    case PRIMITIVE:
+      return parsePrimitive(parser, (PrimitiveObjectInspector) oi);
+    case LIST:
+      return parseList(parser, (ListObjectInspector) oi);
+    case STRUCT:
+      return parseStruct(parser, (StructObjectInspector) oi);
+    case MAP:
+      return parseMap(parser, (MapObjectInspector) oi);
+    default:
+      throw new SerDeException("parsing of: " + oi.getCategory() + " is not handled");
+    }
+  }
+
+  private Object parseMap(JsonParser parser, MapObjectInspector oi) throws IOException, SerDeException {
+
+    if (parser.getCurrentToken() == JsonToken.VALUE_NULL) {
+      parser.nextToken();
+      return null;
+    }
+
+    Map<Object, Object> ret = new LinkedHashMap<>();
+
+    if (parser.getCurrentToken() != JsonToken.START_OBJECT) {
+      throw new SerDeException("struct expected");
+    }
+
+    if (!(oi.getMapKeyObjectInspector() instanceof PrimitiveObjectInspector)) {
+      throw new SerDeException("map key must be a primitive");
+    }
+    PrimitiveObjectInspector keyOI = (PrimitiveObjectInspector) oi.getMapKeyObjectInspector();
+    ObjectInspector valOI = oi.getMapValueObjectInspector();
+
+    JsonToken currentToken = parser.nextToken();
+    while (currentToken != null && currentToken != JsonToken.END_OBJECT) {
+
+      if (currentToken != JsonToken.FIELD_NAME) {
+        throw new SerDeException("unexpected token: " + currentToken);
+      }
+
+      Object key = parseMapKey(parser, keyOI);
+      Object val = parseDispatcher(parser, valOI);
+      ret.put(key, val);
+
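+      // parseMapKey/parseDispatcher already advanced the stream past the value,
+      // so re-read the current token instead of calling nextToken() again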
+      currentToken = parser.getCurrentToken();
+    }
+    if (currentToken != null) {
+      parser.nextToken();
+    }
+    return ret;
+
+  }
+
+  private Object parseStruct(JsonParser parser, StructObjectInspector oi)
+      throws JsonParseException, IOException, SerDeException {
+
+    Object[] ret = new Object[oi.getAllStructFieldRefs().size()];
+
+    if (parser.getCurrentToken() == JsonToken.VALUE_NULL) {
+      parser.nextToken();
+      return null;
+    }
+    if (parser.getCurrentToken() != JsonToken.START_OBJECT) {
+      throw new SerDeException("struct expected");
+    }
+    JsonToken currentToken = parser.nextToken();
+    while (currentToken != null && currentToken != JsonToken.END_OBJECT) {
+
+      switch (currentToken) {
+      case FIELD_NAME:
+        String name = parser.getCurrentName();
+        try {
+          StructField field = null;
+          try {
+            field = getStructField(oi, name);
+          } catch (RuntimeException e) {
+            if (ignoreUnknownFields) {
+              if (!reportedUnknownFieldNames.contains(name)) {
+                LOG.warn("ignoring field:" + name);
+                reportedUnknownFieldNames.add(name);
+              }
+              parser.nextToken();
+              skipValue(parser);
+              break;
+            }
+          }
+          if (field == null) {
+            throw new SerDeException("undeclared field");
+          }
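+          // step off the FIELD_NAME token onto the value before dispatching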
+          parser.nextToken();
+          ret[field.getFieldID()] = parseDispatcher(parser, field.getFieldObjectInspector());
+        } catch (Exception e) {
+          throw new SerDeException("struct field " + name + ": " + e.getMessage(), e);
+        }
+        break;
+      default:
+        throw new SerDeException("unexpected token: " + currentToken);
+      }
+      currentToken = parser.getCurrentToken();
+    }
+    if (currentToken != null) {
+      parser.nextToken();
+    }
+    return ret;
+  }
+
+  private StructField getStructField(StructObjectInspector oi, String name) {
+    if (hiveColIndexParsing) {
+      int colIndex = getColIndex(name);
+      if (colIndex >= 0) {
+        return oi.getAllStructFieldRefs().get(colIndex);
+      }
+    }
+    // FIXME: linear scan inside the below method...get a map here or something..
+    return oi.getStructFieldRef(name);
+  }
+
+  private static final Pattern internalPattern = Pattern.compile("^_col([0-9]+)$");
+
+  private int getColIndex(String internalName) {
+    // HiveConf.getPositionFromInternalName(internalName) should have been all the
+    // implementation that we need, but due to a bug in that impl which recognizes
+    // only single-digit columns, we need another impl here.
+    Matcher m = internalPattern.matcher(internalName);
+    if (!m.matches()) {
+      return -1;
+    } else {
+      return Integer.parseInt(m.group(1));
+    }
+  }
+
+  private static void skipValue(JsonParser parser) throws JsonParseException, IOException {
+
+    int array = 0;
+    int object = 0;
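+    // depth counters: a scalar makes exactly one pass through the loop, while
+    // arrays/objects keep looping until their matching END_* token is consumed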
+    do {
+      JsonToken currentToken = parser.getCurrentToken();
+      if (currentToken == JsonToken.START_ARRAY) {
+        array++;
+      }
+      if (currentToken == JsonToken.END_ARRAY) {
+        array--;
+      }
+      if (currentToken == JsonToken.START_OBJECT) {
+        object++;
+      }
+      if (currentToken == JsonToken.END_OBJECT) {
+        object--;
+      }
+
+      parser.nextToken();
+
+    } while (array > 0 || object > 0);
+
+  }
+
+  private Object parseList(JsonParser parser, ListObjectInspector oi)
+      throws JsonParseException, IOException, SerDeException {
+    List<Object> ret = new ArrayList<>();
+
+    if (parser.getCurrentToken() == JsonToken.VALUE_NULL) {
+      parser.nextToken();
+      return null;
+    }
+
+    if (parser.getCurrentToken() != JsonToken.START_ARRAY) {
+      throw new SerDeException("array expected");
+    }
+    ObjectInspector eOI = oi.getListElementObjectInspector();
+    JsonToken currentToken = parser.nextToken();
+    try {
+      while (currentToken != null && currentToken != JsonToken.END_ARRAY) {
+        ret.add(parseDispatcher(parser, eOI));
+        currentToken = parser.getCurrentToken();
+      }
+    } catch (Exception e) {
+      throw new SerDeException("array: " + e.getMessage(), e);
+    }
+
+    parser.nextToken(); // step past the END_ARRAY token
+
+    return ret;
+  }
+
+  private Object parsePrimitive(JsonParser parser, PrimitiveObjectInspector oi)
+      throws SerDeException, IOException {
+    JsonToken currentToken = parser.getCurrentToken();
+    if (currentToken == null) {
+      return null;
+    }
+    try {
+      switch (parser.getCurrentToken()) {
+      case VALUE_FALSE:
+      case VALUE_TRUE:
+      case VALUE_NUMBER_INT:
+      case VALUE_NUMBER_FLOAT:
+      case VALUE_STRING:
+        return getObjectOfCorrespondingPrimitiveType(parser.getValueAsString(), oi);
+      case VALUE_NULL:
+        return null;
+      default:
+        throw new SerDeException("unexpected token type: " + currentToken);
+      }
+    } finally {
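+      // always advance past the consumed value token, even when an exception is propagating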
+      parser.nextToken();
+
+    }
+  }
+
+  private Object getObjectOfCorrespondingPrimitiveType(String s, PrimitiveObjectInspector oi)
+      throws IOException {
+    PrimitiveTypeInfo typeInfo = oi.getTypeInfo();
+    if (writeablePrimitives) {
+      Converter c = ObjectInspectorConverters.getConverter(PrimitiveObjectInspectorFactory.javaStringObjectInspector, oi);
+      return c.convert(s);
+    }
+
+    switch (typeInfo.getPrimitiveCategory()) {
+    case INT:
+      return Integer.valueOf(s);
+    case BYTE:
+      return Byte.valueOf(s);
+    case SHORT:
+      return Short.valueOf(s);
+    case LONG:
+      return Long.valueOf(s);
+    case BOOLEAN:
+      return (s.equalsIgnoreCase("true"));
+    case FLOAT:
+      return Float.valueOf(s);
+    case DOUBLE:
+      return Double.valueOf(s);
+    case STRING:
+      return s;
+    case BINARY:
+      try {
+        String t = Text.decode(s.getBytes(), 0, s.getBytes().length);
+        return t.getBytes();
+      } catch (CharacterCodingException e) {
+        LOG.warn("Error generating json binary type from object.", e);
+        return null;
+      }
+    case DATE:
+      return Date.valueOf(s);
+    case TIMESTAMP:
+      return tsParser.parseTimestamp(s);
+    case DECIMAL:
+      return HiveDecimal.create(s);
+    case VARCHAR:
+      return new HiveVarchar(s, ((BaseCharTypeInfo) typeInfo).getLength());
+    case CHAR:
+      return new HiveChar(s, ((BaseCharTypeInfo) typeInfo).getLength());
+    }
+    throw new IOException("Could not convert from string to " + typeInfo.getTypeName());
+  }
+
+  private Object parseMapKey(JsonParser parser, PrimitiveObjectInspector oi) throws SerDeException, IOException {
+    JsonToken currentToken = parser.getCurrentToken();
+    if (currentToken == null) {
+      return null;
+    }
+    try {
+      switch (parser.getCurrentToken()) {
+      case FIELD_NAME:
+        return getObjectOfCorrespondingPrimitiveType(parser.getValueAsString(), oi);
+      case VALUE_NULL:
+        return null;
+      default:
+        throw new SerDeException("unexpected token type: " + currentToken);
+      }
+    } finally {
+      parser.nextToken();
+
+    }
+  }
+
+  public void setIgnoreUnknownFields(boolean b) {
+    ignoreUnknownFields = b;
+  }
+
+  public void enableHiveColIndexParsing(boolean b) {
+    hiveColIndexParsing = b;
+  }
+
+  public void setWritablesUsage(boolean b) {
+    writeablePrimitives = b;
+  }
+
+  public ObjectInspector getObjectInspector() {
+    return oi;
+  }
+}
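
And a similar hedged sketch (again not part of this patch) of driving the new
HiveJsonStructReader directly; the type string and JSON literal are
illustrative assumptions:

    import org.apache.hadoop.hive.serde2.json.HiveJsonStructReader;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

    public class HiveJsonStructReaderSketch {
      public static void main(String[] args) throws Exception {
        HiveJsonStructReader reader = new HiveJsonStructReader(
            TypeInfoUtils.getTypeInfoFromTypeString("struct<a:int,b:array<string>>"));
        reader.setIgnoreUnknownFields(true);    // skip (and warn once per name about) undeclared fields
        reader.enableHiveColIndexParsing(true); // also accept _col0/_col1 style field names

        // the undeclared "extra" subtree is skipped via skipValue();
        // "_col1" resolves by position to field b
        Object[] row = (Object[]) reader.parseStruct(
            "{\"a\":7,\"extra\":{\"x\":[1,2]},\"_col1\":[\"p\",\"q\"]}");
        System.out.println(row[0] + " / " + row[1]); // prints: 7 / [p, q]
      }
    }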

http://git-wip-us.apache.org/repos/asf/hive/blob/1105ef39/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
index cabb64c..416bd67 100644
--- a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
@@ -30,7 +30,7 @@ import com.google.common.base.Joiner;
 
 /**
  * Streaming Writer handles utf8 encoded Json (Strict syntax).
- * Uses org.apache.hadoop.hive.serde2.JsonSerDe to process Json input
+ * Uses {@link JsonSerDe} to process Json input
  *
  * NOTE: This record writer is NOT thread-safe. Use one record writer per streaming connection.
  */