You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/07/20 19:15:02 UTC

[32/50] [abbrv] phoenix git commit: PHOENIX-2067 Sort order incorrect for variable length DESC columns

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java
index 4e32cc0..dd11569 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.schema.types;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.sql.Types;
 import java.text.Format;
 import java.util.LinkedList;
 import java.util.List;
@@ -34,61 +35,88 @@ import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.ValueSchema;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TrustedByteArrayOutputStream;
 
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 
 /**
- * The datatype for PColummns that are Arrays. Any variable length array would follow the below order. 
- * Every element would be seperated by a seperator byte '0'. Null elements are counted and once a first 
- * non null element appears we write the count of the nulls prefixed with a seperator byte.
- * Trailing nulls are not taken into account. The last non null element is followed by two seperator bytes. 
- * For eg a, b, null, null, c, null -> 65 0 66 0 0 2 67 0 0 0 
- * a null null null b c null d -> 65 0 0 3 66 0 67 0 0 1 68 0 0 0.
- * The reason we use this serialization format is to allow the
- * byte array of arrays of the same type to be directly comparable against each other. 
- * This prevents a costly deserialization on compare and allows an array column to be used as the last column in a primary key constraint.
+ * The datatype for PColumns that are Arrays. Any variable length array would follow the below order. Every element
+ * would be separated by a separator byte '0'. Null elements are counted and once a first non null element appears we
+ * write the count of the nulls prefixed with a separator byte. Trailing nulls are not taken into account. The last non
+ * null element is followed by two separator bytes. E.g. a, b, null, null, c, null -> 65 0 66 0 0 2 67 0 0 0, and
+ * a, null, null, null, b, c, null, d -> 65 0 0 3 66 0 67 0 0 1 68 0 0 0. We use this serialization format so that the
+ * byte arrays of arrays of the same type are directly comparable against each other. This prevents a costly
+ * deserialization on compare and allows an array column to be used as the last column in a primary key constraint.
  */
 public abstract class PArrayDataType<T> extends PDataType<T> {
 
+    @Override
+    public final int getResultSetSqlType() {
+      return Types.ARRAY;
+    }
+
+    @Override
+    public final void coerceBytes(ImmutableBytesWritable ptr, Object object, PDataType actualType,
+        Integer maxLength, Integer scale, SortOrder actualModifer, Integer desiredMaxLength,
+        Integer desiredScale, SortOrder desiredModifier, boolean expectedRowKeyOrderOptimizable) {
+      coerceBytes(ptr, object, actualType, maxLength, scale, desiredMaxLength, desiredScale,
+          this, actualModifer, desiredModifier, expectedRowKeyOrderOptimizable);
+    }
+
+    @Override
+    public final void coerceBytes(ImmutableBytesWritable ptr, Object object, PDataType actualType,
+        Integer maxLength, Integer scale, SortOrder actualModifer, Integer desiredMaxLength,
+        Integer desiredScale, SortOrder desiredModifier) {
+      coerceBytes(ptr, object, actualType, maxLength, scale, desiredMaxLength, desiredScale,
+          this, actualModifer, desiredModifier, true);
+    }
+
     public static final byte ARRAY_SERIALIZATION_VERSION = 1;
 
-  protected PArrayDataType(String sqlTypeName, int sqlType, Class clazz, PDataCodec codec, int ordinal) {
-    super(sqlTypeName, sqlType, clazz, codec, ordinal);
-  }
+    protected PArrayDataType(String sqlTypeName, int sqlType, Class clazz, PDataCodec codec, int ordinal) {
+        super(sqlTypeName, sqlType, clazz, codec, ordinal);
+    }
+
+    private static byte getSeparatorByte(boolean rowKeyOrderOptimizable, SortOrder sortOrder) {
+        return SchemaUtil.getSeparatorByte(rowKeyOrderOptimizable, false, sortOrder);
+    }
 
-  public byte[] toBytes(Object object, PDataType baseType, SortOrder sortOrder) {
-		if(object == null) {
-			throw new ConstraintViolationException(this + " may not be null");
-		}
-		PhoenixArray arr = ((PhoenixArray)object);
+    public byte[] toBytes(Object object, PDataType baseType, SortOrder sortOrder) {
+        return toBytes(object, baseType, sortOrder, true);
+    }
+    
+    public byte[] toBytes(Object object, PDataType baseType, SortOrder sortOrder, boolean rowKeyOrderOptimizable) {
+        if (object == null) { throw new ConstraintViolationException(this + " may not be null"); }
+        PhoenixArray arr = ((PhoenixArray)object);
         int noOfElements = arr.numElements;
-        if(noOfElements == 0) {
-        	return ByteUtil.EMPTY_BYTE_ARRAY;
-        }
+        if (noOfElements == 0) { return ByteUtil.EMPTY_BYTE_ARRAY; }
         TrustedByteArrayOutputStream byteStream = null;
-		if (!baseType.isFixedWidth()) {
-	        Pair<Integer, Integer> nullsVsNullRepeationCounter = new Pair<>();
-	        int size = estimateByteSize(object, nullsVsNullRepeationCounter,
-	                PDataType.fromTypeId((baseType.getSqlType() + PDataType.ARRAY_TYPE_BASE)));
-		    size += ((2 * Bytes.SIZEOF_BYTE) + (noOfElements - nullsVsNullRepeationCounter.getFirst()) * Bytes.SIZEOF_BYTE)
-		                                + (nullsVsNullRepeationCounter.getSecond() * 2 * Bytes.SIZEOF_BYTE);
-		    // Assume an offset array that fit into Short.MAX_VALUE.  Also not considering nulls that could be > 255
-		    // In both of these cases, finally an array copy would happen
-		    int capacity = noOfElements * Bytes.SIZEOF_SHORT;
-		    // Here the int for noofelements, byte for the version, int for the offsetarray position and 2 bytes for the end seperator
-            byteStream = new TrustedByteArrayOutputStream(size + capacity + Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE +  Bytes.SIZEOF_INT);
-		} else {
-		    int size = arr.getMaxLength() * noOfElements;
-		    // Here the int for noofelements, byte for the version
-		    byteStream = new TrustedByteArrayOutputStream(size);
-		}
-		DataOutputStream oStream = new DataOutputStream(byteStream);
-		// Handles bit inversion also
-		return createArrayBytes(byteStream, oStream, (PhoenixArray)object, noOfElements, baseType, sortOrder);
-	}
-	
+        if (!baseType.isFixedWidth()) {
+            Pair<Integer, Integer> nullsVsNullRepeationCounter = new Pair<>();
+            int size = estimateByteSize(object, nullsVsNullRepeationCounter,
+                    PDataType.fromTypeId((baseType.getSqlType() + PDataType.ARRAY_TYPE_BASE)));
+            size += ((2 * Bytes.SIZEOF_BYTE) + (noOfElements - nullsVsNullRepeationCounter.getFirst())
+                    * Bytes.SIZEOF_BYTE)
+                    + (nullsVsNullRepeationCounter.getSecond() * 2 * Bytes.SIZEOF_BYTE);
+            // Assume an offset array that fit into Short.MAX_VALUE. Also not considering nulls that could be > 255
+            // In both of these cases, finally an array copy would happen
+            int capacity = noOfElements * Bytes.SIZEOF_SHORT;
+            // Here the int for noofelements, byte for the version, int for the offsetarray position and 2 bytes for the
+            // end separator
+            byteStream = new TrustedByteArrayOutputStream(size + capacity + Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE
+                    + Bytes.SIZEOF_INT);
+        } else {
+            int size = arr.getMaxLength() * noOfElements;
+            // Here the int for noofelements, byte for the version
+            byteStream = new TrustedByteArrayOutputStream(size);
+        }
+        DataOutputStream oStream = new DataOutputStream(byteStream);
+        // Handles bit inversion also
+        return createArrayBytes(byteStream, oStream, (PhoenixArray)object, noOfElements, baseType, sortOrder, rowKeyOrderOptimizable);
+    }
+
     public static int serializeNulls(DataOutputStream oStream, int nulls) throws IOException {
         // We need to handle 3 different cases here
         // 1) Arrays with repeating nulls in the middle which is less than 255
@@ -97,12 +125,15 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
         // Take a case where we have two arrays that has the following elements
         // Array 1 - size : 240, elements = abc, bcd, null, null, bcd,null,null......,null, abc
         // Array 2 - size : 16 : elements = abc, bcd, null, null, bcd, null, null...null, abc
-        // In both case the elements and the value array will be the same but the Array 1 is actually smaller because it has more nulls.
-        // Now we should have mechanism to show that we treat arrays with more nulls as lesser.  Hence in the above case as 
+        // In both case the elements and the value array will be the same but the Array 1 is actually smaller because it
+        // has more nulls.
+        // Now we should have mechanism to show that we treat arrays with more nulls as lesser. Hence in the above case
+        // as
         // 240 > Bytes.MAX_VALUE, by always inverting the number of nulls we would get a +ve value
-        // For Array 2, by inverting we would get a -ve value.  On comparison Array 2 > Array 1.
-        // Now for cases where the number of nulls is greater than 255, we would write an those many (byte)1, it is bigger than 255.
-        // This would ensure that we don't compare with triple zero which is used as an end  byte
+        // For Array 2, by inverting we would get a -ve value. On comparison Array 2 > Array 1.
+        // Now for cases where the number of nulls reaches 255 or more, we write one (byte)1 for each full group of
+        // 255 nulls instead of a count byte.
+        // This would ensure that we don't compare with triple zero which is used as an end byte
         if (nulls > 0) {
             oStream.write(QueryConstants.SEPARATOR_BYTE);
             int nMultiplesOver255 = nulls / 255;
@@ -110,61 +141,65 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
                 // Don't write a zero byte, as we need to ensure that the only triple zero
                 // byte occurs at the end of the array (i.e. the terminator byte for the
                 // element plus the double zero byte at the end of the array).
-                oStream.write((byte)1); 
+                oStream.write((byte)1);
             }
             int nRemainingNulls = nulls % 255; // From 0 to 254
             // Write a byte for the remaining null elements
             if (nRemainingNulls > 0) {
                 // Remaining null elements is from 1 to 254.
-                // Subtract one and invert so that more remaining nulls becomes smaller than less 
-                // remaining nulls and min byte value is always greater than 1, the repeating value  
+                // Subtract one and invert so that more remaining nulls becomes smaller than less
+                // remaining nulls and min byte value is always greater than 1, the repeating value
                 // used for arrays with more than 255 repeating null elements.
-                // The reason we invert is that  an array with less null elements has a non
+                // The reason we invert is that an array with less null elements has a non
                 // null element sooner than an array with more null elements. Thus, the more
                 // null elements you have, the smaller the array becomes.
-                byte nNullByte = SortOrder.invert((byte)(nRemainingNulls-1));
+                byte nNullByte = SortOrder.invert((byte)(nRemainingNulls - 1));
                 oStream.write(nNullByte); // Single byte for repeating nulls
             }
         }
         return 0;
     }
 
-    public static int serializeNulls(byte[] bytes, int position, int nulls){
+    public static int serializeNulls(byte[] bytes, int position, int nulls) {
         int nMultiplesOver255 = nulls / 255;
         while (nMultiplesOver255-- > 0) {
             bytes[position++] = 1;
         }
         int nRemainingNulls = nulls % 255;
         if (nRemainingNulls > 0) {
-            byte nNullByte = SortOrder.invert((byte)(nRemainingNulls-1));
+            byte nNullByte = SortOrder.invert((byte)(nRemainingNulls - 1));
             bytes[position++] = nNullByte;
         }
         return position;
     }
- 
-    public static void writeEndSeperatorForVarLengthArray(DataOutputStream oStream) throws IOException {
-        oStream.write(QueryConstants.SEPARATOR_BYTE);
-        oStream.write(QueryConstants.SEPARATOR_BYTE);
+
+    public static void writeEndSeperatorForVarLengthArray(DataOutputStream oStream, SortOrder sortOrder) throws IOException {
+        writeEndSeperatorForVarLengthArray(oStream, sortOrder, true);
+    }
+    
+    private static void writeEndSeperatorForVarLengthArray(DataOutputStream oStream, SortOrder sortOrder, boolean rowKeyOrderOptimizable)
+            throws IOException {
+        byte sepByte = getSeparatorByte(rowKeyOrderOptimizable, sortOrder);
+        oStream.write(sepByte);
+        oStream.write(sepByte);
     }
 
-	public static boolean useShortForOffsetArray(int maxOffset) {
-		// If the max offset is less than Short.MAX_VALUE then offset array can use short
-		if (maxOffset <= (2 * Short.MAX_VALUE)) {
-			return true;
-		}
-		// else offset array can use Int
-		return false;
-	}
-
-	public int toBytes(Object object, byte[] bytes, int offset) {
-	    PhoenixArray array = (PhoenixArray)object;
-        if (array == null || array.baseType == null) {
-            return 0;
-        }
-        return estimateByteSize(object, null, PDataType.fromTypeId((array.baseType.getSqlType() + PDataType.ARRAY_TYPE_BASE)));
-	}
+    public static boolean useShortForOffsetArray(int maxOffset) {
+        // Offsets are stored biased by Short.MAX_VALUE, so a short suffices while the max offset is <= 2 * Short.MAX_VALUE
+        if (maxOffset <= (2 * Short.MAX_VALUE)) { return true; }
+        // else offset array can use Int
+        return false;
+    }
 
-	// Estimates the size of the given array and also calculates the number of nulls and its repetition factor
+    @Override
+    public int toBytes(Object object, byte[] bytes, int offset) {
+        PhoenixArray array = (PhoenixArray)object;
+        if (array == null || array.baseType == null) { return 0; }
+        return estimateByteSize(object, null,
+                PDataType.fromTypeId((array.baseType.getSqlType() + PDataType.ARRAY_TYPE_BASE)));
+    }
+
+    // Estimates the size of the given array and also calculates the number of nulls and its repetition factor
     public int estimateByteSize(Object o, Pair<Integer, Integer> nullsVsNullRepeationCounter, PDataType baseType) {
         if (baseType.isFixedWidth()) { return baseType.getByteSize(); }
         if (baseType.isArrayType()) {
@@ -201,43 +236,41 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
         // Non fixed width types must override this
         throw new UnsupportedOperationException();
     }
-    
-	public boolean isCoercibleTo(PDataType targetType, Object value) {
-	    return targetType.isCoercibleTo(targetType, value);
-	}
-	
-	public boolean isCoercibleTo(PDataType targetType, PDataType expectedTargetType) {
-		if(!targetType.isArrayType()) {
-			return false;
-		} else {
-			PDataType targetElementType = PDataType.fromTypeId(targetType.getSqlType()
-					- PDataType.ARRAY_TYPE_BASE);
-			PDataType expectedTargetElementType = PDataType.fromTypeId(expectedTargetType
-					.getSqlType() - PDataType.ARRAY_TYPE_BASE);
-			return expectedTargetElementType.isCoercibleTo(targetElementType);
-		}
+
+    @Override
+    public boolean isCoercibleTo(PDataType targetType, Object value) {
+        return targetType.isCoercibleTo(targetType, value);
     }
 
-  @Override
-	public boolean isSizeCompatible(ImmutableBytesWritable ptr, Object value,
-      PDataType srcType, Integer maxLength, Integer scale,
-      Integer desiredMaxLength, Integer desiredScale) {
-    if (value == null) return true;
-		PhoenixArray pArr = (PhoenixArray) value;
-    PDataType baseType = PDataType.fromTypeId(srcType.getSqlType() - PDataType.ARRAY_TYPE_BASE);
-    for (int i = 0 ; i < pArr.numElements; i++) {
-      Object val = pArr.getElement(i);
-      if (!baseType.isSizeCompatible(ptr, val, baseType, srcType.getMaxLength(val),
-          scale, desiredMaxLength, desiredScale)) {
-        return false;
-      }
-		}
-		return true;
-	}
-
-    public void coerceBytes(ImmutableBytesWritable ptr, Object value, PDataType actualType, Integer maxLength,
-        Integer scale, Integer desiredMaxLength, Integer desiredScale, PDataType desiredType,
-            SortOrder actualModifer, SortOrder expectedModifier) {
+    public boolean isCoercibleTo(PDataType targetType, PDataType expectedTargetType) {
+        if (!targetType.isArrayType()) {
+            return false;
+        } else {
+            PDataType targetElementType = PDataType.fromTypeId(targetType.getSqlType() - PDataType.ARRAY_TYPE_BASE);
+            PDataType expectedTargetElementType = PDataType.fromTypeId(expectedTargetType.getSqlType()
+                    - PDataType.ARRAY_TYPE_BASE);
+            return expectedTargetElementType.isCoercibleTo(targetElementType);
+        }
+    }
+
+    @Override
+    public boolean isSizeCompatible(ImmutableBytesWritable ptr, Object value, PDataType srcType, Integer maxLength,
+            Integer scale, Integer desiredMaxLength, Integer desiredScale) {
+        if (value == null) return true;
+        PhoenixArray pArr = (PhoenixArray)value;
+        PDataType baseType = PDataType.fromTypeId(srcType.getSqlType() - PDataType.ARRAY_TYPE_BASE);
+        for (int i = 0; i < pArr.numElements; i++) {
+            Object val = pArr.getElement(i);
+            if (!baseType.isSizeCompatible(ptr, val, baseType, srcType.getMaxLength(val), scale, desiredMaxLength,
+                    desiredScale)) { return false; }
+        }
+        return true;
+    }
+
+    private void coerceBytes(ImmutableBytesWritable ptr, Object value, PDataType actualType, Integer maxLength,
+            Integer scale, Integer desiredMaxLength, Integer desiredScale, PDataType desiredType,
+            SortOrder actualSortOrder, SortOrder desiredSortOrder, 
+            boolean expectedRowKeyOrderOptimizable) {
         if (ptr.getLength() == 0) { // a zero length ptr means null which will not be coerced to anything different
             return;
         }
@@ -245,42 +278,62 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
         PDataType desiredBaseType = PDataType.fromTypeId(desiredType.getSqlType() - PDataType.ARRAY_TYPE_BASE);
         if ((Objects.equal(maxLength, desiredMaxLength) || maxLength == null || desiredMaxLength == null)
                 && actualType.isBytesComparableWith(desiredType)
-                && baseType.isFixedWidth() == desiredBaseType.isFixedWidth() && actualModifer == expectedModifier) { 
+                && baseType.isFixedWidth() == desiredBaseType.isFixedWidth() 
+                && actualSortOrder == desiredSortOrder
+                && (desiredSortOrder == SortOrder.ASC || desiredBaseType.isFixedWidth() || isRowKeyOrderOptimized(actualType, actualSortOrder, ptr) == expectedRowKeyOrderOptimizable)) {
             return; 
         }
+        PhoenixArray pArr;
         if (value == null || actualType != desiredType) {
-            value = toObject(ptr.get(), ptr.getOffset(), ptr.getLength(), baseType, actualModifer, maxLength,
+            value = toObject(ptr.get(), ptr.getOffset(), ptr.getLength(), baseType, actualSortOrder, maxLength,
                     desiredScale, desiredBaseType);
-            PhoenixArray pArr = (PhoenixArray)value;
+            pArr = (PhoenixArray)value;
             // VARCHAR <=> CHAR
-            if(baseType.isFixedWidth() != desiredBaseType.isFixedWidth()) {
+            if (baseType.isFixedWidth() != desiredBaseType.isFixedWidth()) {
                 if (!pArr.isPrimitiveType()) {
                     pArr = new PhoenixArray(pArr, desiredMaxLength);
                 }
             }
-            //Coerce to new max length when only max lengths differ
-            if(actualType == desiredType && !pArr.isPrimitiveType() && maxLength != null && maxLength != desiredMaxLength){
-               pArr = new PhoenixArray(pArr, desiredMaxLength);
+            // Coerce to new max length when only max lengths differ
+            if (actualType == desiredType && !pArr.isPrimitiveType() && maxLength != null
+                    && maxLength != desiredMaxLength) {
+                pArr = new PhoenixArray(pArr, desiredMaxLength);
             }
             baseType = desiredBaseType;
-            ptr.set(toBytes(pArr, baseType, expectedModifier));
         } else {
-            PhoenixArray pArr = (PhoenixArray)value;
+            pArr = (PhoenixArray)value;
             pArr = new PhoenixArray(pArr, desiredMaxLength);
-            ptr.set(toBytes(pArr, baseType, expectedModifier));
         }
+        ptr.set(toBytes(pArr, baseType, desiredSortOrder, expectedRowKeyOrderOptimizable));
     }
 
+    public static boolean isRowKeyOrderOptimized(PDataType type, SortOrder sortOrder, ImmutableBytesWritable ptr) {
+        return isRowKeyOrderOptimized(type, sortOrder, ptr.get(), ptr.getOffset(), ptr.getLength());
+    }
+    
+    public static boolean isRowKeyOrderOptimized(PDataType type, SortOrder sortOrder, byte[] buf, int offset, int length) {
+        PDataType baseType = PDataType.fromTypeId(type.getSqlType() - PDataType.ARRAY_TYPE_BASE);
+        return isRowKeyOrderOptimized(baseType.isFixedWidth(), sortOrder, buf, offset, length);
+    }
+    
+    private static boolean isRowKeyOrderOptimized(boolean isFixedWidth, SortOrder sortOrder, byte[] buf, int offset, int length) {
+        if (length == 0 || sortOrder == SortOrder.ASC || isFixedWidth) { return true; } // nothing to inspect
+        // The header ends with: int offset-array position, int element count, byte version.
+        int offsetToHeaderOffset = offset + length - Bytes.SIZEOF_BYTE - Bytes.SIZEOF_INT * 2;
+        // The stored position is relative to the start of the array bytes, so rebase it by offset before indexing buf.
+        int offsetToSeparatorByte = offset + Bytes.readAsInt(buf, offsetToHeaderOffset, Bytes.SIZEOF_INT) - 1;
+        return buf[offsetToSeparatorByte] == QueryConstants.DESC_SEPARATOR_BYTE;
+    }
 
+    @Override
     public Object toObject(String value) {
-		throw new IllegalArgumentException("This operation is not suppported");
-	}
+        throw new IllegalArgumentException("This operation is not suppported");
+    }
 
-	public Object toObject(byte[] bytes, int offset, int length, PDataType baseType, 
-			SortOrder sortOrder, Integer maxLength, Integer scale, PDataType desiredDataType) {
-		return createPhoenixArray(bytes, offset, length, sortOrder,
-				baseType, maxLength, desiredDataType);
-	}
+    public Object toObject(byte[] bytes, int offset, int length, PDataType baseType, SortOrder sortOrder,
+            Integer maxLength, Integer scale, PDataType desiredDataType) {
+        return createPhoenixArray(bytes, offset, length, sortOrder, baseType, maxLength, desiredDataType);
+    }
 
     public static boolean positionAtArrayElement(Tuple tuple, ImmutableBytesWritable ptr, int index,
             Expression arrayExpr, PDataType pDataType, Integer maxLen) {
@@ -293,13 +346,14 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
         positionAtArrayElement(ptr, index - 1, pDataType, maxLen);
         return true;
     }
+
     public static void positionAtArrayElement(ImmutableBytesWritable ptr, int arrayIndex, PDataType baseDataType,
-        Integer byteSize) {
+            Integer byteSize) {
         byte[] bytes = ptr.get();
         int initPos = ptr.getOffset();
         if (!baseDataType.isFixedWidth()) {
-            int noOfElements = Bytes.toInt(bytes, (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT)),
-                    Bytes.SIZEOF_INT);
+            int noOfElements = Bytes.toInt(bytes,
+                    (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT)), Bytes.SIZEOF_INT);
             boolean useShort = true;
             if (noOfElements < 0) {
                 noOfElements = -noOfElements;
@@ -312,7 +366,7 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
 
             int indexOffset = Bytes.toInt(bytes,
                     (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + 2 * Bytes.SIZEOF_INT))) + ptr.getOffset();
-            if(arrayIndex >= noOfElements) {
+            if (arrayIndex >= noOfElements) {
                 ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
             } else {
                 // Skip those many offsets as given in the arrayIndex
@@ -343,9 +397,9 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
             }
         }
     }
-    
+
     public static void positionAtArrayElement(ImmutableBytesWritable ptr, int arrayIndex, PDataType baseDataType,
-        Integer byteSize, int offset, int length, int noOfElements, boolean first) {
+            Integer byteSize, int offset, int length, int noOfElements, boolean first) {
         byte[] bytes = ptr.get();
         if (!baseDataType.isFixedWidth()) {
             int indexOffset = Bytes.toInt(bytes, (offset + length - (Bytes.SIZEOF_BYTE + 2 * Bytes.SIZEOF_INT)))
@@ -401,10 +455,10 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
             return Bytes.toInt(bytes, offset, Bytes.SIZEOF_INT);
         }
     }
-    
-    private static int getOffset(ByteBuffer indexBuffer, int arrayIndex, boolean useShort, int indexOffset ) {
+
+    private static int getOffset(ByteBuffer indexBuffer, int arrayIndex, boolean useShort, int indexOffset) {
         int offset;
-        if(useShort) {
+        if (useShort) {
             offset = indexBuffer.getShort() + Short.MAX_VALUE;
         } else {
             offset = indexBuffer.getInt();
@@ -412,20 +466,22 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
         return offset;
     }
 
-	public Object toObject(Object object, PDataType actualType) {
-		return object;
-	}
-
-	public Object toObject(Object object, PDataType actualType, SortOrder sortOrder) {
-		// How to use the sortOrder ? Just reverse the elements
-		return toObject(object, actualType);
-	}
-	
-	/**
-	 * creates array bytes
-	 */
+    @Override
+    public Object toObject(Object object, PDataType actualType) {
+        return object;
+    }
+
+    public Object toObject(Object object, PDataType actualType, SortOrder sortOrder) {
+        // How to use the sortOrder ? Just reverse the elements
+        return toObject(object, actualType);
+    }
+
+    /**
+     * creates array bytes
+     * @param rowKeyOrderOptimizable used together with sortOrder to pick the separator byte for variable length elements
+     */
     private byte[] createArrayBytes(TrustedByteArrayOutputStream byteStream, DataOutputStream oStream,
-            PhoenixArray array, int noOfElements, PDataType baseType, SortOrder sortOrder) {
+            PhoenixArray array, int noOfElements, PDataType baseType, SortOrder sortOrder, boolean rowKeyOrderOptimizable) {
         try {
             if (!baseType.isFixedWidth()) {
                 int[] offsetPos = new int[noOfElements];
@@ -442,11 +498,11 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
                             SortOrder.invert(bytes, 0, bytes, 0, bytes.length);
                         }
                         oStream.write(bytes, 0, bytes.length);
-                        oStream.write(QueryConstants.SEPARATOR_BYTE);
+                        oStream.write(getSeparatorByte(rowKeyOrderOptimizable, sortOrder));
                     }
                 }
                 // Double seperator byte to show end of the non null array
-                PArrayDataType.writeEndSeperatorForVarLengthArray(oStream);
+                writeEndSeperatorForVarLengthArray(oStream, sortOrder, rowKeyOrderOptimizable);
                 noOfElements = PArrayDataType.serailizeOffsetArrayIntoStream(oStream, byteStream, noOfElements,
                         offsetPos[offsetPos.length - 1], offsetPos);
                 serializeHeaderInfoIntoStream(oStream, noOfElements);
@@ -475,7 +531,8 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
         return null;
     }
 
-    public static boolean appendItemToArray(ImmutableBytesWritable ptr, int length, int offset, byte[] arrayBytes, PDataType baseType, int arrayLength, Integer maxLength, SortOrder sortOrder) {
+    public static boolean appendItemToArray(ImmutableBytesWritable ptr, int length, int offset, byte[] arrayBytes,
+            PDataType baseType, int arrayLength, Integer maxLength, SortOrder sortOrder) {
         if (ptr.getLength() == 0) {
             ptr.set(arrayBytes, offset, length);
             return true;
@@ -483,7 +540,7 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
 
         int elementLength = maxLength == null ? ptr.getLength() : maxLength;
 
-        //padding
+        // padding
         if (elementLength > ptr.getLength()) {
             baseType.pad(ptr, elementLength, sortOrder);
         }
@@ -494,10 +551,12 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
         byte[] newArray;
         if (!baseType.isFixedWidth()) {
 
-            int offsetArrayPosition = Bytes.toInt(arrayBytes, offset + length - Bytes.SIZEOF_INT - Bytes.SIZEOF_INT - Bytes.SIZEOF_BYTE, Bytes.SIZEOF_INT);
-            int offsetArrayLength = length - offsetArrayPosition - Bytes.SIZEOF_INT - Bytes.SIZEOF_INT - Bytes.SIZEOF_BYTE;
+            int offsetArrayPosition = Bytes.toInt(arrayBytes, offset + length - Bytes.SIZEOF_INT - Bytes.SIZEOF_INT
+                    - Bytes.SIZEOF_BYTE, Bytes.SIZEOF_INT);
+            int offsetArrayLength = length - offsetArrayPosition - Bytes.SIZEOF_INT - Bytes.SIZEOF_INT
+                    - Bytes.SIZEOF_BYTE;
 
-            //checks whether offset array consists of shorts or integers
+            // checks whether offset array consists of shorts or integers
             boolean useInt = offsetArrayLength / Math.abs(arrayLength) == Bytes.SIZEOF_INT;
             boolean convertToInt = false;
 
@@ -507,7 +566,8 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
                 if (PArrayDataType.useShortForOffsetArray(newElementPosition)) {
                     newArray = new byte[length + elementLength + Bytes.SIZEOF_SHORT + Bytes.SIZEOF_BYTE];
                 } else {
-                    newArray = new byte[length + elementLength + arrayLength * Bytes.SIZEOF_SHORT + Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE];
+                    newArray = new byte[length + elementLength + arrayLength * Bytes.SIZEOF_SHORT + Bytes.SIZEOF_INT
+                            + Bytes.SIZEOF_BYTE];
                     convertToInt = true;
                 }
             } else {
@@ -517,24 +577,35 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
             int newOffsetArrayPosition = newElementPosition + elementLength + 3 * Bytes.SIZEOF_BYTE;
 
             System.arraycopy(arrayBytes, offset, newArray, 0, newElementPosition);
+            // Write separator explicitly, as it may not be 0
+            byte sepByte = getSeparatorByte(isRowKeyOrderOptimized(false, sortOrder, arrayBytes, offset, length), sortOrder);
+            newArray[newOffsetArrayPosition-3] = sepByte; // Separator for new value
+            newArray[newOffsetArrayPosition-2] = sepByte; // Double byte separator
+            newArray[newOffsetArrayPosition-1] = sepByte;
             System.arraycopy(elementBytes, elementOffset, newArray, newElementPosition, elementLength);
 
-            arrayLength = (Math.abs(arrayLength) + 1) * (int) Math.signum(arrayLength);
+            arrayLength = (Math.abs(arrayLength) + 1) * (int)Math.signum(arrayLength);
             if (useInt) {
-                System.arraycopy(arrayBytes, offset + offsetArrayPosition, newArray, newOffsetArrayPosition, offsetArrayLength);
+                System.arraycopy(arrayBytes, offset + offsetArrayPosition, newArray, newOffsetArrayPosition,
+                        offsetArrayLength);
                 Bytes.putInt(newArray, newOffsetArrayPosition + offsetArrayLength, newElementPosition);
 
-                writeEndBytes(newArray, newOffsetArrayPosition, offsetArrayLength, arrayLength, arrayBytes[offset + length - 1], true);
+                writeEndBytes(newArray, newOffsetArrayPosition, offsetArrayLength, arrayLength, arrayBytes[offset
+                        + length - 1], true);
             } else {
                 if (!convertToInt) {
-                    System.arraycopy(arrayBytes, offset + offsetArrayPosition, newArray, newOffsetArrayPosition, offsetArrayLength);
-                    Bytes.putShort(newArray, newOffsetArrayPosition + offsetArrayLength, (short) (newElementPosition - Short.MAX_VALUE));
+                    System.arraycopy(arrayBytes, offset + offsetArrayPosition, newArray, newOffsetArrayPosition,
+                            offsetArrayLength);
+                    Bytes.putShort(newArray, newOffsetArrayPosition + offsetArrayLength,
+                            (short)(newElementPosition - Short.MAX_VALUE));
 
-                    writeEndBytes(newArray, newOffsetArrayPosition, offsetArrayLength, arrayLength, arrayBytes[offset + length - 1], false);
+                    writeEndBytes(newArray, newOffsetArrayPosition, offsetArrayLength, arrayLength, arrayBytes[offset
+                            + length - 1], false);
                 } else {
                     int off = newOffsetArrayPosition;
                     for (int arrayIndex = 0; arrayIndex < Math.abs(arrayLength) - 1; arrayIndex++) {
-                        Bytes.putInt(newArray, off, getOffset(arrayBytes, arrayIndex, true, offsetArrayPosition + offset));
+                        Bytes.putInt(newArray, off,
+                                getOffset(arrayBytes, arrayIndex, true, offsetArrayPosition + offset));
                         off += Bytes.SIZEOF_INT;
                     }
 
@@ -557,7 +628,8 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
         return true;
     }
 
-    private static void writeEndBytes(byte[] array, int newOffsetArrayPosition, int offsetArrayLength, int arrayLength, byte header, boolean useInt) {
+    private static void writeEndBytes(byte[] array, int newOffsetArrayPosition, int offsetArrayLength, int arrayLength,
+            byte header, boolean useInt) {
         int byteSize = useInt ? Bytes.SIZEOF_INT : Bytes.SIZEOF_SHORT;
 
         Bytes.putInt(array, newOffsetArrayPosition + offsetArrayLength + byteSize, newOffsetArrayPosition);
@@ -565,12 +637,13 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
         Bytes.putByte(array, newOffsetArrayPosition + offsetArrayLength + byteSize + 2 * Bytes.SIZEOF_INT, header);
     }
 
-    public static boolean prependItemToArray(ImmutableBytesWritable ptr, int length, int offset, byte[] arrayBytes, PDataType baseType, int arrayLength, Integer maxLength, SortOrder sortOrder) {
+    public static boolean prependItemToArray(ImmutableBytesWritable ptr, int length, int offset, byte[] arrayBytes,
+            PDataType baseType, int arrayLength, Integer maxLength, SortOrder sortOrder) {
         int elementLength = maxLength == null ? ptr.getLength() : maxLength;
         if (ptr.getLength() == 0) {
             elementLength = 0;
         }
-        //padding
+        // padding
         if (elementLength > ptr.getLength()) {
             baseType.pad(ptr, elementLength, sortOrder);
         }
@@ -579,28 +652,31 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
 
         byte[] newArray;
         if (!baseType.isFixedWidth()) {
-            int offsetArrayPosition = Bytes.toInt(arrayBytes, offset + length - Bytes.SIZEOF_INT - Bytes.SIZEOF_INT - Bytes.SIZEOF_BYTE, Bytes.SIZEOF_INT);
-            int offsetArrayLength = length - offsetArrayPosition - Bytes.SIZEOF_INT - Bytes.SIZEOF_INT - Bytes.SIZEOF_BYTE;
+            int offsetArrayPosition = Bytes.toInt(arrayBytes, offset + length - Bytes.SIZEOF_INT - Bytes.SIZEOF_INT
+                    - Bytes.SIZEOF_BYTE, Bytes.SIZEOF_INT);
+            int offsetArrayLength = length - offsetArrayPosition - Bytes.SIZEOF_INT - Bytes.SIZEOF_INT
+                    - Bytes.SIZEOF_BYTE;
             arrayLength = Math.abs(arrayLength);
 
-            //checks whether offset array consists of shorts or integers
+            // checks whether offset array consists of shorts or integers
             boolean useInt = offsetArrayLength / arrayLength == Bytes.SIZEOF_INT;
             boolean convertToInt = false;
-            int endElementPosition = getOffset(arrayBytes, arrayLength - 1, !useInt, offsetArrayPosition + offset) + elementLength + Bytes.SIZEOF_BYTE;
+            int endElementPosition = getOffset(arrayBytes, arrayLength - 1, !useInt, offsetArrayPosition + offset)
+                    + elementLength + Bytes.SIZEOF_BYTE;
             int newOffsetArrayPosition;
             int lengthIncrease;
             int firstNonNullElementPosition = 0;
             int currentPosition = 0;
-            //handle the case where prepended element is null
+            // handle the case where prepended element is null
             if (elementLength == 0) {
                 int nulls = 1;
-                //counts the number of nulls which are already at the beginning of the array
+                // counts the number of nulls which are already at the beginning of the array
                 for (int index = 0; index < arrayLength; index++) {
                     int currOffset = getOffset(arrayBytes, index, !useInt, offsetArrayPosition + offset);
                     if (arrayBytes[offset + currOffset] == QueryConstants.SEPARATOR_BYTE) {
                         nulls++;
                     } else {
-                        //gets the offset of the first element after nulls at the beginning
+                        // gets the offset of the first element after nulls at the beginning
                         firstNonNullElementPosition = currOffset;
                         break;
                     }
@@ -609,26 +685,31 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
                 int nMultiplesOver255 = nulls / 255;
                 int nRemainingNulls = nulls % 255;
 
-                //Calculates the increase in length due to prepending the null
-                //There is a length increase only when nRemainingNulls == 1
-                //nRemainingNulls == 1 and nMultiplesOver255 == 0 means there were no nulls at the beginning previously.
-                //At that case we need to increase the length by two bytes, one for separator byte and one for null count.
-                //ex: initial array - 65 0 66 0 0 0 after prepending null - 0 1(inverted) 65 0 66 0 0 0
-                //nRemainingNulls == 1 and nMultiplesOver255 != 0 means there were null at the beginning previously.
-                //In this case due to prepending nMultiplesOver255 is increased by 1.
-                //We need to increase the length by one byte to store increased that.
-                //ex: initial array - 0 1 65 0 66 0 0 0 after prepending null - 0 1 1(inverted) 65 0 66 0 0 0
-                //nRemainingNulls == 0 case.
-                //ex: initial array - 0 254(inverted) 65 0 66 0 0 0 after prepending null - 0 1 65 0 66 0 0 0
-                //nRemainingNulls > 1 case.
-                //ex: initial array - 0 45(inverted) 65 0 66 0 0 0 after prepending null - 0 46(inverted) 65 0 66 0 0 0
-                lengthIncrease = nRemainingNulls == 1 ? (nMultiplesOver255 == 0 ? 2 * Bytes.SIZEOF_BYTE : Bytes.SIZEOF_BYTE) : 0;
-                endElementPosition = getOffset(arrayBytes, arrayLength - 1, !useInt, offsetArrayPosition + offset) + lengthIncrease;
+                // Calculates the increase in length due to prepending the null
+                // There is a length increase only when nRemainingNulls == 1
+                // nRemainingNulls == 1 and nMultiplesOver255 == 0 means there were no nulls at the beginning
+                // previously.
+                // In that case we need to increase the length by two bytes, one for separator byte and one for null
+                // count.
+                // ex: initial array - 65 0 66 0 0 0 after prepending null - 0 1(inverted) 65 0 66 0 0 0
+                // nRemainingNulls == 1 and nMultiplesOver255 != 0 means there were nulls at the beginning previously.
+                // In this case due to prepending nMultiplesOver255 is increased by 1.
+                // We need to increase the length by one byte to store that increase.
+                // ex: initial array - 0 1 65 0 66 0 0 0 after prepending null - 0 1 1(inverted) 65 0 66 0 0 0
+                // nRemainingNulls == 0 case.
+                // ex: initial array - 0 254(inverted) 65 0 66 0 0 0 after prepending null - 0 1 65 0 66 0 0 0
+                // nRemainingNulls > 1 case.
+                // ex: initial array - 0 45(inverted) 65 0 66 0 0 0 after prepending null - 0 46(inverted) 65 0 66 0 0 0
+                lengthIncrease = nRemainingNulls == 1 ? (nMultiplesOver255 == 0 ? 2 * Bytes.SIZEOF_BYTE
+                        : Bytes.SIZEOF_BYTE) : 0;
+                endElementPosition = getOffset(arrayBytes, arrayLength - 1, !useInt, offsetArrayPosition + offset)
+                        + lengthIncrease;
                 if (!useInt) {
                     if (PArrayDataType.useShortForOffsetArray(endElementPosition)) {
                         newArray = new byte[length + Bytes.SIZEOF_SHORT + lengthIncrease];
                     } else {
-                        newArray = new byte[length + arrayLength * Bytes.SIZEOF_SHORT + Bytes.SIZEOF_INT + lengthIncrease];
+                        newArray = new byte[length + arrayLength * Bytes.SIZEOF_SHORT + Bytes.SIZEOF_INT
+                                + lengthIncrease];
                         convertToInt = true;
                     }
                 } else {
@@ -638,14 +719,15 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
                 currentPosition++;
 
                 newOffsetArrayPosition = offsetArrayPosition + lengthIncrease;
-                //serialize nulls at the beginning
+                // serialize nulls at the beginning
                 currentPosition = serializeNulls(newArray, currentPosition, nulls);
             } else {
                 if (!useInt) {
                     if (PArrayDataType.useShortForOffsetArray(endElementPosition)) {
                         newArray = new byte[length + elementLength + Bytes.SIZEOF_SHORT + Bytes.SIZEOF_BYTE];
                     } else {
-                        newArray = new byte[length + elementLength + arrayLength * Bytes.SIZEOF_SHORT + Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE];
+                        newArray = new byte[length + elementLength + arrayLength * Bytes.SIZEOF_SHORT
+                                + Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE];
                         convertToInt = true;
                     }
                 } else {
@@ -655,17 +737,22 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
 
                 lengthIncrease = elementLength + Bytes.SIZEOF_BYTE;
                 System.arraycopy(elementBytes, elementOffset, newArray, 0, elementLength);
+                // Explicitly set separator byte since for DESC it won't be 0.
+                newArray[elementLength] = getSeparatorByte(isRowKeyOrderOptimized(false, sortOrder, arrayBytes, offset, length), sortOrder);
                 currentPosition += elementLength + Bytes.SIZEOF_BYTE;
             }
 
-            System.arraycopy(arrayBytes, firstNonNullElementPosition + offset, newArray, currentPosition, offsetArrayPosition);
+            System.arraycopy(arrayBytes, firstNonNullElementPosition + offset, newArray, currentPosition,
+                    offsetArrayPosition);
 
             arrayLength = arrayLength + 1;
-            //writes the new offset and changes the previous offsets
+            // writes the new offset and changes the previous offsets
             if (useInt || convertToInt) {
-                writeNewOffsets(arrayBytes, newArray, false, !useInt, newOffsetArrayPosition, arrayLength, offsetArrayPosition, offset, lengthIncrease, length);
+                writeNewOffsets(arrayBytes, newArray, false, !useInt, newOffsetArrayPosition, arrayLength,
+                        offsetArrayPosition, offset, lengthIncrease, length);
             } else {
-                writeNewOffsets(arrayBytes, newArray, true, true, newOffsetArrayPosition, arrayLength, offsetArrayPosition, offset, lengthIncrease, length);
+                writeNewOffsets(arrayBytes, newArray, true, true, newOffsetArrayPosition, arrayLength,
+                        offsetArrayPosition, offset, lengthIncrease, length);
             }
         } else {
             newArray = new byte[length + elementLength];
@@ -678,11 +765,13 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
         return true;
     }
 
-    private static void writeNewOffsets(byte[] arrayBytes, byte[] newArray, boolean useShortNew, boolean useShortPrevious, int newOffsetArrayPosition, int arrayLength, int offsetArrayPosition, int offset, int offsetShift, int length) {
+    private static void writeNewOffsets(byte[] arrayBytes, byte[] newArray, boolean useShortNew,
+            boolean useShortPrevious, int newOffsetArrayPosition, int arrayLength, int offsetArrayPosition, int offset,
+            int offsetShift, int length) {
         int currentPosition = newOffsetArrayPosition;
         int offsetArrayElementSize = useShortNew ? Bytes.SIZEOF_SHORT : Bytes.SIZEOF_INT;
         if (useShortNew) {
-            Bytes.putShort(newArray, currentPosition, (short) (0 - Short.MAX_VALUE));
+            Bytes.putShort(newArray, currentPosition, (short)(0 - Short.MAX_VALUE));
         } else {
             Bytes.putInt(newArray, currentPosition, 0);
         }
@@ -693,13 +782,13 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
             int oldOffset = getOffset(arrayBytes, arrayIndex, useShortPrevious, offsetArrayPosition + offset);
             if (arrayBytes[offset + oldOffset] == QueryConstants.SEPARATOR_BYTE && nullsAtBeginning) {
                 if (useShortNew) {
-                    Bytes.putShort(newArray, currentPosition, (short) (oldOffset - Short.MAX_VALUE));
+                    Bytes.putShort(newArray, currentPosition, (short)(oldOffset - Short.MAX_VALUE));
                 } else {
                     Bytes.putInt(newArray, currentPosition, oldOffset);
                 }
             } else {
                 if (useShortNew) {
-                    Bytes.putShort(newArray, currentPosition, (short) (oldOffset + offsetShift - Short.MAX_VALUE));
+                    Bytes.putShort(newArray, currentPosition, (short)(oldOffset + offsetShift - Short.MAX_VALUE));
                 } else {
                     Bytes.putInt(newArray, currentPosition, oldOffset + offsetShift);
                 }
@@ -715,7 +804,8 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
         Bytes.putByte(newArray, currentPosition, arrayBytes[offset + length - 1]);
     }
 
-    public static boolean concatArrays(ImmutableBytesWritable ptr, int array1BytesLength, int array1BytesOffset, byte[] array1Bytes, PDataType baseType, int actualLengthOfArray1, int actualLengthOfArray2) {
+    public static boolean concatArrays(ImmutableBytesWritable ptr, int array1BytesLength, int array1BytesOffset,
+            byte[] array1Bytes, PDataType baseType, int actualLengthOfArray1, int actualLengthOfArray2) {
         int array2BytesLength = ptr.getLength();
         int array2BytesOffset = ptr.getOffset();
         byte[] array2Bytes = ptr.get();
@@ -723,18 +813,22 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
         byte[] newArray;
 
         if (!baseType.isFixedWidth()) {
-            int offsetArrayPositionArray1 = Bytes.toInt(array1Bytes, array1BytesOffset + array1BytesLength - Bytes.SIZEOF_INT - Bytes.SIZEOF_INT - Bytes.SIZEOF_BYTE, Bytes.SIZEOF_INT);
-            int offsetArrayPositionArray2 = Bytes.toInt(array2Bytes, array2BytesOffset + array2BytesLength - Bytes.SIZEOF_INT - Bytes.SIZEOF_INT - Bytes.SIZEOF_BYTE, Bytes.SIZEOF_INT);
-            int offsetArrayLengthArray1 = array1BytesLength - offsetArrayPositionArray1 - Bytes.SIZEOF_INT - Bytes.SIZEOF_INT - Bytes.SIZEOF_BYTE;
-            int offsetArrayLengthArray2 = array2BytesLength - offsetArrayPositionArray2 - Bytes.SIZEOF_INT - Bytes.SIZEOF_INT - Bytes.SIZEOF_BYTE;
+            int offsetArrayPositionArray1 = Bytes.toInt(array1Bytes, array1BytesOffset + array1BytesLength
+                    - Bytes.SIZEOF_INT - Bytes.SIZEOF_INT - Bytes.SIZEOF_BYTE, Bytes.SIZEOF_INT);
+            int offsetArrayPositionArray2 = Bytes.toInt(array2Bytes, array2BytesOffset + array2BytesLength
+                    - Bytes.SIZEOF_INT - Bytes.SIZEOF_INT - Bytes.SIZEOF_BYTE, Bytes.SIZEOF_INT);
+            int offsetArrayLengthArray1 = array1BytesLength - offsetArrayPositionArray1 - Bytes.SIZEOF_INT
+                    - Bytes.SIZEOF_INT - Bytes.SIZEOF_BYTE;
+            int offsetArrayLengthArray2 = array2BytesLength - offsetArrayPositionArray2 - Bytes.SIZEOF_INT
+                    - Bytes.SIZEOF_INT - Bytes.SIZEOF_BYTE;
             int newArrayLength = actualLengthOfArray1 + actualLengthOfArray2;
             int nullsAtTheEndOfArray1 = 0;
             int nullsAtTheBeginningOfArray2 = 0;
-            //checks whether offset array consists of shorts or integers
+            // checks whether offset array consists of shorts or integers
             boolean useIntArray1 = offsetArrayLengthArray1 / actualLengthOfArray1 == Bytes.SIZEOF_INT;
             boolean useIntArray2 = offsetArrayLengthArray2 / actualLengthOfArray2 == Bytes.SIZEOF_INT;
             boolean useIntNewArray = false;
-            //count nulls at the end of array 1
+            // count nulls at the end of array 1
             for (int index = actualLengthOfArray1 - 1; index > -1; index--) {
                 int offset = getOffset(array1Bytes, index, !useIntArray1, array1BytesOffset + offsetArrayPositionArray1);
                 if (array1Bytes[array1BytesOffset + offset] == QueryConstants.SEPARATOR_BYTE) {
@@ -743,7 +837,7 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
                     break;
                 }
             }
-            //count nulls at the beginning of the array 2
+            // count nulls at the beginning of the array 2
             int array2FirstNonNullElementOffset = 0;
             int array2FirstNonNullIndex = 0;
             for (int index = 0; index < actualLengthOfArray2; index++) {
@@ -757,75 +851,95 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
                 }
             }
             int nullsInMiddleAfterConcat = nullsAtTheEndOfArray1 + nullsAtTheBeginningOfArray2;
-            int bytesForNullsBefore = nullsAtTheBeginningOfArray2 / 255 + (nullsAtTheBeginningOfArray2 % 255 == 0 ? 0 : 1);
+            int bytesForNullsBefore = nullsAtTheBeginningOfArray2 / 255
+                    + (nullsAtTheBeginningOfArray2 % 255 == 0 ? 0 : 1);
             int bytesForNullsAfter = nullsInMiddleAfterConcat / 255 + (nullsInMiddleAfterConcat % 255 == 0 ? 0 : 1);
-            //Increase of length required to store nulls
+            // Increase of length required to store nulls
             int lengthIncreaseForNulls = bytesForNullsAfter - bytesForNullsBefore;
-            //Length increase incremented by one when there were no nulls at the beginning of array and when there are
-            //nulls at the end of array 1 as we need to allocate a byte for separator byte in this case.
-            lengthIncreaseForNulls += nullsAtTheBeginningOfArray2 == 0 && nullsAtTheEndOfArray1 != 0 ? Bytes.SIZEOF_BYTE : 0;
-            int newOffsetArrayPosition = offsetArrayPositionArray1 + offsetArrayPositionArray2 + lengthIncreaseForNulls - 2 * Bytes.SIZEOF_BYTE;
-            int endElementPositionOfArray2 = getOffset(array2Bytes, actualLengthOfArray2 - 1, !useIntArray2, array2BytesOffset + offsetArrayPositionArray2);
-            int newEndElementPosition = lengthIncreaseForNulls + endElementPositionOfArray2 + offsetArrayPositionArray1 - 2 * Bytes.SIZEOF_BYTE;
-            //Creates a byre array to store the concatenated array
+            // The length increase grows by one more byte when array 2 has no nulls at its beginning but array 1 has
+            // nulls at its end, as we need to allocate a byte for the separator byte in this case.
+            lengthIncreaseForNulls += nullsAtTheBeginningOfArray2 == 0 && nullsAtTheEndOfArray1 != 0 ? Bytes.SIZEOF_BYTE
+                    : 0;
+            int newOffsetArrayPosition = offsetArrayPositionArray1 + offsetArrayPositionArray2 + lengthIncreaseForNulls
+                    - 2 * Bytes.SIZEOF_BYTE;
+            int endElementPositionOfArray2 = getOffset(array2Bytes, actualLengthOfArray2 - 1, !useIntArray2,
+                    array2BytesOffset + offsetArrayPositionArray2);
+            int newEndElementPosition = lengthIncreaseForNulls + endElementPositionOfArray2 + offsetArrayPositionArray1
+                    - 2 * Bytes.SIZEOF_BYTE;
+            // Creates a byte array to store the concatenated array
             if (PArrayDataType.useShortForOffsetArray(newEndElementPosition)) {
-                newArray = new byte[newOffsetArrayPosition + newArrayLength * Bytes.SIZEOF_SHORT + Bytes.SIZEOF_INT + Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE];
+                newArray = new byte[newOffsetArrayPosition + newArrayLength * Bytes.SIZEOF_SHORT + Bytes.SIZEOF_INT
+                        + Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE];
             } else {
                 useIntNewArray = true;
-                newArray = new byte[newOffsetArrayPosition + newArrayLength * Bytes.SIZEOF_INT + Bytes.SIZEOF_INT + Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE];
+                newArray = new byte[newOffsetArrayPosition + newArrayLength * Bytes.SIZEOF_INT + Bytes.SIZEOF_INT
+                        + Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE];
             }
 
             int currentPosition = 0;
-            //Copies all the elements from array 1 to new array
-            System.arraycopy(array1Bytes, array1BytesOffset, newArray, currentPosition, offsetArrayPositionArray1 - 2 * Bytes.SIZEOF_BYTE);
+            // Copies all the elements from array 1 to new array
+            System.arraycopy(array1Bytes, array1BytesOffset, newArray, currentPosition, offsetArrayPositionArray1 - 2
+                    * Bytes.SIZEOF_BYTE);
             currentPosition = offsetArrayPositionArray1 - 2 * Bytes.SIZEOF_BYTE;
             int array2StartingPosition = currentPosition;
             currentPosition += nullsInMiddleAfterConcat != 0 ? 1 : 0;
-            //Writes nulls in the middle of the array.
+            // Writes nulls in the middle of the array.
             currentPosition = serializeNulls(newArray, currentPosition, nullsInMiddleAfterConcat);
-            //Copies the elements from array 2 beginning from the first non null element.
-            System.arraycopy(array2Bytes, array2BytesOffset + array2FirstNonNullElementOffset, newArray, currentPosition, offsetArrayPositionArray2 - array2FirstNonNullElementOffset);
+            // Copies the elements from array 2 beginning from the first non null element.
+            System.arraycopy(array2Bytes, array2BytesOffset + array2FirstNonNullElementOffset, newArray,
+                    currentPosition, offsetArrayPositionArray2 - array2FirstNonNullElementOffset);
             currentPosition += offsetArrayPositionArray2 - array2FirstNonNullElementOffset;
 
-            //Writing offset arrays
+            // Writing offset arrays
             if (useIntNewArray) {
-                //offsets for the elements from array 1. Simply copied.
+                // offsets for the elements from array 1. Simply copied.
                 for (int index = 0; index < actualLengthOfArray1; index++) {
-                    int offset = getOffset(array1Bytes, index, !useIntArray1, array1BytesOffset + offsetArrayPositionArray1);
+                    int offset = getOffset(array1Bytes, index, !useIntArray1, array1BytesOffset
+                            + offsetArrayPositionArray1);
                     Bytes.putInt(newArray, currentPosition, offset);
                     currentPosition += Bytes.SIZEOF_INT;
                 }
-                //offsets for nulls in the middle
+                // offsets for nulls in the middle
                 for (int index = 0; index < array2FirstNonNullIndex; index++) {
-                    int offset = getOffset(array2Bytes, index, !useIntArray2, array2BytesOffset + offsetArrayPositionArray2);
+                    int offset = getOffset(array2Bytes, index, !useIntArray2, array2BytesOffset
+                            + offsetArrayPositionArray2);
                     Bytes.putInt(newArray, currentPosition, offset + array2StartingPosition);
                     currentPosition += Bytes.SIZEOF_INT;
                 }
-                //offsets for the elements from the first non null element from array 2
-                int part2NonNullStartingPosition = array2StartingPosition + bytesForNullsAfter + (bytesForNullsAfter == 0 ? 0 : Bytes.SIZEOF_BYTE);
+                // offsets for the elements from the first non null element from array 2
+                int part2NonNullStartingPosition = array2StartingPosition + bytesForNullsAfter
+                        + (bytesForNullsAfter == 0 ? 0 : Bytes.SIZEOF_BYTE);
                 for (int index = array2FirstNonNullIndex; index < actualLengthOfArray2; index++) {
-                    int offset = getOffset(array2Bytes, index, !useIntArray2, array2BytesOffset + offsetArrayPositionArray2);
-                    Bytes.putInt(newArray, currentPosition, offset - array2FirstNonNullElementOffset + part2NonNullStartingPosition);
+                    int offset = getOffset(array2Bytes, index, !useIntArray2, array2BytesOffset
+                            + offsetArrayPositionArray2);
+                    Bytes.putInt(newArray, currentPosition, offset - array2FirstNonNullElementOffset
+                            + part2NonNullStartingPosition);
                     currentPosition += Bytes.SIZEOF_INT;
                 }
             } else {
-                //offsets for the elements from array 1. Simply copied.
+                // offsets for the elements from array 1. Simply copied.
                 for (int index = 0; index < actualLengthOfArray1; index++) {
-                    int offset = getOffset(array1Bytes, index, !useIntArray1, array1BytesOffset + offsetArrayPositionArray1);
-                    Bytes.putShort(newArray, currentPosition, (short) (offset - Short.MAX_VALUE));
+                    int offset = getOffset(array1Bytes, index, !useIntArray1, array1BytesOffset
+                            + offsetArrayPositionArray1);
+                    Bytes.putShort(newArray, currentPosition, (short)(offset - Short.MAX_VALUE));
                     currentPosition += Bytes.SIZEOF_SHORT;
                 }
-                //offsets for nulls in the middle
+                // offsets for nulls in the middle
                 for (int index = 0; index < array2FirstNonNullIndex; index++) {
-                    int offset = getOffset(array2Bytes, index, !useIntArray2, array2BytesOffset + offsetArrayPositionArray2);
-                    Bytes.putShort(newArray, currentPosition, (short) (offset + array2StartingPosition - Short.MAX_VALUE));
+                    int offset = getOffset(array2Bytes, index, !useIntArray2, array2BytesOffset
+                            + offsetArrayPositionArray2);
+                    Bytes.putShort(newArray, currentPosition,
+                            (short)(offset + array2StartingPosition - Short.MAX_VALUE));
                     currentPosition += Bytes.SIZEOF_SHORT;
                 }
-                //offsets for the elements from the first non null element from array 2
-                int part2NonNullStartingPosition = array2StartingPosition + bytesForNullsAfter + (bytesForNullsAfter == 0 ? 0 : Bytes.SIZEOF_BYTE);
+                // offsets for the elements from the first non null element from array 2
+                int part2NonNullStartingPosition = array2StartingPosition + bytesForNullsAfter
+                        + (bytesForNullsAfter == 0 ? 0 : Bytes.SIZEOF_BYTE);
                 for (int index = array2FirstNonNullIndex; index < actualLengthOfArray2; index++) {
-                    int offset = getOffset(array2Bytes, index, !useIntArray2, array2BytesOffset + offsetArrayPositionArray2);
-                    Bytes.putShort(newArray, currentPosition, (short) (offset - array2FirstNonNullElementOffset + part2NonNullStartingPosition - Short.MAX_VALUE));
+                    int offset = getOffset(array2Bytes, index, !useIntArray2, array2BytesOffset
+                            + offsetArrayPositionArray2);
+                    Bytes.putShort(newArray, currentPosition, (short)(offset - array2FirstNonNullElementOffset
+                            + part2NonNullStartingPosition - Short.MAX_VALUE));
                     currentPosition += Bytes.SIZEOF_SHORT;
                 }
             }
@@ -856,7 +970,7 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
             noOfElements = -noOfElements;
         }
         int off = 0;
-        if(useInt) {
+        if (useInt) {
             for (int pos : offsetPos) {
                 Bytes.putInt(offsetArr, off, pos);
                 off += Bytes.SIZEOF_INT;
@@ -886,9 +1000,9 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
         oStream.write(ARRAY_SERIALIZATION_VERSION);
     }
 
-	public static int initOffsetArray(int noOfElements, int baseSize) {
-		// for now create an offset array equal to the noofelements
-		return noOfElements * baseSize;
+    public static int initOffsetArray(int noOfElements, int baseSize) {
+        // for now size the offset array to hold exactly noOfElements entries of baseSize bytes each
+        return noOfElements * baseSize;
     }
 
     // Any variable length array would follow the below order
@@ -898,11 +1012,11 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
     // Trailing nulls are not taken into account
     // The last non null element is followed by two seperator bytes
     // For eg
-    // a, b, null, null, c, null would be 
+    // a, b, null, null, c, null would be
     // 65 0 66 0 0 2 67 0 0 0
     // a null null null b c null d would be
     // 65 0 0 3 66 0 67 0 0 1 68 0 0 0
-	// Follow the above example to understand how this works
+    // Follow the above example to understand how this works
     private Object createPhoenixArray(byte[] bytes, int offset, int length, SortOrder sortOrder,
             PDataType baseDataType, Integer maxLength, PDataType desiredDataType) {
         if (bytes == null || length == 0) { return null; }
@@ -990,45 +1104,43 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
                 }
             }
         }
-        if(baseDataType == desiredDataType) {
+        if (baseDataType == desiredDataType) {
             return PArrayDataType.instantiatePhoenixArray(baseDataType, elements);
         } else {
             return PArrayDataType.instantiatePhoenixArray(desiredDataType, elements);
         }
     }
-	
+
     public static PhoenixArray instantiatePhoenixArray(PDataType actualType, Object[] elements) {
         return PDataType.instantiatePhoenixArray(actualType, elements);
     }
-	
-	public int compareTo(Object lhs, Object rhs) {
-		PhoenixArray lhsArr = (PhoenixArray) lhs;
-		PhoenixArray rhsArr = (PhoenixArray) rhs;
-		if(lhsArr.equals(rhsArr)) {
-			return 0;
-		}
-		return 1;
-	}
-
-	public static int getArrayLength(ImmutableBytesWritable ptr,
-			PDataType baseType, Integer maxLength) {
-		byte[] bytes = ptr.get();
-		if(baseType.isFixedWidth()) {
-		    int elemLength = maxLength == null ? baseType.getByteSize() : maxLength;
-			return (ptr.getLength() / elemLength);
-		}
-		return Bytes.toInt(bytes, (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT)));
-	}
+
+    @Override
+    public int compareTo(Object lhs, Object rhs) {
+        PhoenixArray lhsArr = (PhoenixArray)lhs;
+        PhoenixArray rhsArr = (PhoenixArray)rhs;
+        if (lhsArr.equals(rhsArr)) { return 0; }
+        return 1;
+    }
+
+    public static int getArrayLength(ImmutableBytesWritable ptr, PDataType baseType, Integer maxLength) {
+        byte[] bytes = ptr.get();
+        if (baseType.isFixedWidth()) {
+            int elemLength = maxLength == null ? baseType.getByteSize() : maxLength;
+            return (ptr.getLength() / elemLength);
+        }
+        return Bytes.toInt(bytes, (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT)));
+    }
 
     public static int estimateSize(int size, PDataType baseType) {
-        if(baseType.isFixedWidth()) {
+        if (baseType.isFixedWidth()) {
             return baseType.getByteSize() * size;
         } else {
             return size * ValueSchema.ESTIMATED_VARIABLE_LENGTH_SIZE;
         }
-        
+
     }
-    
+
     public Object getSampleValue(PDataType baseType, Integer arrayLength, Integer elemLength) {
         Preconditions.checkArgument(arrayLength == null || arrayLength >= 0);
         if (arrayLength == null) {
@@ -1047,17 +1159,18 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
         PhoenixArray array = (PhoenixArray)o;
         PDataType baseType = PDataType.arrayBaseType(this);
         int len = array.getDimensions();
-        if (len != 0)  {
+        if (len != 0) {
             for (int i = 0; i < len; i++) {
                 buf.append(baseType.toStringLiteral(array.getElement(i), null));
                 buf.append(',');
             }
-            buf.setLength(buf.length()-1);
+            buf.setLength(buf.length() - 1);
         }
         buf.append(']');
         return buf.toString();
     }
 
+    // FIXME: remove this duplicate code
     static public class PArrayDataTypeBytesArrayBuilder<T> {
         static private final int BYTE_ARRAY_DEFAULT_SIZE = 128;
 
@@ -1083,8 +1196,7 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
                 if (oStream != null) oStream.close();
                 byteStream = null;
                 oStream = null;
-            } catch (IOException ioe) {
-            }
+            } catch (IOException ioe) {}
         }
 
         public boolean appendElem(byte[] bytes) {
@@ -1105,7 +1217,7 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
                             SortOrder.invert(bytes, offset, bytes, offset, len);
                         }
                         oStream.write(bytes, offset, len);
-                        oStream.write(QueryConstants.SEPARATOR_BYTE);
+                        oStream.write(getSeparatorByte(true, sortOrder));
                     }
                 } else {
                     if (sortOrder == SortOrder.DESC) {
@@ -1114,12 +1226,11 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
                     oStream.write(bytes, offset, len);
                 }
                 return true;
-            } catch (IOException e) {
-            }
+            } catch (IOException e) {}
             return false;
         }
 
-        public byte[] getBytesAndClose() {
+        public byte[] getBytesAndClose(SortOrder sortOrder) {
             try {
                 if (!baseType.isFixedWidth()) {
                     int noOfElements = offsetPos.size();
@@ -1129,18 +1240,15 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
                         offsetPosArray[index] = i;
                         ++index;
                     }
-                    PArrayDataType.writeEndSeperatorForVarLengthArray(oStream);
-                    noOfElements =
-                            PArrayDataType.serailizeOffsetArrayIntoStream(oStream, byteStream,
-                                noOfElements, offsetPosArray[offsetPosArray.length - 1],
-                                offsetPosArray);
+                    PArrayDataType.writeEndSeperatorForVarLengthArray(oStream, sortOrder);
+                    noOfElements = PArrayDataType.serailizeOffsetArrayIntoStream(oStream, byteStream, noOfElements,
+                            offsetPosArray[offsetPosArray.length - 1], offsetPosArray);
                     serializeHeaderInfoIntoStream(oStream, noOfElements);
                 }
                 ImmutableBytesWritable ptr = new ImmutableBytesWritable();
                 ptr.set(byteStream.getBuffer(), 0, byteStream.size());
                 return ByteUtil.copyKeyBytesIfNecessary(ptr);
-            } catch (IOException e) {
-            } finally {
+            } catch (IOException e) {} finally {
                 close();
             }
             return null;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PBinaryArray.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PBinaryArray.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PBinaryArray.java
index 16c6485..523f774 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PBinaryArray.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PBinaryArray.java
@@ -17,92 +17,70 @@
  */
 package org.apache.phoenix.schema.types;
 
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.schema.SortOrder;
 
-import java.sql.Types;
-
 public class PBinaryArray extends PArrayDataType<byte[][]> {
 
-  public static final PBinaryArray INSTANCE = new PBinaryArray();
-
-  private PBinaryArray() {
-    super("BINARY ARRAY", PDataType.ARRAY_TYPE_BASE + PBinary.INSTANCE.getSqlType(),
-        PhoenixArray.class, null, 28);
-  }
-
-  @Override
-  public boolean isArrayType() {
-    return true;
-  }
-
-  @Override
-  public boolean isFixedWidth() {
-    return false;
-  }
+    public static final PBinaryArray INSTANCE = new PBinaryArray();
 
-  @Override
-  public int compareTo(Object lhs, Object rhs, PDataType rhsType) {
-    return compareTo(lhs, rhs);
-  }
+    private PBinaryArray() {
+        super("BINARY ARRAY", PDataType.ARRAY_TYPE_BASE + PBinary.INSTANCE.getSqlType(), PhoenixArray.class, null, 28);
+    }
 
-  @Override
-  public Integer getByteSize() {
-    return null;
-  }
+    @Override
+    public boolean isArrayType() {
+        return true;
+    }
 
-  @Override
-  public byte[] toBytes(Object object) {
-    return toBytes(object, SortOrder.ASC);
-  }
+    @Override
+    public boolean isFixedWidth() {
+        return false;
+    }
 
-  @Override
-  public byte[] toBytes(Object object, SortOrder sortOrder) {
-    return toBytes(object, PBinary.INSTANCE, sortOrder);
-  }
+    @Override
+    public int compareTo(Object lhs, Object rhs, PDataType rhsType) {
+        return compareTo(lhs, rhs);
+    }
 
-  @Override
-  public Object toObject(byte[] bytes, int offset, int length,
-      PDataType actualType, SortOrder sortOrder, Integer maxLength, Integer scale) {
-    return toObject(bytes, offset, length, PBinary.INSTANCE, sortOrder, maxLength, scale,
-        PBinary.INSTANCE);
-  }
+    @Override
+    public Integer getByteSize() {
+        return null;
+    }
 
-  @Override
-  public boolean isCoercibleTo(PDataType targetType) {
-    return isCoercibleTo(targetType, this);
-  }
+    @Override
+    public byte[] toBytes(Object object) {
+        return toBytes(object, SortOrder.ASC);
+    }
 
-  @Override
-  public boolean isCoercibleTo(PDataType targetType, Object value) {
-    if (value == null) {
-      return true;
+    @Override
+    public byte[] toBytes(Object object, SortOrder sortOrder) {
+        return toBytes(object, PBinary.INSTANCE, sortOrder);
     }
-    PhoenixArray pArr = (PhoenixArray) value;
-    Object[] charArr = (Object[]) pArr.array;
-    for (Object i : charArr) {
-      if (!super.isCoercibleTo(PBinary.INSTANCE, i)) {
-        return false;
-      }
+
+    @Override
+    public Object toObject(byte[] bytes, int offset, int length, PDataType actualType, SortOrder sortOrder,
+            Integer maxLength, Integer scale) {
+        return toObject(bytes, offset, length, PBinary.INSTANCE, sortOrder, maxLength, scale, PBinary.INSTANCE);
     }
-    return true;
-  }
 
-  @Override
-  public void coerceBytes(ImmutableBytesWritable ptr, Object object, PDataType actualType,
-      Integer maxLength, Integer scale, SortOrder actualModifer, Integer desiredMaxLength,
-      Integer desiredScale, SortOrder desiredModifier) {
-    coerceBytes(ptr, object, actualType, maxLength, scale, desiredMaxLength, desiredScale,
-        this, actualModifer, desiredModifier);
-  }
+    @Override
+    public boolean isCoercibleTo(PDataType targetType) {
+        return isCoercibleTo(targetType, this);
+    }
 
-  @Override
-  public int getResultSetSqlType() {
-    return Types.ARRAY;
-  }
+    @Override
+    public boolean isCoercibleTo(PDataType targetType, Object value) {
+        if (value == null) { return true; }
+        PhoenixArray pArr = (PhoenixArray)value;
+        Object[] charArr = (Object[])pArr.array;
+        for (Object i : charArr) {
+            if (!super.isCoercibleTo(PBinary.INSTANCE, i)) { return false; }
+        }
+        return true;
+    }
 
-  @Override
-  public Object getSampleValue(Integer maxLength, Integer arrayLength) {
-    return getSampleValue(PBinary.INSTANCE, arrayLength, maxLength);
-  }
+    @Override
+    public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+        return getSampleValue(PBinary.INSTANCE, arrayLength, maxLength);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PBooleanArray.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PBooleanArray.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PBooleanArray.java
index 87bf9b4..742b0de 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PBooleanArray.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PBooleanArray.java
@@ -17,93 +17,71 @@
  */
 package org.apache.phoenix.schema.types;
 
-import java.sql.Types;
-
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.types.PhoenixArray.PrimitiveBooleanPhoenixArray;
 
 public class PBooleanArray extends PArrayDataType<boolean[]> {
 
-  public static final PBooleanArray INSTANCE = new PBooleanArray();
+    public static final PBooleanArray INSTANCE = new PBooleanArray();
 
-  private PBooleanArray() {
-    super("BOOLEAN ARRAY", PDataType.ARRAY_TYPE_BASE + PBoolean.INSTANCE.getSqlType(),
-        PhoenixArray.class, null, 25);
-  }
+    private PBooleanArray() {
+        super("BOOLEAN ARRAY", PDataType.ARRAY_TYPE_BASE + PBoolean.INSTANCE.getSqlType(), PhoenixArray.class, null, 25);
+    }
 
-  @Override
-  public boolean isArrayType() {
-    return true;
-  }
+    @Override
+    public boolean isArrayType() {
+        return true;
+    }
 
-  @Override
-  public boolean isFixedWidth() {
-    return false;
-  }
+    @Override
+    public boolean isFixedWidth() {
+        return false;
+    }
 
-  @Override
-  public int compareTo(Object lhs, Object rhs, PDataType rhsType) {
-    return compareTo(lhs, rhs);
-  }
+    @Override
+    public int compareTo(Object lhs, Object rhs, PDataType rhsType) {
+        return compareTo(lhs, rhs);
+    }
 
-  @Override
-  public Integer getByteSize() {
-    return null;
-  }
+    @Override
+    public Integer getByteSize() {
+        return null;
+    }
 
-  @Override
-  public byte[] toBytes(Object object) {
-    return toBytes(object, SortOrder.ASC);
-  }
+    @Override
+    public byte[] toBytes(Object object) {
+        return toBytes(object, SortOrder.ASC);
+    }
 
-  @Override
-  public byte[] toBytes(Object object, SortOrder sortOrder) {
-    return toBytes(object, PBoolean.INSTANCE, sortOrder);
-  }
+    @Override
+    public byte[] toBytes(Object object, SortOrder sortOrder) {
+        return toBytes(object, PBoolean.INSTANCE, sortOrder);
+    }
 
-  @Override
-  public Object toObject(byte[] bytes, int offset, int length,
-      PDataType actualType, SortOrder sortOrder, Integer maxLength, Integer scale) {
-    return toObject(bytes, offset, length, PBoolean.INSTANCE, sortOrder, maxLength, scale,
-        PBoolean.INSTANCE);
-  }
+    @Override
+    public Object toObject(byte[] bytes, int offset, int length, PDataType actualType, SortOrder sortOrder,
+            Integer maxLength, Integer scale) {
+        return toObject(bytes, offset, length, PBoolean.INSTANCE, sortOrder, maxLength, scale, PBoolean.INSTANCE);
+    }
 
-  @Override
-  public boolean isCoercibleTo(PDataType targetType) {
-    return isCoercibleTo(targetType, this);
-  }
+    @Override
+    public boolean isCoercibleTo(PDataType targetType) {
+        return isCoercibleTo(targetType, this);
+    }
 
     @Override
     public boolean isCoercibleTo(PDataType targetType, Object value) {
-        if (value == null) {
-            return true;
-        }
-        PrimitiveBooleanPhoenixArray pArr = (PrimitiveBooleanPhoenixArray) value;
-        boolean[] booleanArr = (boolean[]) pArr.array;
+        if (value == null) { return true; }
+        PrimitiveBooleanPhoenixArray pArr = (PrimitiveBooleanPhoenixArray)value;
+        boolean[] booleanArr = (boolean[])pArr.array;
         for (boolean b : booleanArr) {
-            if (!super.isCoercibleTo(PInteger.INSTANCE, b)) {
-                return false;
-            }
+            if (!super.isCoercibleTo(PInteger.INSTANCE, b)) { return false; }
         }
         return true;
     }
 
-  @Override
-  public int getResultSetSqlType() {
-    return Types.ARRAY;
-  }
-
-  @Override
-  public void coerceBytes(ImmutableBytesWritable ptr, Object object, PDataType actualType,
-      Integer maxLength, Integer scale, SortOrder actualModifer, Integer desiredMaxLength,
-      Integer desiredScale, SortOrder desiredModifier) {
-    coerceBytes(ptr, object, actualType, maxLength, scale, desiredMaxLength, desiredScale,
-        this, actualModifer, desiredModifier);
-  }
-
-  @Override
-  public Object getSampleValue(Integer maxLength, Integer arrayLength) {
-    return getSampleValue(PBoolean.INSTANCE, arrayLength, maxLength);
-  }
+    @Override
+    public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+        return getSampleValue(PBoolean.INSTANCE, arrayLength, maxLength);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PCharArray.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PCharArray.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PCharArray.java
index 89ec5db..a740c7f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PCharArray.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PCharArray.java
@@ -17,92 +17,76 @@
  */
 package org.apache.phoenix.schema.types;
 
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.schema.SortOrder;
 
-import java.sql.Types;
-
 public class PCharArray extends PArrayDataType<String[]> {
 
-  public static final PCharArray INSTANCE = new PCharArray();
-
-  private PCharArray() {
-    super("CHAR ARRAY", PDataType.ARRAY_TYPE_BASE + PChar.INSTANCE.getSqlType(), PhoenixArray.class,
-        null, 29);
-  }
-
-  @Override
-  public boolean isArrayType() {
-    return true;
-  }
-
-  @Override
-  public boolean isFixedWidth() {
-    return false;
-  }
+    public static final PCharArray INSTANCE = new PCharArray();
 
-  @Override
-  public int compareTo(Object lhs, Object rhs, PDataType rhsType) {
-    return compareTo(lhs, rhs);
-  }
+    private PCharArray() {
+        super("CHAR ARRAY", PDataType.ARRAY_TYPE_BASE + PChar.INSTANCE.getSqlType(), PhoenixArray.class,
+                null, 29);
+    }
 
-  @Override
-  public Integer getByteSize() {
-    return null;
-  }
+    @Override
+    public boolean isArrayType() {
+        return true;
+    }
 
-  @Override
-  public byte[] toBytes(Object object) {
-    return toBytes(object, SortOrder.ASC);
-  }
+    @Override
+    public boolean isFixedWidth() {
+        return false;
+    }
 
-  @Override
-  public byte[] toBytes(Object object, SortOrder sortOrder) {
-    return toBytes(object, PChar.INSTANCE, sortOrder);
-  }
+    @Override
+    public int compareTo(Object lhs, Object rhs, PDataType rhsType) {
+        return compareTo(lhs, rhs);
+    }
 
-  @Override
-  public Object toObject(byte[] bytes, int offset, int length,
-      PDataType actualType, SortOrder sortOrder, Integer maxLength, Integer scale) {
-    return toObject(bytes, offset, length, PChar.INSTANCE, sortOrder, maxLength, scale,
-        PChar.INSTANCE);
-  }
+    @Override
+    public Integer getByteSize() {
+        return null;
+    }
 
-  @Override
-  public boolean isCoercibleTo(PDataType targetType) {
-    return isCoercibleTo(targetType, this);
-  }
+    @Override
+    public byte[] toBytes(Object object) {
+        return toBytes(object, SortOrder.ASC);
+    }
 
-  @Override
-  public boolean isCoercibleTo(PDataType targetType, Object value) {
-    if (value == null) {
-      return true;
+    @Override
+    public byte[] toBytes(Object object, SortOrder sortOrder) {
+        return toBytes(object, PChar.INSTANCE, sortOrder);
     }
-    PhoenixArray pArr = (PhoenixArray) value;
-    Object[] charArr = (Object[]) pArr.array;
-    for (Object i : charArr) {
-      if (!super.isCoercibleTo(PChar.INSTANCE, i)) {
-        return false;
-      }
+
+    @Override
+    public Object toObject(byte[] bytes, int offset, int length,
+            PDataType actualType, SortOrder sortOrder, Integer maxLength, Integer scale) {
+        return toObject(bytes, offset, length, PChar.INSTANCE, sortOrder, maxLength, scale,
+                PChar.INSTANCE);
     }
-    return true;
-  }
 
-  @Override
-  public void coerceBytes(ImmutableBytesWritable ptr, Object object, PDataType actualType,
-      Integer maxLength, Integer scale, SortOrder actualModifer, Integer desiredMaxLength,
-      Integer desiredScale, SortOrder desiredModifier) {
-    coerceBytes(ptr, object, actualType, maxLength, scale, desiredMaxLength, desiredScale,
-        this, actualModifer, desiredModifier);
-  }
+    @Override
+    public boolean isCoercibleTo(PDataType targetType) {
+        return isCoercibleTo(targetType, this);
+    }
 
-  @Override
-  public int getResultSetSqlType() {
-    return Types.ARRAY;
-  }
+    @Override
+    public boolean isCoercibleTo(PDataType targetType, Object value) {
+        if (value == null) {
+            return true;
+        }
+        PhoenixArray pArr = (PhoenixArray) value;
+        Object[] charArr = (Object[]) pArr.array;
+        for (Object i : charArr) {
+            if (!super.isCoercibleTo(PChar.INSTANCE, i)) {
+                return false;
+            }
+        }
+        return true;
+    }
 
-  @Override
-  public Object getSampleValue(Integer maxLength, Integer arrayLength) {
-    return getSampleValue(PChar.INSTANCE, arrayLength, maxLength);
-  }
+    @Override
+    public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+        return getSampleValue(PChar.INSTANCE, arrayLength, maxLength);
+    }
 }