You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/09/05 23:45:33 UTC

svn commit: r1622804 - in /hive/trunk: contrib/src/java/org/apache/hadoop/hive/contrib/serde2/ serde/src/java/org/apache/hadoop/hive/serde2/lazy/ serde/src/test/org/apache/hadoop/hive/serde2/lazy/

Author: brock
Date: Fri Sep  5 21:45:32 2014
New Revision: 1622804

URL: http://svn.apache.org/r1622804
Log:
HIVE-5871 - Use multiple-characters as field delimiter (Rui Li via Brock)

Added:
    hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/MultiDelimitSerDe.java
Modified:
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyBinary.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java
    hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/lazy/TestLazyPrimitive.java

Added: hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/MultiDelimitSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/MultiDelimitSerDe.java?rev=1622804&view=auto
==============================================================================
--- hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/MultiDelimitSerDe.java (added)
+++ hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/MultiDelimitSerDe.java Fri Sep  5 21:45:32 2014
@@ -0,0 +1,287 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.contrib.serde2;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.*;
+import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
+import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This SerDe allows user to use multiple characters as the field delimiter for a table.
+ * To use this SerDe, user has to specify field.delim in SERDEPROPERTIES.
+ * If the table contains a column which is a collection or map, user can optionally
+ * specify collection.delim or mapkey.delim as well.
+ * Currently field.delim can be multiple character while collection.delim
+ * and mapkey.delim should be just single character.
+ */
+public class MultiDelimitSerDe extends AbstractSerDe {
+  private static final Log LOG = LogFactory.getLog(MultiDelimitSerDe.class.getName());
+  // Default separators: \1 (field), \2 (collection), \3 (map key) control bytes.
+  private static final byte[] DEFAULT_SEPARATORS = {(byte) 1, (byte) 2, (byte) 3};
+  // Due to HIVE-6404, define our own constant
+  private static final String COLLECTION_DELIM = "collection.delim";
+
+  // Number of table columns; serialize() rejects structs of any other width.
+  private int numColumns;
+  // The (possibly multi-character) field delimiter from the "field.delim" property.
+  private String fieldDelimited;
+  // we don't support using multiple chars as delimiters within complex types
+  // collection separator
+  private byte collSep;
+  // map key separator
+  private byte keySep;
+
+  // The object for storing row data
+  private LazyStruct cachedLazyStruct;
+  //the lazy struct object inspector
+  private ObjectInspector cachedObjectInspector;
+
+  // The wrapper for byte array
+  private ByteArrayRef byteArrayRef;
+
+  private LazySimpleSerDe.SerDeParameters serdeParams = null;
+  // The output stream of serialized objects
+  private final ByteStream.Output serializeStream = new ByteStream.Output();
+  // The Writable to return in serialize
+  private final Text serializeCache = new Text();
+
+  /**
+   * Initializes the SerDe from table properties. "field.delim" is mandatory and
+   * may be multiple characters; collection.delim and mapkey.delim are optional
+   * single bytes that overwrite slots 1 and 2 of the standard separator table.
+   *
+   * @throws SerDeException if "field.delim" is absent or empty
+   */
+  @Override
+  public void initialize(Configuration conf, Properties tbl) throws SerDeException {
+    // get the SerDe parameters
+    serdeParams = LazySimpleSerDe.initSerdeParams(conf, tbl, getClass().getName());
+
+    fieldDelimited = tbl.getProperty(serdeConstants.FIELD_DELIM);
+    if (fieldDelimited == null || fieldDelimited.isEmpty()) {
+      throw new SerDeException("This table does not have serde property \"field.delim\"!");
+    }
+
+    // get the collection separator and map key separator
+    // TODO: use serdeConstants.COLLECTION_DELIM when the typo is fixed
+    collSep = LazySimpleSerDe.getByte(tbl.getProperty(COLLECTION_DELIM),
+        DEFAULT_SEPARATORS[1]);
+    keySep = LazySimpleSerDe.getByte(tbl.getProperty(serdeConstants.MAPKEY_DELIM),
+        DEFAULT_SEPARATORS[2]);
+    serdeParams.getSeparators()[1] = collSep;
+    serdeParams.getSeparators()[2] = keySep;
+
+    // Create the ObjectInspectors for the fields
+    cachedObjectInspector = LazyFactory.createLazyStructInspector(serdeParams
+        .getColumnNames(), serdeParams.getColumnTypes(), serdeParams
+        .getSeparators(), serdeParams.getNullSequence(), serdeParams
+        .isLastColumnTakesRest(), serdeParams.isEscaped(), serdeParams
+        .getEscapeChar());
+
+    cachedLazyStruct = (LazyStruct) LazyFactory.createLazyObject(cachedObjectInspector);
+
+    assert serdeParams.getColumnNames().size() == serdeParams.getColumnTypes().size();
+    numColumns = serdeParams.getColumnNames().size();
+  }
+
+
+  @Override
+  public ObjectInspector getObjectInspector() throws SerDeException {
+    return cachedObjectInspector;
+  }
+
+  @Override
+  public Class<? extends Writable> getSerializedClass() {
+    return Text.class;
+  }
+
+  /**
+   * Deserializes one row. The row text is copied with every occurrence of the
+   * multi-char delimiter collapsed to a single '\1' byte (so LazyStruct field
+   * offsets line up), while field boundaries themselves are located against the
+   * original bytes via parseMultiDelimit.
+   *
+   * @param blob a BytesWritable or Text holding one row
+   * @return the reusable cached LazyStruct wrapping this row
+   * @throws SerDeException if blob is neither BytesWritable nor Text
+   */
+  @Override
+  public Object deserialize(Writable blob) throws SerDeException {
+    if (byteArrayRef == null) {
+      byteArrayRef = new ByteArrayRef();
+    }
+
+    // we use the default field delimiter('\1') to replace the multiple-char field delimiter
+    // but we cannot use it to parse the row since column data can contain '\1' as well
+    String rowStr;
+    if (blob instanceof BytesWritable) {
+      BytesWritable b = (BytesWritable) blob;
+      // NOTE(review): getBytes() returns the full backing array; bytes past
+      // b.getLength() are included here, and the platform default charset is
+      // used — confirm rows never carry trailing buffer padding.
+      rowStr = new String(b.getBytes());
+    } else if (blob instanceof Text) {
+      Text rowText = (Text) blob;
+      rowStr = rowText.toString();
+    } else {
+      throw new SerDeException(getClass() + ": expects either BytesWritable or Text object!");
+    }
+    // Pattern.quote treats the delimiter literally even if it contains regex metachars.
+    byteArrayRef.setData(rowStr.replaceAll(Pattern.quote(fieldDelimited), "\1").getBytes());
+    cachedLazyStruct.init(byteArrayRef, 0, byteArrayRef.getData().length);
+    // use the multi-char delimiter to parse the lazy struct
+    cachedLazyStruct.parseMultiDelimit(rowStr.getBytes(), fieldDelimited.getBytes());
+    return cachedLazyStruct;
+  }
+
+  /**
+   * Serializes a struct row into Text, joining the fields with the multi-char
+   * delimiter and reusing the cached stream/Text objects across calls.
+   *
+   * @throws SerDeException if the struct's field count differs from the table's
+   *         column count, or on an underlying IO error
+   */
+  @Override
+  public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+    StructObjectInspector soi = (StructObjectInspector) objInspector;
+    List<? extends StructField> fields = soi.getAllStructFieldRefs();
+    List<Object> list = soi.getStructFieldsDataAsList(obj);
+    if (fields.size() != numColumns) {
+      throw new SerDeException("Cannot serialize the object because there are "
+          + fields.size() + " fields but the table has " + numColumns
+          + " columns.");
+    }
+
+    serializeStream.reset();
+    // Get all data out.
+    for (int c = 0; c < numColumns; c++) {
+      //write the delimiter to the stream, which means we don't need output.format.string anymore
+      if (c > 0) {
+        serializeStream.write(fieldDelimited.getBytes(), 0, fieldDelimited.getBytes().length);
+      }
+
+      Object field = list == null ? null : list.get(c);
+      ObjectInspector fieldOI = fields.get(c).getFieldObjectInspector();
+
+      try {
+        // level 1: nested complex types start at separator slot 1 (collSep).
+        serializeNoEncode(serializeStream, field, fieldOI, serdeParams.getSeparators(), 1,
+            serdeParams.getNullSequence(), serdeParams.isEscaped(), serdeParams.getEscapeChar(),
+            serdeParams.getNeedsEscape());
+      } catch (IOException e) {
+        throw new SerDeException(e);
+      }
+    }
+
+    serializeCache.set(serializeStream.getData(), 0, serializeStream.getLength());
+    return serializeCache;
+  }
+
+  // This is basically the same as LazySimpleSerDe.serialize. Except that we don't use
+  // Base64 to encode binary data because we're using printable string as delimiter.
+  // Consider such a row "strAQ==\1", str is a string, AQ== is the delimiter and \1
+  // is the binary data.
+  // Recurses through LIST/MAP/STRUCT categories, taking the separator for each
+  // nesting depth from separators[level] (and separators[level + 1] for map
+  // key/value pairs); null values at any depth emit nullSequence.
+  private static void serializeNoEncode(ByteStream.Output out, Object obj,
+      ObjectInspector objInspector, byte[] separators, int level,
+      Text nullSequence, boolean escaped, byte escapeChar, boolean[] needsEscape)
+      throws IOException, SerDeException {
+    if (obj == null) {
+      out.write(nullSequence.getBytes(), 0, nullSequence.getLength());
+      return;
+    }
+
+    char separator;
+    List<?> list;
+    switch (objInspector.getCategory()) {
+      case PRIMITIVE:
+        PrimitiveObjectInspector oi = (PrimitiveObjectInspector) objInspector;
+        if (oi.getPrimitiveCategory() == PrimitiveCategory.BINARY) {
+          // Raw binary is written as-is (no Base64), per the comment above.
+          BytesWritable bw = ((BinaryObjectInspector) oi).getPrimitiveWritableObject(obj);
+          byte[] toWrite = new byte[bw.getLength()];
+          System.arraycopy(bw.getBytes(), 0, toWrite, 0, bw.getLength());
+          out.write(toWrite, 0, toWrite.length);
+        } else {
+          LazyUtils.writePrimitiveUTF8(out, obj, oi, escaped, escapeChar, needsEscape);
+        }
+        return;
+      case LIST:
+        separator = (char) separators[level];
+        ListObjectInspector loi = (ListObjectInspector) objInspector;
+        list = loi.getList(obj);
+        ObjectInspector eoi = loi.getListElementObjectInspector();
+        if (list == null) {
+          out.write(nullSequence.getBytes(), 0, nullSequence.getLength());
+        } else {
+          for (int i = 0; i < list.size(); i++) {
+            if (i > 0) {
+              out.write(separator);
+            }
+            serializeNoEncode(out, list.get(i), eoi, separators, level + 1, nullSequence,
+                escaped, escapeChar, needsEscape);
+          }
+        }
+        return;
+      case MAP:
+        // Entries are joined by separators[level]; key and value inside an
+        // entry by separators[level + 1].
+        separator = (char) separators[level];
+        char keyValueSeparator = (char) separators[level + 1];
+
+        MapObjectInspector moi = (MapObjectInspector) objInspector;
+        ObjectInspector koi = moi.getMapKeyObjectInspector();
+        ObjectInspector voi = moi.getMapValueObjectInspector();
+        Map<?, ?> map = moi.getMap(obj);
+        if (map == null) {
+          out.write(nullSequence.getBytes(), 0, nullSequence.getLength());
+        } else {
+          boolean first = true;
+          for (Map.Entry<?, ?> entry : map.entrySet()) {
+            if (first) {
+              first = false;
+            } else {
+              out.write(separator);
+            }
+            serializeNoEncode(out, entry.getKey(), koi, separators, level + 2,
+                nullSequence, escaped, escapeChar, needsEscape);
+            out.write(keyValueSeparator);
+            serializeNoEncode(out, entry.getValue(), voi, separators, level + 2,
+                nullSequence, escaped, escapeChar, needsEscape);
+          }
+        }
+        return;
+      case STRUCT:
+        separator = (char) separators[level];
+        StructObjectInspector soi = (StructObjectInspector) objInspector;
+        List<? extends StructField> fields = soi.getAllStructFieldRefs();
+        list = soi.getStructFieldsDataAsList(obj);
+        if (list == null) {
+          out.write(nullSequence.getBytes(), 0, nullSequence.getLength());
+        } else {
+          for (int i = 0; i < list.size(); i++) {
+            if (i > 0) {
+              out.write(separator);
+            }
+            serializeNoEncode(out, list.get(i), fields.get(i).getFieldObjectInspector(),
+                separators, level + 1, nullSequence, escaped, escapeChar,
+                needsEscape);
+          }
+        }
+        return;
+    }
+    throw new RuntimeException("Unknown category type: "+ objInspector.getCategory());
+  }
+
+  @Override
+  public SerDeStats getSerDeStats() {
+    // no support for statistics
+    return null;
+  }
+
+}

Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyBinary.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyBinary.java?rev=1622804&r1=1622803&r2=1622804&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyBinary.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyBinary.java Fri Sep  5 21:45:32 2014
@@ -55,6 +55,8 @@ public class LazyBinary extends LazyPrim
                 "decoded the data.");
     }
     byte[] decoded = arrayByteBase64 ? Base64.decodeBase64(recv) : recv;
+    // use the original bytes in case decoding should fail
+    decoded = decoded.length > 0 ? decoded : recv;
     data.set(decoded, 0, decoded.length);
   }
 

Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java?rev=1622804&r1=1622803&r2=1622804&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java Fri Sep  5 21:45:32 2014
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import com.google.common.primitives.Bytes;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -28,6 +29,7 @@ import org.apache.hadoop.hive.serde2.Ser
 import org.apache.hadoop.hive.serde2.StructObject;
 import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.Text;
 
 /**
@@ -285,4 +287,59 @@ public class LazyStruct extends LazyNonP
   public long getRawDataSerializedSize() {
     return serializedSize;
   }
+
+  // parse the struct using multi-char delimiter
+  // Records each field's start offset by locating the multi-char delimiter in
+  // rawRow, then translating each offset into the coordinate space of the
+  // transformed row in which every delimiter occurrence was collapsed to a
+  // single byte (the "- i * diff" correction, diff = delimiter length - 1).
+  // The i-th delimiter precedes field i, so each earlier delimiter shrinks the
+  // transformed row by diff bytes. Fields whose delimiter was not found, and
+  // the end sentinel, get start == length + 1. No-op when either arg is null.
+  public void parseMultiDelimit(byte[] rawRow, byte[] fieldDelimit) {
+    if (rawRow == null || fieldDelimit == null) {
+      return;
+    }
+    if (fields == null) {
+      // Lazily build per-field LazyObjects and parse bookkeeping on first use.
+      List<? extends StructField> fieldRefs = ((StructObjectInspector) oi).getAllStructFieldRefs();
+      fields = new LazyObject[fieldRefs.size()];
+      for (int i = 0; i < fields.length; i++) {
+        fields[i] = LazyFactory.createLazyObject(fieldRefs.get(i).getFieldObjectInspector());
+      }
+      fieldInited = new boolean[fields.length];
+      startPosition = new int[fields.length + 1];
+    }
+    // the indexes of the delimiters
+    int[] delimitIndexes = findIndexes(rawRow, fieldDelimit);
+    int diff = fieldDelimit.length - 1;
+    // first field always starts from 0, even when missing
+    startPosition[0] = 0;
+    for (int i = 1; i < fields.length; i++) {
+      if (delimitIndexes[i - 1] != -1) {
+        int start = delimitIndexes[i - 1] + fieldDelimit.length;
+        startPosition[i] = start - i * diff;
+      } else {
+        // delimiter missing: mark the field as starting past the end (absent)
+        startPosition[i] = length + 1;
+      }
+    }
+    startPosition[fields.length] = length + 1;
+    Arrays.fill(fieldInited, false);
+    parsed = true;
+  }
+
+  // find all the indexes of the sub byte[]
+  // Returns an array of fields.length - 1 offsets (relative to the ORIGINAL
+  // array) of successive non-overlapping occurrences of target; slots past the
+  // last occurrence stay -1. Must be called after `fields` is initialized
+  // (parseMultiDelimit guarantees this). Returns an empty array for <= 1 field.
+  // NOTE(review): each iteration copies the remaining tail of the row via
+  // Arrays.copyOfRange, so cost grows with row size x column count — presumably
+  // acceptable for typical rows; confirm for very wide tables.
+  private int[] findIndexes(byte[] array, byte[] target) {
+    if (fields.length <= 1) {
+      return new int[0];
+    }
+    int[] indexes = new int[fields.length - 1];
+    Arrays.fill(indexes, -1);
+    indexes[0] = Bytes.indexOf(array, target);
+    if (indexes[0] == -1) {
+      return indexes;
+    }
+    int indexInNewArray = indexes[0];
+    for (int i = 1; i < indexes.length; i++) {
+      // Search the tail that follows the previous match, then translate the
+      // tail-relative hit back to an original-array offset.
+      array = Arrays.copyOfRange(array, indexInNewArray + target.length, array.length);
+      indexInNewArray = Bytes.indexOf(array, target);
+      if (indexInNewArray == -1) {
+        break;
+      }
+      indexes[i] = indexInNewArray + indexes[i - 1] + target.length;
+    }
+    return indexes;
+  }
 }

Modified: hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/lazy/TestLazyPrimitive.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/lazy/TestLazyPrimitive.java?rev=1622804&r1=1622803&r2=1622804&view=diff
==============================================================================
--- hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/lazy/TestLazyPrimitive.java (original)
+++ hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/lazy/TestLazyPrimitive.java Fri Sep  5 21:45:32 2014
@@ -388,7 +388,7 @@ public class TestLazyPrimitive extends T
     initLazyObject(ba, new byte[] {'2', '?', '3'}, 0, 3);
     assertEquals(new BytesWritable(new byte[] {'2', '?', '3'}), ba.getWritableObject());
     initLazyObject(ba, new byte[] {'\n'}, 0, 1);
-    assertEquals(new BytesWritable(new byte[] {}), ba.getWritableObject());
+    assertEquals(new BytesWritable(new byte[] {'\n'}), ba.getWritableObject());
   }
 
   public void testLazyTimestamp() throws Throwable {