You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 20:23:09 UTC

[08/51] [partial] Initial commit

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/ByteUtil.java b/src/main/java/org/apache/phoenix/util/ByteUtil.java
new file mode 100644
index 0000000..f654c9e
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/ByteUtil.java
@@ -0,0 +1,559 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+
+
+/**
+ * 
+ * Byte utilities
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ByteUtil {
+    public static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+    
+    public static final Comparator<ImmutableBytesPtr> BYTES_PTR_COMPARATOR = new Comparator<ImmutableBytesPtr>() {
+
+        @Override
+        public int compare(ImmutableBytesPtr o1, ImmutableBytesPtr o2) {
+            return Bytes.compareTo(o1.get(), o1.getOffset(), o1.getLength(), o2.get(), o2.getOffset(), o2.getLength());
+        }
+        
+    };
+
+    /**
+     * Serialize an array of byte arrays into a single byte array.  Used
+     * to pass through a set of bytes arrays as an attribute of a Scan.
+     * Use {@link #toByteArrays(byte[], int)} to convert the serialized
+     * byte array back to the array of byte arrays.
+     * @param byteArrays the array of byte arrays to serialize
+     * @return the byte array
+     */
+    public static byte[] toBytes(byte[][] byteArrays) {
+        int size = 0;
+        for (byte[] b : byteArrays) {
+            if (b == null) {
+                size++;
+            } else {
+                size += b.length;
+                size += WritableUtils.getVIntSize(b.length);
+            }
+        }
+        TrustedByteArrayOutputStream bytesOut = new TrustedByteArrayOutputStream(size);
+        DataOutputStream out = new DataOutputStream(bytesOut);
+        try {
+            for (byte[] b : byteArrays) {
+                if (b == null) {
+                    WritableUtils.writeVInt(out, 0);
+                } else {
+                    WritableUtils.writeVInt(out, b.length);
+                    out.write(b);
+                }
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e); // not possible
+        } finally {
+            try {
+                out.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e); // not possible
+            }
+        }
+        return bytesOut.getBuffer();
+    }
+
+    /**
+     * Deserialize a byte array into a set of byte arrays.  Used in
+     * coprocessor to reconstruct byte arrays from attribute value
+     * passed through the Scan.
+     * @param b byte array containing serialized byte arrays (created by {@link #toBytes(byte[][])}).
+     * @param length number of byte arrays that were serialized
+     * @return array of now deserialized byte arrays
+     * @throws IllegalStateException if there are more than length number of byte arrays that were serialized
+     */
+    public static byte[][] toByteArrays(byte[] b, int length) {
+        return toByteArrays(b, 0, length);
+    }
+
+    public static byte[][] toByteArrays(byte[] b, int offset, int length) {
+        ByteArrayInputStream bytesIn = new ByteArrayInputStream(b, offset, b.length - offset);
+        DataInputStream in = new DataInputStream(bytesIn);
+        byte[][] byteArrays = new byte[length][];
+        try {
+            for (int i = 0; i < length; i++) {
+                int bLength = WritableUtils.readVInt(in);
+                if (bLength == 0) {
+                    byteArrays[i] = null;
+                } else {
+                    byteArrays[i] = new byte[bLength];
+                    int rLength = in.read(byteArrays[i], 0, bLength);
+                    assert (rLength == bLength); // For find bugs
+                }
+            }
+            if (in.read() != -1) {
+                throw new IllegalStateException("Expected only " + length + " byte arrays, but found more");
+            }
+            return byteArrays;
+        } catch (IOException e) {
+            throw new RuntimeException(e); // not possible
+        } finally {
+            try {
+                in.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e); // not possible
+            }
+        }
+    }
+
+    public static byte[] serializeVIntArray(int[] intArray) {
+        return serializeVIntArray(intArray,intArray.length);
+    }
+
+    public static byte[] serializeVIntArray(int[] intArray, int encodedLength) {
+        int size = WritableUtils.getVIntSize(encodedLength);
+        for (int i = 0; i < intArray.length; i++) {
+            size += WritableUtils.getVIntSize(intArray[i]);
+        }
+        int offset = 0;
+        byte[] out = new byte[size];
+        offset += ByteUtil.vintToBytes(out, offset, size);
+        for (int i = 0; i < intArray.length; i++) {
+            offset += ByteUtil.vintToBytes(out, offset, intArray[i]);
+        }
+        return out;
+    }
+
+    public static void serializeVIntArray(DataOutput output, int[] intArray) throws IOException {
+        serializeVIntArray(output, intArray, intArray.length);
+    }
+
+    /**
+     * Allows additional stuff to be encoded in length
+     * @param output
+     * @param intArray
+     * @param encodedLength
+     * @throws IOException
+     */
+    public static void serializeVIntArray(DataOutput output, int[] intArray, int encodedLength) throws IOException {
+        WritableUtils.writeVInt(output, encodedLength);
+        for (int i = 0; i < intArray.length; i++) {
+            WritableUtils.writeVInt(output, intArray[i]);
+        }
+    }
+
+    public static long[] readFixedLengthLongArray(DataInput input, int length) throws IOException {
+        long[] longArray = new long[length];
+        for (int i = 0; i < length; i++) {
+            longArray[i] = input.readLong();
+        }
+        return longArray;
+    }
+
+    public static void writeFixedLengthLongArray(DataOutput output, long[] longArray) throws IOException {
+        for (int i = 0; i < longArray.length; i++) {
+            output.writeLong(longArray[i]);
+        }
+    }
+
+    /**
+     * Deserialize a byte array into a int array.  
+     * @param b byte array storing serialized vints
+     * @return int array
+     */
+    public static int[] deserializeVIntArray(byte[] b) {
+        ByteArrayInputStream bytesIn = new ByteArrayInputStream(b);
+        DataInputStream in = new DataInputStream(bytesIn);
+        try {
+            int length = WritableUtils.readVInt(in);
+            return deserializeVIntArray(in, length);
+        } catch (IOException e) {
+            throw new RuntimeException(e); // not possible
+        } finally {
+            try {
+                in.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e); // not possible
+            }
+        }
+    }
+
+    public static int[] deserializeVIntArray(DataInput in) throws IOException {
+        return deserializeVIntArray(in, WritableUtils.readVInt(in));
+    }
+
+    public static int[] deserializeVIntArray(DataInput in, int length) throws IOException {
+        int i = 0;
+        int[] intArray = new int[length];
+        while (i < length) {
+            intArray[i++] = WritableUtils.readVInt(in);
+        }
+        return intArray;
+    }
+
+    /**
+     * Deserialize a byte array into a int array.  
+     * @param b byte array storing serialized vints
+     * @param length number of serialized vints
+     * @return int array
+     */
+    public static int[] deserializeVIntArray(byte[] b, int length) {
+        ByteArrayInputStream bytesIn = new ByteArrayInputStream(b);
+        DataInputStream in = new DataInputStream(bytesIn);
+        try {
+            return deserializeVIntArray(in,length);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Concatenate together one or more byte arrays
+     * @param first first byte array
+     * @param rest rest of byte arrays
+     * @return newly allocated byte array that is a concatenation of all the byte arrays passed in
+     */
+    public static byte[] concat(byte[] first, byte[]... rest) {
+        int totalLength = first.length;
+        for (byte[] array : rest) {
+            totalLength += array.length;
+        }
+        byte[] result = Arrays.copyOf(first, totalLength);
+        int offset = first.length;
+        for (byte[] array : rest) {
+            System.arraycopy(array, 0, result, offset, array.length);
+            offset += array.length;
+        }
+        return result;
+    }
+
+    public static <T> T[] concat(T[] first, T[]... rest) {
+        int totalLength = first.length;
+        for (T[] array : rest) {
+          totalLength += array.length;
+        }
+        T[] result = Arrays.copyOf(first, totalLength);
+        int offset = first.length;
+        for (T[] array : rest) {
+            System.arraycopy(array, 0, result, offset, array.length);
+            offset += array.length;
+        }
+        return result;
+    }
+
+    public static byte[] concat(ColumnModifier columnModifier, ImmutableBytesWritable... writables) {
+        int totalLength = 0;
+        for (ImmutableBytesWritable writable : writables) {
+            totalLength += writable.getLength();
+        }
+        byte[] result = new byte[totalLength];
+        int offset = 0;
+        for (ImmutableBytesWritable array : writables) {
+            byte[] bytes = array.get();
+            if (columnModifier != null) {
+                bytes = columnModifier.apply(bytes, array.getOffset(), new byte[array.getLength()], 0, array.getLength());
+            }
+            System.arraycopy(bytes, array.getOffset(), result, offset, array.getLength());
+            offset += array.getLength();
+        }
+        return result;
+    }
+
+    public static int vintFromBytes(byte[] buffer, int offset) {
+        try {
+            return (int)Bytes.readVLong(buffer, offset);
+        } catch (IOException e) { // Impossible
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Decode a vint from the buffer pointed at to by ptr and
+     * increment the offset of the ptr by the length of the
+     * vint.
+     * @param ptr a pointer to a byte array buffer
+     * @return the decoded vint value as an int
+     */
+    public static int vintFromBytes(ImmutableBytesWritable ptr) {
+        return (int) vlongFromBytes(ptr);
+    }
+
+    /**
+     * Decode a vint from the buffer pointed at to by ptr and
+     * increment the offset of the ptr by the length of the
+     * vint.
+     * @param ptr a pointer to a byte array buffer
+     * @return the decoded vint value as a long
+     */
+    public static long vlongFromBytes(ImmutableBytesWritable ptr) {
+        final byte [] buffer = ptr.get();
+        final int offset = ptr.getOffset();
+        byte firstByte = buffer[offset];
+        int len = WritableUtils.decodeVIntSize(firstByte);
+        if (len == 1) {
+            ptr.set(buffer, offset+1, ptr.getLength());
+            return firstByte;
+        }
+        long i = 0;
+        for (int idx = 0; idx < len-1; idx++) {
+            byte b = buffer[offset + 1 + idx];
+            i = i << 8;
+            i = i | (b & 0xFF);
+        }
+        ptr.set(buffer, offset+len, ptr.getLength());
+        return (WritableUtils.isNegativeVInt(firstByte) ? ~i : i);
+    }
+
+    
+    /**
+     * Put long as variable length encoded number at the offset in the result byte array
+     * @param vint Integer to make a vint of.
+     * @param result buffer to put vint into
+     * @return Vint length in bytes of vint
+     */
+    public static int vintToBytes(byte[] result, int offset, final long vint) {
+      long i = vint;
+      if (i >= -112 && i <= 127) {
+        result[offset] = (byte) i;
+        return 1;
+      }
+
+      int len = -112;
+      if (i < 0) {
+        i ^= -1L; // take one's complement'
+        len = -120;
+      }
+
+      long tmp = i;
+      while (tmp != 0) {
+        tmp = tmp >> 8;
+        len--;
+      }
+
+      result[offset++] = (byte) len;
+
+      len = (len < -120) ? -(len + 120) : -(len + 112);
+
+      for (int idx = len; idx != 0; idx--) {
+        int shiftbits = (idx - 1) * 8;
+        long mask = 0xFFL << shiftbits;
+        result[offset++] = (byte)((i & mask) >> shiftbits);
+      }
+      return len + 1;
+    }
+
+    /**
+     * Increment the key to the next key
+     * @param key the key to increment
+     * @return a new byte array with the next key or null
+     *  if the key could not be incremented because it's
+     *  already at its max value.
+     */
+    public static byte[] nextKey(byte[] key) {
+        byte[] nextStartRow = new byte[key.length];
+        System.arraycopy(key, 0, nextStartRow, 0, key.length);
+        if (!nextKey(nextStartRow, nextStartRow.length)) {
+            return null;
+        }
+        return nextStartRow;
+    }
+
+    /**
+     * Increment the key in-place to the next key
+     * @param key the key to increment
+     * @param length the length of the key
+     * @return true if the key can be incremented and
+     *  false otherwise if the key is at its max
+     *  value.
+     */
+    public static boolean nextKey(byte[] key, int length) {
+        return nextKey(key, 0, length);
+    }
+    
+    public static boolean nextKey(byte[] key, int offset, int length) {
+        if (length == 0) {
+            return false;
+        }
+        int i = offset + length - 1;
+        while (key[i] == -1) {
+            key[i] = 0;
+            i--;
+            if (i < offset) {
+                // Change bytes back to the way they were
+                do {
+                    key[++i] = -1;
+                } while (i < offset + length - 1);
+                return false;
+            }
+         }
+        key[i] = (byte)(key[i] + 1);
+        return true;
+    }
+
+    public static byte[] previousKey(byte[] key) {
+        byte[] previousKey = new byte[key.length];
+        System.arraycopy(key, 0, previousKey, 0, key.length);
+        if (!previousKey(previousKey, previousKey.length)) {
+            return null;
+        }
+        return previousKey;
+    }
+
+    public static boolean previousKey(byte[] key, int length) {
+        return previousKey(key, 0, length);
+    }
+    
+    public static boolean previousKey(byte[] key, int offset, int length) {
+        if (length == 0) {
+            return false;
+        }
+        int i = offset + length - 1;
+        while (key[i] == 0) {
+            key[i] = -1;
+            i--;
+            if (i < offset) {
+                // Change bytes back to the way they were
+                do {
+                    key[++i] = 0;
+                } while (i < offset + length - 1);
+                return false;
+            }
+         }
+        key[i] = (byte)(key[i] - 1);
+        return true;
+    }
+
+    /**
+     * Expand the key to length bytes using the fillByte to fill the
+     * bytes beyond the current key length.
+     */
+    public static byte[] fillKey(byte[] key, int length) {
+        if(key.length > length) {
+            throw new IllegalStateException();
+        }
+        if (key.length == length) {
+            return key;
+        }
+        byte[] newBound = new byte[length];
+        System.arraycopy(key, 0, newBound, 0, key.length);
+        return newBound;
+    }
+
+    /**
+     * Get the size in bytes of the UTF-8 encoded CharSequence
+     * @param sequence the CharSequence
+     */
+    public static int getSize(CharSequence sequence) {
+        int count = 0;
+        for (int i = 0, len = sequence.length(); i < len; i++) {
+          char ch = sequence.charAt(i);
+          if (ch <= 0x7F) {
+            count++;
+          } else if (ch <= 0x7FF) {
+            count += 2;
+          } else if (Character.isHighSurrogate(ch)) {
+            count += 4;
+            ++i;
+          } else {
+            count += 3;
+          }
+        }
+        return count;
+    }
+
+    public static boolean isInclusive(CompareOp op) {
+        switch (op) {
+            case LESS:
+            case GREATER:
+                return false;
+            case EQUAL:
+            case NOT_EQUAL:
+            case LESS_OR_EQUAL:
+            case GREATER_OR_EQUAL:
+                return true;
+            default:
+              throw new RuntimeException("Unknown Compare op " + op.name());
+        }
+    }
+    public static boolean compare(CompareOp op, int compareResult) {
+        switch (op) {
+            case LESS:
+              return compareResult < 0;
+            case LESS_OR_EQUAL:
+              return compareResult <= 0;
+            case EQUAL:
+              return compareResult == 0;
+            case NOT_EQUAL:
+              return compareResult != 0;
+            case GREATER_OR_EQUAL:
+              return compareResult >= 0;
+            case GREATER:
+              return compareResult > 0;
+            default:
+              throw new RuntimeException("Unknown Compare op " + op.name());
+        }
+    }
+
+    /**
+     * Given an ImmutableBytesWritable, returns the payload part of the argument as an byte array. 
+     */
+    public static byte[] copyKeyBytesIfNecessary(ImmutableBytesWritable ptr) {
+        if (ptr.getOffset() == 0 && ptr.getLength() == ptr.get().length) {
+            return ptr.get();
+        }
+        return ptr.copyBytes();
+    }
+    
+    public static KeyRange getKeyRange(byte[] key, CompareOp op, PDataType type) {
+        switch (op) {
+        case EQUAL:
+            return type.getKeyRange(key, true, key, true);
+        case GREATER:
+            return type.getKeyRange(key, false, KeyRange.UNBOUND, false);
+        case GREATER_OR_EQUAL:
+            return type.getKeyRange(key, true, KeyRange.UNBOUND, false);
+        case LESS:
+            return type.getKeyRange(KeyRange.UNBOUND, false, key, false);
+        case LESS_OR_EQUAL:
+            return type.getKeyRange(KeyRange.UNBOUND, false, key, true);
+        default:
+            throw new IllegalArgumentException("Unknown operator " + op);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/CSVLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/CSVLoader.java b/src/main/java/org/apache/phoenix/util/CSVLoader.java
new file mode 100644
index 0000000..57c28ec
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/CSVLoader.java
@@ -0,0 +1,250 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import au.com.bytecode.opencsv.CSVReader;
+import com.google.common.collect.Maps;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PDataType;
+
+import java.io.FileReader;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/***
+ * Upserts CSV data using Phoenix JDBC connection
+ * 
+ * @author mchohan
+ * 
+ */
+public class CSVLoader {
+
+	private final PhoenixConnection conn;
+	private final String tableName;
+    private final List<String> columns;
+    private final boolean isStrict;
+    private final List<String> delimiter;
+    private final Map<String,Character> ctrlTable = new HashMap<String,Character>() {
+        {   put("1",'\u0001');
+            put("2",'\u0002');
+            put("3",'\u0003');
+            put("4",'\u0004');
+            put("5",'\u0005');
+            put("6",'\u0006');
+            put("7",'\u0007');
+            put("8",'\u0008');
+            put("9",'\u0009');}};
+    
+    private int unfoundColumnCount;
+
+    public CSVLoader(PhoenixConnection conn, String tableName, List<String> columns, boolean isStrict,List<String> delimiter) {
+        this.conn = conn;
+        this.tableName = tableName;
+        this.columns = columns;
+        this.isStrict = isStrict;
+        this.delimiter = delimiter;
+    }
+
+	public CSVLoader(PhoenixConnection conn, String tableName, List<String> columns, boolean isStrict) {
+       this(conn,tableName,columns,isStrict,null);
+    }
+
+
+    /**
+	 * Upserts data from CSV file. Data is batched up based on connection batch
+	 * size. Column PDataType is read from metadata and is used to convert
+	 * column value to correct type before upsert. Note: Column Names are
+	 * expected as first line of CSV file.
+	 * 
+	 * @param fileName
+	 * @throws Exception
+	 */
+	public void upsert(String fileName) throws Exception {
+        List<String> delimiter = this.delimiter;
+        CSVReader reader;
+        if ((delimiter != null) && (delimiter.size() == 3)) {
+            reader = new CSVReader(new FileReader(fileName),
+                getCSVCustomField(this.delimiter.get(0)),
+                getCSVCustomField(this.delimiter.get(1)),
+                getCSVCustomField(this.delimiter.get(2)));
+        } else {
+            reader = new CSVReader(new FileReader(fileName));
+        }
+        upsert(reader);
+	}
+
+
+    public char getCSVCustomField(String field) {
+        if(this.ctrlTable.containsKey(field)) {
+            return this.ctrlTable.get(field);
+        } else {
+            return field.charAt(0);
+        }
+    }
+
+	/**
+	 * Upserts data from CSV file. Data is batched up based on connection batch
+	 * size. Column PDataType is read from metadata and is used to convert
+	 * column value to correct type before upsert. Note: Column Names are
+	 * expected as first line of CSV file.
+	 * 
+	 * @param reader CSVReader instance
+	 * @throws Exception
+	 */
+	public void upsert(CSVReader reader) throws Exception {
+	    List<String> columns = this.columns;
+	    if (columns != null && columns.isEmpty()) {
+	        columns = Arrays.asList(reader.readNext());
+	    }
+		ColumnInfo[] columnInfo = generateColumnInfo(columns);
+        PreparedStatement stmt = null;
+        PreparedStatement[] stmtCache = null;
+		if (columns == null) {
+		    stmtCache = new PreparedStatement[columnInfo.length];
+		} else {
+		    String upsertStatement = QueryUtil.constructUpsertStatement(columnInfo, tableName, columnInfo.length - unfoundColumnCount);
+		    stmt = conn.prepareStatement(upsertStatement);
+		}
+		String[] nextLine;
+		int rowCount = 0;
+		int upsertBatchSize = conn.getMutateBatchSize();
+		boolean wasAutoCommit = conn.getAutoCommit();
+		try {
+    		conn.setAutoCommit(false);
+    		Object upsertValue = null;
+    		long start = System.currentTimeMillis();
+    
+    		// Upsert data based on SqlType of each column
+    		while ((nextLine = reader.readNext()) != null) {
+    		    if (columns == null) {
+    		        stmt = stmtCache[nextLine.length-1];
+    		        if (stmt == null) {
+    	                String upsertStatement = QueryUtil.constructUpsertStatement(columnInfo, tableName, nextLine.length);
+    	                stmt = conn.prepareStatement(upsertStatement);
+    	                stmtCache[nextLine.length-1] = stmt;
+    		        }
+    		    }
+    			for (int index = 0; index < columnInfo.length; index++) {
+    			    if (columnInfo[index] == null) {
+    			        continue;
+    			    }
+                    String line = nextLine[index];
+                    Integer info = columnInfo[index].getSqlType();
+                    upsertValue = convertTypeSpecificValue(line, info);
+    				if (upsertValue != null) {
+    					stmt.setObject(index + 1, upsertValue, columnInfo[index].getSqlType());
+    				} else {
+    					stmt.setNull(index + 1, columnInfo[index].getSqlType());
+    				}
+    			}
+    			stmt.execute();
+    
+    			// Commit when batch size is reached
+    			if (++rowCount % upsertBatchSize == 0) {
+    				conn.commit();
+    				System.out.println("Rows upserted: " + rowCount);
+    			}
+    		}
+    		conn.commit();
+    		double elapsedDuration = ((System.currentTimeMillis() - start) / 1000.0);
+    		System.out.println("CSV Upsert complete. " + rowCount + " rows upserted");
+    		System.out.println("Time: " + elapsedDuration + " sec(s)\n");
+		} finally {
+		    if(stmt != null) {
+		        stmt.close();
+		    }
+		    if (wasAutoCommit) conn.setAutoCommit(true);
+		}
+	}
+	
+	/**
+	 * Gets CSV string input converted to correct type 
+	 */
+	private Object convertTypeSpecificValue(String s, Integer sqlType) throws Exception {
+	    return PDataType.fromSqlType(sqlType).toObject(s);
+	}
+
+	/**
+	 * Get array of ColumnInfos that contain Column Name and its associated
+	 * PDataType
+	 * 
+	 * @param columns
+	 * @return
+	 * @throws SQLException
+	 */
+	private ColumnInfo[] generateColumnInfo(List<String> columns)
+			throws SQLException {
+	    Map<String,Integer> columnNameToTypeMap = Maps.newLinkedHashMap();
+        DatabaseMetaData dbmd = conn.getMetaData();
+        // TODO: escape wildcard characters here because we don't want that behavior here
+        String escapedTableName = StringUtil.escapeLike(tableName);
+        String[] schemaAndTable = escapedTableName.split("\\.");
+        ResultSet rs = null;
+        try {
+            rs = dbmd.getColumns(null, (schemaAndTable.length == 1 ? "" : schemaAndTable[0]),
+                    (schemaAndTable.length == 1 ? escapedTableName : schemaAndTable[1]),
+                    null);
+            while (rs.next()) {
+                columnNameToTypeMap.put(rs.getString(QueryUtil.COLUMN_NAME_POSITION), rs.getInt(QueryUtil.DATA_TYPE_POSITION));
+            }
+        } finally {
+            if(rs != null) {
+                rs.close();
+            }
+        }
+        ColumnInfo[] columnType;
+	    if (columns == null) {
+            int i = 0;
+            columnType = new ColumnInfo[columnNameToTypeMap.size()];
+            for (Map.Entry<String, Integer> entry : columnNameToTypeMap.entrySet()) {
+                columnType[i++] = new ColumnInfo(entry.getKey(),entry.getValue());
+            }
+	    } else {
+            // Leave "null" as indication to skip b/c it doesn't exist
+            columnType = new ColumnInfo[columns.size()];
+            for (int i = 0; i < columns.size(); i++) {
+                String columnName = SchemaUtil.normalizeIdentifier(columns.get(i).trim());
+                Integer sqlType = columnNameToTypeMap.get(columnName);
+                if (sqlType == null) {
+                    if (isStrict) {
+                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.COLUMN_NOT_FOUND)
+                            .setColumnName(columnName).setTableName(tableName).build().buildException();
+                    }
+                    unfoundColumnCount++;
+                } else {
+                    columnType[i] = new ColumnInfo(columnName, sqlType);
+                }
+            }
+            if (unfoundColumnCount == columns.size()) {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.COLUMN_NOT_FOUND)
+                    .setColumnName(Arrays.toString(columns.toArray(new String[0]))).setTableName(tableName).build().buildException();
+            }
+	    }
+		return columnType;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/Closeables.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/Closeables.java b/src/main/java/org/apache/phoenix/util/Closeables.java
new file mode 100644
index 0000000..b09ebe0
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/Closeables.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.*;
+
+import com.google.common.collect.Iterables;
+
+
+/**
+ * Static helpers for closing groups of {@link Closeable} resources.
+ */
+public class Closeables {
+    /** Utility class; never instantiated. */
+    private Closeables() { }
+
+    /**
+     * Attempts to close every {@link Closeable} in the given iterable.
+     *
+     * A failure to close one element does not stop the remaining elements from
+     * being closed. If one or more closes fail with an IOException, the
+     * failure(s) are thrown once all elements have been attempted.
+     *
+     * @param iterable the resources to close; may be null, in which case nothing happens
+     * @throws IOException if at least one close failed
+     */
+    public static void closeAll(Iterable<? extends Closeable> iterable) throws IOException {
+        IOException failure = closeAllQuietly(iterable);
+        if (failure != null) {
+            throw failure;
+        }
+    }
+
+    /**
+     * Attempts to close every {@link Closeable} in the given iterable and reports
+     * failures as a return value instead of throwing them.
+     *
+     * @param iterable the resources to close; may be null
+     * @return null if the iterable is null or every close succeeded; the single
+     *         IOException if exactly one close failed; otherwise one IOException
+     *         that aggregates all of the failures
+     */
+    public static IOException closeAllQuietly(Iterable<? extends Closeable> iterable) {
+        if (iterable == null) {
+            return null;
+        }
+
+        // Allocated lazily so the common all-succeeded path creates no list.
+        LinkedList<IOException> failures = null;
+        Iterator<? extends Closeable> resources = iterable.iterator();
+        while (resources.hasNext()) {
+            Closeable resource = resources.next();
+            try {
+                resource.close();
+            } catch (IOException e) {
+                if (failures == null) {
+                    failures = new LinkedList<IOException>();
+                }
+                failures.add(e);
+            }
+        }
+
+        return MultipleCausesIOException.fromIOExceptions(failures);
+    }
+
+    /**
+     * An IOException that stands in for several IOExceptions at once. Its message
+     * and stack trace are assembled from the wrapped exceptions.
+     */
+    static private class MultipleCausesIOException extends IOException {
+        private static final long serialVersionUID = 1L;
+
+        /**
+         * Collapses a collection of failures into a single exception.
+         *
+         * @return null for a null or empty collection, the element itself for a
+         *         single-element collection, otherwise an aggregating exception
+         */
+        static IOException fromIOExceptions(Collection<? extends IOException> exceptions) {
+            if (exceptions == null || exceptions.isEmpty()) {
+                return null;
+            }
+            if (exceptions.size() == 1) {
+                return Iterables.getOnlyElement(exceptions);
+            }
+            return new MultipleCausesIOException(exceptions);
+        }
+
+        private final Collection<? extends IOException> exceptions;
+        // True once the combined stack trace has been built and installed.
+        private boolean hasSetStackTrace;
+
+        /**
+         * Use the {@link #fromIOExceptions(Collection) factory}.
+         */
+        private MultipleCausesIOException(Collection<? extends IOException> exceptions) {
+            this.exceptions = exceptions;
+        }
+
+        /**
+         * @return one "Cause Number i: message" line per wrapped exception,
+         *         numbered from zero
+         */
+        @Override
+        public String getMessage() {
+            StringBuilder text = new StringBuilder(this.exceptions.size() * 50);
+            int causeIndex = 0;
+            for (IOException cause : this.exceptions) {
+                text.append("Cause Number ")
+                    .append(causeIndex)
+                    .append(": ")
+                    .append(cause.getMessage())
+                    .append("\n");
+                causeIndex++;
+            }
+            return text.toString();
+        }
+
+        /**
+         * Returns the stack traces of all wrapped exceptions concatenated, each
+         * preceded by a synthetic "Exception Number i" frame. The combined trace
+         * is built on the first call and installed via {@code setStackTrace}.
+         */
+        @Override
+        public StackTraceElement[] getStackTrace() {
+            if (this.hasSetStackTrace) {
+                return super.getStackTrace();
+            }
+
+            List<StackTraceElement> combined = new ArrayList<StackTraceElement>(this.exceptions.size() * 20);
+            int causeIndex = 0;
+            for (IOException cause : this.exceptions) {
+                // Synthetic separator frame marking where this cause's trace begins.
+                combined.add(new StackTraceElement(
+                        MultipleCausesIOException.class.getName(),
+                        "Exception Number " + causeIndex,
+                        "<no file>",
+                        0));
+                combined.addAll(Arrays.asList(cause.getStackTrace()));
+                causeIndex++;
+            }
+
+            setStackTrace(combined.toArray(new StackTraceElement[combined.size()]));
+            this.hasSetStackTrace = true;
+            return super.getStackTrace();
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/ColumnInfo.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/ColumnInfo.java b/src/main/java/org/apache/phoenix/util/ColumnInfo.java
new file mode 100644
index 0000000..9ca05e7
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/ColumnInfo.java
@@ -0,0 +1,22 @@
+package org.apache.phoenix.util;
+
+/**
+ * Immutable pairing of a column name with its SQL type code.
+ *
+ * The type is held as an {@link Integer} code rather than as a PDataType
+ * instance (presumably a java.sql.Types constant; confirm against callers).
+ */
+public class ColumnInfo {
+	// Both fields are assigned once in the constructor and never modified.
+	private final String columnName;
+	private final Integer sqlType;
+
+	/**
+	 * @param columnName name of the column
+	 * @param sqlType SQL type code associated with the column
+	 */
+	public ColumnInfo(String columnName, Integer sqlType) {
+		this.columnName = columnName;
+		this.sqlType = sqlType;
+	}
+
+	/**
+	 * @return the column name given at construction
+	 */
+	public String getColumnName() {
+		return columnName;
+	}
+
+	/**
+	 * @return the SQL type code given at construction
+	 */
+	public Integer getSqlType() {
+		return sqlType;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/DateUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/DateUtil.java b/src/main/java/org/apache/phoenix/util/DateUtil.java
new file mode 100644
index 0000000..e68ae48
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/DateUtil.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.text.Format;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.TimeZone;
+
+import org.apache.commons.lang.time.FastDateFormat;
+
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.IllegalDataException;
+
+
+
+@SuppressWarnings("serial")
+public class DateUtil {
+    public static final TimeZone DATE_TIME_ZONE = TimeZone.getTimeZone("GMT");
+    public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; // This is the format the app sets in NLS settings for every connection.
+    public static final Format DEFAULT_DATE_FORMATTER = FastDateFormat.getInstance(DEFAULT_DATE_FORMAT, DATE_TIME_ZONE);
+
+    public static final String DEFAULT_MS_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
+    public static final Format DEFAULT_MS_DATE_FORMATTER = FastDateFormat.getInstance(DEFAULT_MS_DATE_FORMAT, DATE_TIME_ZONE);
+
+    private DateUtil() {
+    }
+    
+	public static Format getDateParser(String pattern) {
+        SimpleDateFormat format = new SimpleDateFormat(pattern) {
+            @Override
+            public java.util.Date parseObject(String source) throws ParseException {
+                java.util.Date date = super.parse(source);
+                return new java.sql.Date(date.getTime());
+            }
+        };
+        format.setTimeZone(DateUtil.DATE_TIME_ZONE);
+        return format;
+    }
+    
+    public static Format getTimeParser(String pattern) {
+        SimpleDateFormat format = new SimpleDateFormat(pattern) {
+            @Override
+            public java.util.Date parseObject(String source) throws ParseException {
+                java.util.Date date = super.parse(source);
+                return new java.sql.Time(date.getTime());
+            }
+        };
+        format.setTimeZone(DateUtil.DATE_TIME_ZONE);
+        return format;
+    }
+    
+    public static Format getTimestampParser(String pattern) {
+        SimpleDateFormat format = new SimpleDateFormat(pattern) {
+            @Override
+            public java.util.Date parseObject(String source) throws ParseException {
+                java.util.Date date = super.parse(source);
+                return new java.sql.Timestamp(date.getTime());
+            }
+        };
+        format.setTimeZone(DateUtil.DATE_TIME_ZONE);
+        return format;
+    }
+    
+    public static Format getDateFormatter(String pattern) {
+        return DateUtil.DEFAULT_DATE_FORMAT.equals(pattern) ? DateUtil.DEFAULT_DATE_FORMATTER : FastDateFormat.getInstance(pattern, DateUtil.DATE_TIME_ZONE);
+    }
+    
+    private static ThreadLocal<Format> dateFormat =
+        new ThreadLocal < Format > () {
+            @Override protected Format initialValue() {
+                return getDateParser(DEFAULT_DATE_FORMAT);
+            }
+        };
+    
+    public static Date parseDate(String dateValue) {
+        try {
+            return (Date)dateFormat.get().parseObject(dateValue);
+        } catch (ParseException e) {
+            throw new IllegalDataException(e);
+        }
+    }
+    
+    private static ThreadLocal<Format> timeFormat =
+        new ThreadLocal < Format > () {
+            @Override protected Format initialValue() {
+                return getTimeParser(DEFAULT_DATE_FORMAT);
+            }
+        };
+    
+    public static Time parseTime(String timeValue) {
+        try {
+            return (Time)timeFormat.get().parseObject(timeValue);
+        } catch (ParseException e) {
+            throw new IllegalDataException(e);
+        }
+    }
+    
+    private static ThreadLocal<Format> timestampFormat =
+        new ThreadLocal < Format > () {
+            @Override protected Format initialValue() {
+                return getTimestampParser(DEFAULT_DATE_FORMAT);
+            }
+        };
+    
+    public static Timestamp parseTimestamp(String timeValue) {
+        try {
+            return (Timestamp)timestampFormat.get().parseObject(timeValue);
+        } catch (ParseException e) {
+            throw new IllegalDataException(e);
+        }
+    }
+    
+    /**
+     * Utility function to work around the weirdness of the {@link Timestamp} constructor.
+     * This method takes the milli-seconds that spills over to the nanos part as part of 
+     * constructing the {@link Timestamp} object.
+     * If we just set the nanos part of timestamp to the nanos passed in param, we 
+     * end up losing the sub-second part of timestamp. 
+     */
+    public static Timestamp getTimestamp(long millis, int nanos) {
+        Timestamp ts = new Timestamp(millis);
+        ts.setNanos(ts.getNanos() + nanos);
+        return ts;
+    }
+    
+    /**
+     * Utility function to convert a {@link BigDecimal} value to {@link Timestamp}.
+     */
+    public static Timestamp getTimestamp(BigDecimal bd) {
+        return DateUtil.getTimestamp(bd.longValue(), ((bd.remainder(BigDecimal.ONE).multiply(BigDecimal.valueOf(QueryConstants.MILLIS_TO_NANOS_CONVERTOR))).intValue()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/IndexUtil.java b/src/main/java/org/apache/phoenix/util/IndexUtil.java
new file mode 100644
index 0000000..7336d5c
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.collect.Lists;
+import org.apache.hbase.index.ValueGetter;
+import org.apache.hbase.index.covered.update.ColumnReference;
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
+import org.apache.phoenix.schema.ColumnNotFoundException;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnFamily;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PTable;
+
+/**
+ * Static helpers for secondary indexes: translating between data-table column
+ * names and index column names, choosing the data type used for an indexed
+ * column, and building index mutations from data-table mutations.
+ *
+ * An index column name has the form {@code <family>:<column>}; the family part
+ * is empty when the data column is a PK column (see
+ * {@link #getIndexColumnName(PColumn)}).
+ */
+public class IndexUtil {
+    /** Separator between the column family part and the column part of an index column name. */
+    public static final String INDEX_COLUMN_NAME_SEP = ":";
+    public static final byte[] INDEX_COLUMN_NAME_SEP_BYTES = Bytes.toBytes(INDEX_COLUMN_NAME_SEP);
+
+    /** Utility class; never instantiated. */
+    private IndexUtil() {
+    }
+
+    /**
+     * Returns the type to use in the index for the given data column, delegating
+     * to {@link #getIndexColumnDataType(boolean, PDataType)}.
+     *
+     * @throws SQLException with CANNOT_INDEX_COLUMN_ON_TYPE if the delegate returns null
+     */
+    // Since we cannot have nullable fixed length in a row key
+    // we need to translate to variable length.
+    public static PDataType getIndexColumnDataType(PColumn dataColumn) throws SQLException {
+        PDataType type = getIndexColumnDataType(dataColumn.isNullable(),dataColumn.getDataType());
+        if (type == null) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_INDEX_COLUMN_ON_TYPE).setColumnName(dataColumn.getName().getString())
+            .setMessage("Type="+dataColumn.getDataType()).build().buildException();
+        }
+        return type;
+    }
+    
+    /**
+     * Maps a data column type to the type used for it in an index.
+     *
+     * The type is returned unchanged when it is null, the column is not nullable,
+     * the type is not fixed width, or the type is BINARY. Otherwise a type
+     * coercible to TIMESTAMP or DECIMAL becomes DECIMAL and a type coercible to
+     * VARCHAR becomes VARCHAR.
+     *
+     * @throws IllegalArgumentException if none of the cases above applies
+     */
+    // Since we cannot have nullable fixed length in a row key
+    // we need to translate to variable length. The verification that we have a valid index
+    // row key was already done, so here we just need to convert from one built-in type to
+    // another.
+    public static PDataType getIndexColumnDataType(boolean isNullable, PDataType dataType) {
+        if (dataType == null || !isNullable || !dataType.isFixedWidth() || dataType == PDataType.BINARY) {
+            return dataType;
+        }
+        // for INT, BIGINT
+        if (dataType.isCoercibleTo(PDataType.TIMESTAMP) || dataType.isCoercibleTo(PDataType.DECIMAL)) {
+            return PDataType.DECIMAL;
+        }
+        // for CHAR
+        if (dataType.isCoercibleTo(PDataType.VARCHAR)) {
+            return PDataType.VARCHAR;
+        }
+        // NOTE(review): this line is only reached when isNullable is true, so the
+        // "non nullable" wording in the message looks inverted - confirm intent.
+        throw new IllegalArgumentException("Unsupported non nullable index type " + dataType);
+    }
+    
+
+    /**
+     * Returns the part of an index column name after the first separator. When
+     * the name contains no separator the whole name is returned.
+     */
+    public static String getDataColumnName(String name) {
+        return name.substring(name.indexOf(INDEX_COLUMN_NAME_SEP) + 1);
+    }
+
+    /**
+     * Returns the part of an index column name before the first separator.
+     * NOTE(review): a name without a separator makes the end index -1, which
+     * substring rejects with StringIndexOutOfBoundsException.
+     */
+    public static String getDataColumnFamilyName(String name) {
+        return name.substring(0,name.indexOf(INDEX_COLUMN_NAME_SEP));
+    }
+
+    /**
+     * Returns the display name of the data column behind an index column name:
+     * just the column part when the family part is empty, otherwise the result of
+     * {@code SchemaUtil.getColumnDisplayName(family, column)}.
+     * NOTE(review): a name without a separator is not handled (index is -1).
+     */
+    public static String getDataColumnFullName(String name) {
+        int index = name.indexOf(INDEX_COLUMN_NAME_SEP) ;
+        if (index == 0) {
+            return name.substring(index+1);
+        }
+        return SchemaUtil.getColumnDisplayName(name.substring(0, index), name.substring(index+1));
+    }
+
+    /**
+     * Builds an index column name as family + separator + column; a null family
+     * is treated as the empty string.
+     */
+    public static String getIndexColumnName(String dataColumnFamilyName, String dataColumnName) {
+        return (dataColumnFamilyName == null ? "" : dataColumnFamilyName) + INDEX_COLUMN_NAME_SEP + dataColumnName;
+    }
+    
+    /**
+     * Byte-array variant of {@link #getIndexColumnName(String, String)}; a null
+     * family is replaced by an empty byte array.
+     */
+    public static byte[] getIndexColumnName(byte[] dataColumnFamilyName, byte[] dataColumnName) {
+        return ByteUtil.concat(dataColumnFamilyName == null ?  ByteUtil.EMPTY_BYTE_ARRAY : dataColumnFamilyName, INDEX_COLUMN_NAME_SEP_BYTES, dataColumnName);
+    }
+    
+    /**
+     * Builds the index column name for a data column. PK columns get no family
+     * part, so their index column name starts with the separator.
+     */
+    public static String getIndexColumnName(PColumn dataColumn) {
+        String dataColumnFamilyName = SchemaUtil.isPKColumn(dataColumn) ? null : dataColumn.getFamilyName().getString();
+        return getIndexColumnName(dataColumnFamilyName, dataColumn.getName().getString());
+    }
+
+    /**
+     * Looks up the data-table column that an index column name refers to.
+     *
+     * @param dataTable the data table to search
+     * @param indexColumnName name of the form {@code <family>:<column>}
+     * @return the PK column when the family part is empty, otherwise the column
+     *         of that name in the named column family
+     * @throws IllegalArgumentException if the separator is missing, or the
+     *         column family or column cannot be found
+     */
+    public static PColumn getDataColumn(PTable dataTable, String indexColumnName) {
+        int pos = indexColumnName.indexOf(INDEX_COLUMN_NAME_SEP);
+        if (pos < 0) {
+            throw new IllegalArgumentException("Could not find expected '" + INDEX_COLUMN_NAME_SEP +  "' separator in index column name of \"" + indexColumnName + "\"");
+        }
+        // Empty family part: the column is looked up among the PK columns.
+        if (pos == 0) {
+            try {
+                return dataTable.getPKColumn(indexColumnName.substring(1));
+            } catch (ColumnNotFoundException e) {
+                throw new IllegalArgumentException("Could not find PK column \"" +  indexColumnName.substring(pos+1) + "\" in index column name of \"" + indexColumnName + "\"", e);
+            }
+        }
+        PColumnFamily family;
+        try {
+            family = dataTable.getColumnFamily(indexColumnName.substring(0, pos));
+        } catch (ColumnFamilyNotFoundException e) {
+            throw new IllegalArgumentException("Could not find column family \"" +  indexColumnName.substring(0, pos) + "\" in index column name of \"" + indexColumnName + "\"", e);
+        }
+        try {
+            return family.getColumn(indexColumnName.substring(pos+1));
+        } catch (ColumnNotFoundException e) {
+            throw new IllegalArgumentException("Could not find column \"" +  indexColumnName.substring(pos+1) + "\" in index column name of \"" + indexColumnName + "\"", e);
+        }
+    }
+
+    /**
+     * Returns true when the reference's family equals the table's empty column
+     * family and its qualifier equals QueryConstants.EMPTY_COLUMN_BYTES.
+     */
+    private static boolean isEmptyKeyValue(PTable table, ColumnReference ref) {
+        byte[] emptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(table.getColumnFamilies());
+        return (Bytes.compareTo(emptyKeyValueCF, ref.getFamily()) == 0 &&
+                Bytes.compareTo(QueryConstants.EMPTY_COLUMN_BYTES, ref.getQualifier()) == 0);
+    }
+    /**
+     * Builds one index mutation per data mutation.
+     *
+     * A Put yields an update mutation whose column values are read from the Put
+     * itself. Any other mutation yields a delete mutation, which is rejected when
+     * the index maintainer reports indexed columns.
+     *
+     * @param table the data table
+     * @param index the index table whose maintainer builds the mutations
+     * @param dataMutations mutations against the data table
+     * @param ptr scratch pointer; overwritten with the row of each mutation in turn
+     * @return the index mutations, in the same order as {@code dataMutations}
+     * @throws SQLException with NO_DELETE_IF_IMMUTABLE_INDEX for a rejected
+     *         non-Put mutation, or wrapping an IOException raised while building
+     */
+    public static List<Mutation> generateIndexData(final PTable table, PTable index, List<Mutation> dataMutations, ImmutableBytesWritable ptr) throws SQLException {
+        try {
+            IndexMaintainer maintainer = index.getIndexMaintainer(table);
+            List<Mutation> indexMutations = Lists.newArrayListWithExpectedSize(dataMutations.size());
+           for (final Mutation dataMutation : dataMutations) {
+                long ts = MetaDataUtil.getClientTimeStamp(dataMutation);
+                ptr.set(dataMutation.getRow());
+                if (dataMutation instanceof Put) {
+                    // TODO: is this more efficient than looking in our mutation map
+                    // using the key plus finding the PColumn?
+                    ValueGetter valueGetter = new ValueGetter() {
+        
+                        @Override
+                        public ImmutableBytesPtr getLatestValue(ColumnReference ref) {
+                            // Always return null for our empty key value, as this will cause the index
+                            // maintainer to always treat this Put as a new row.
+                            if (isEmptyKeyValue(table, ref)) {
+                                return null;
+                            }
+                            Map<byte [], List<KeyValue>> familyMap = dataMutation.getFamilyMap();
+                            byte[] family = ref.getFamily();
+                            List<KeyValue> kvs = familyMap.get(family);
+                            if (kvs == null) {
+                                return null;
+                            }
+                            byte[] qualifier = ref.getQualifier();
+                            // Return the first KeyValue whose family and qualifier both match the reference.
+                            for (KeyValue kv : kvs) {
+                                if (Bytes.compareTo(kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(), family, 0, family.length) == 0 &&
+                                    Bytes.compareTo(kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength(), qualifier, 0, qualifier.length) == 0) {
+                                    return new ImmutableBytesPtr(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
+                                }
+                            }
+                            return null;
+                        }
+                        
+                    };
+                    indexMutations.add(maintainer.buildUpdateMutation(valueGetter, ptr, ts));
+                } else {
+                    // Non-Put mutations are only accepted when the maintainer has no indexed columns.
+                    if (!maintainer.getIndexedColumns().isEmpty()) {
+                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.NO_DELETE_IF_IMMUTABLE_INDEX).setSchemaName(table.getSchemaName().getString())
+                        .setTableName(table.getTableName().getString()).build().buildException();
+                    }
+                    indexMutations.add(maintainer.buildDeleteMutation(ptr, ts));
+                }
+            }
+            return indexMutations;
+        } catch (IOException e) {
+            throw new SQLException(e);
+        }
+    }
+
+    /**
+     * Returns true when the column's name starts with the separator, i.e. it has
+     * an empty family part (the form produced for PK columns).
+     */
+    public static boolean isDataPKColumn(PColumn column) {
+        return column.getName().getString().startsWith(INDEX_COLUMN_NAME_SEP);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/InstanceResolver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/InstanceResolver.java b/src/main/java/org/apache/phoenix/util/InstanceResolver.java
new file mode 100644
index 0000000..85cf54d
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/InstanceResolver.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.util.ServiceLoader;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Resolves object instances registered using the JDK 6+ {@link java.util.ServiceLoader}.
+ *
+ * Resolved instances are cached per class, so later lookups for the same class
+ * return the instance resolved first.
+ *
+ * @author aaraujo
+ * @since 2.0
+ */
+public class InstanceResolver {
+    private static final ConcurrentHashMap<Class<?>, Object> RESOLVED_SINGLETONS = new ConcurrentHashMap<Class<?>, Object>();
+
+    private InstanceResolver() {/* not allowed */}
+
+    /**
+     * Resolves an instance of the specified class if it has not already been resolved.
+     * @param clazz The type of instance to resolve
+     * @param defaultInstance The instance to use if a custom instance has not been registered
+     * @return The resolved instance or the default instance provided.
+     *         {@code null} if an instance is not registered and a default is not provided.
+     * @throws IllegalArgumentException if nothing is cached for {@code clazz} yet and
+     *         {@code defaultInstance} is non-null but not an instance of {@code clazz}
+     */
+    public static <T> T getSingleton(Class<T> clazz, T defaultInstance) {
+        Object cached = RESOLVED_SINGLETONS.get(clazz);
+        if (cached != null) {
+            return clazz.cast(cached);
+        }
+        if (defaultInstance != null && !clazz.isInstance(defaultInstance)) throw new IllegalArgumentException("defaultInstance is not of type " + clazz.getName());
+        final T resolved = resolveSingleton(clazz, defaultInstance);
+        if (resolved == null) {
+            // ConcurrentHashMap rejects null values, so there is nothing to cache:
+            // report "not registered, no default" as null, as documented.
+            return null;
+        }
+        // Another thread may have resolved the same class first; its instance wins.
+        Object previous = RESOLVED_SINGLETONS.putIfAbsent(clazz, resolved);
+        return previous == null ? resolved : clazz.cast(previous);
+    }
+    
+    /**
+     * Returns the first provider the {@link ServiceLoader} finds for the class,
+     * or the default instance when no provider is registered.
+     */
+    private synchronized static <T> T resolveSingleton(Class<T> clazz, T defaultInstance) {
+        ServiceLoader<T> loader = ServiceLoader.load(clazz);
+        // returns the first registered instance found
+        for (T singleton : loader) {
+            return singleton;
+        }
+        return defaultInstance;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/JDBCUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/JDBCUtil.java b/src/main/java/org/apache/phoenix/util/JDBCUtil.java
new file mode 100644
index 0000000..b3adb85
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/JDBCUtil.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.sql.SQLException;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+
+
+
+/**
+ * Utilities for JDBC
+ *
+ * @author jtaylor
+ * @since 178
+ */
+public class JDBCUtil {
+    
+    private JDBCUtil() {
+    }
+
+    /**
+     * Find the propName by first looking in the info properties and if not found,
+     * next in the url string, where it is expected in the form
+     * {@code ;propName=value} terminated by the next ';' or the end of the url.
+     * If not found, null is returned.
+     * @param url JDBC connection URL
+     * @param info JDBC connection properties; may be null, in which case only the url is searched
+     * @param propName the name of the property to find
+     * @return the property value or null if not found
+     */
+    public static String findProperty(String url, Properties info, String propName) {
+        // The connection properties take precedence over the url.
+        String propValue = (info == null ? null : info.getProperty(propName));
+        if (propValue != null) {
+            return propValue;
+        }
+        String urlPropName = ";" + propName + "=";
+        int begIndex = url.indexOf(urlPropName);
+        if (begIndex < 0) {
+            return null;
+        }
+        int valueStart = begIndex + urlPropName.length();
+        int endIndex = url.indexOf(';', valueStart);
+        if (endIndex < 0) {
+            // Last property in the url: the value runs to the end of the string.
+            endIndex = url.length();
+        }
+        return url.substring(valueStart, endIndex);
+    }
+
+    /**
+     * Looks up the property named by PhoenixRuntime.CURRENT_SCN_ATTRIB.
+     * @return the value parsed as a Long, or null if the property is not set
+     * @throws NumberFormatException if the value is set but is not a valid long
+     */
+    public static Long getCurrentSCN(String url, Properties info) throws SQLException {
+        String scnStr = findProperty(url, info, PhoenixRuntime.CURRENT_SCN_ATTRIB);
+        return (scnStr == null ? null : Long.parseLong(scnStr));
+    }
+
+    /**
+     * Looks up the property named by PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB.
+     * @return the value parsed as an int, or, if the property is not set, the
+     *         QueryServices.MUTATE_BATCH_SIZE_ATTRIB setting from props
+     *         (default QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE)
+     * @throws NumberFormatException if the value is set but is not a valid int
+     */
+    public static int getMutateBatchSize(String url, Properties info, ReadOnlyProps props) throws SQLException {
+        String batchSizeStr = findProperty(url, info, PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB);
+        return (batchSizeStr == null ? props.getInt(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE) : Integer.parseInt(batchSizeStr));
+    }
+
+    /**
+     * Looks up the property named by PhoenixRuntime.TENANT_ID_ATTRIB.
+     * @return the value converted with Bytes.toBytes, or null if the property is not set
+     */
+    public static byte[] getTenantId(String url, Properties info) throws SQLException {
+        String tenantId = findProperty(url, info, PhoenixRuntime.TENANT_ID_ATTRIB);
+        return (tenantId == null ? null : Bytes.toBytes(tenantId));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/KeyValueUtil.java b/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
new file mode 100644
index 0000000..ea6f28d
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * 
+ * Utilities for KeyValue. Where there's duplication with KeyValue methods,
+ * these avoid creating new objects when not necessary (primarily preventing
+ * byte array copying).
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class KeyValueUtil {
+    private KeyValueUtil() {
+    }
+
+    /**
+     * Creates a Put-type KeyValue whose row key is taken from the bytes returned by
+     * {@code ResultUtil.getRawBytes(r)}, at the offset and length reported by
+     * {@code ResultUtil.getKeyOffset(r)} and {@code ResultUtil.getKeyLength(r)}.
+     * @param r result supplying the row key bytes
+     * @param cf column family, used in full
+     * @param cq column qualifier, used in full
+     * @param ts timestamp of the new KeyValue
+     * @param value buffer holding the value
+     * @param valueOffset offset of the value within {@code value}
+     * @param valueLength length of the value within {@code value}
+     */
+    public static KeyValue newKeyValue(Result r, byte[] cf, byte[] cq, long ts, byte[] value, int valueOffset, int valueLength) {
+        byte[] bytes = ResultUtil.getRawBytes(r);
+        return new KeyValue(bytes, ResultUtil.getKeyOffset(r), ResultUtil.getKeyLength(r),
+                cf, 0, cf.length,
+                cq, 0, cq.length,
+                ts, Type.Put,
+                value, valueOffset, valueLength);
+    }
+
+    /**
+     * Creates a Put-type KeyValue using the whole of {@code key} as the row key and the
+     * given slice of {@code value} as the value.
+     */
+    public static KeyValue newKeyValue(byte[] key, byte[] cf, byte[] cq, long ts, byte[] value, int valueOffset, int valueLength) {
+        return new KeyValue(key, 0, key.length,
+                cf, 0, cf.length,
+                cq, 0, cq.length,
+                ts, Type.Put,
+                value, valueOffset, valueLength);
+    }
+
+    /**
+     * Creates a Put-type KeyValue using the byte range exposed by {@code key}
+     * (its backing array, offset and length) as the row key.
+     */
+    public static KeyValue newKeyValue(ImmutableBytesWritable key, byte[] cf, byte[] cq, long ts, byte[] value, int valueOffset, int valueLength) {
+        return new KeyValue(key.get(), key.getOffset(), key.getLength(),
+                cf, 0, cf.length,
+                cq, 0, cq.length,
+                ts, Type.Put,
+                value, valueOffset, valueLength);
+    }
+
+    /**
+     * Creates a Put-type KeyValue using the given slice of {@code key} as the row key and
+     * the given slice of {@code value} as the value.
+     */
+    public static KeyValue newKeyValue(byte[] key, int keyOffset, int keyLength, byte[] cf, byte[] cq, long ts, byte[] value, int valueOffset, int valueLength) {
+        return new KeyValue(key, keyOffset, keyLength,
+                cf, 0, cf.length,
+                cq, 0, cq.length,
+                ts, Type.Put,
+                value, valueOffset, valueLength);
+    }
+
+    /**
+     * Creates a Put-type KeyValue using the whole of {@code key} as the row key and the
+     * whole of {@code value} as the value.
+     */
+    public static KeyValue newKeyValue(byte[] key, byte[] cf, byte[] cq, long ts, byte[] value) {
+        return newKeyValue(key,cf,cq,ts,value,0,value.length);
+    }
+
+    /**
+     * Creates a Put-type KeyValue whose row key comes from {@code r} and whose value is
+     * the whole of {@code value}.
+     */
+    public static KeyValue newKeyValue(Result r, byte[] cf, byte[] cq, long ts, byte[] value) {
+        return newKeyValue(r,cf,cq,ts,value,0,value.length);
+    }
+
+    /**
+     * Binary search for latest column value without allocating memory in the process
+     * @param kvs list to search; it is searched with {@link KeyValue#COMPARATOR}, so it
+     *            must already be sorted in that order
+     * @param searchTerm key identifying the row, family and qualifier to look for
+     * @return the KeyValue found at the search position if its family and qualifier match
+     *         those of {@code searchTerm}; null if the list is empty, the position is past
+     *         the end of the list, or the family or qualifier differ
+     */
+    public static KeyValue getColumnLatest(List<KeyValue>kvs, KeyValue searchTerm) {
+        if (kvs.isEmpty()) {
+          return null;
+        }
+        
+        // pos === ( -(insertion point) - 1)
+        int pos = Collections.binarySearch(kvs, searchTerm, KeyValue.COMPARATOR);
+        // never will exact match
+        if (pos < 0) {
+          pos = -(pos + 1);
+          // pos is now insertion point
+        }
+        if (pos == kvs.size()) {
+          return null; // doesn't exist
+        }
+    
+        // The candidate only counts if it is for the same family and qualifier as the search term.
+        KeyValue kv = kvs.get(pos);
+        if (Bytes.compareTo(kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(),
+                searchTerm.getBuffer(), searchTerm.getFamilyOffset(), searchTerm.getFamilyLength()) != 0) {
+            return null;
+        }
+        if (Bytes.compareTo(kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength(),
+                searchTerm.getBuffer(), searchTerm.getQualifierOffset(), searchTerm.getQualifierLength()) != 0) {
+            return null;
+        }
+        return kv;
+    }
+
+    /**
+     * Binary search for latest column value without allocating memory in the process.
+     * The row key used for the search is taken from the first KeyValue in {@code kvs}
+     * (presumably all entries belong to the same row; verify against callers).
+     * @return the matching KeyValue, or null if {@code kvs} is empty or holds no match
+     */
+    public static KeyValue getColumnLatest(List<KeyValue>kvs, byte[] family, byte[] qualifier) {
+        // Without a first element there is no row key to search with; report "not found"
+        // the same way the searchTerm overload does instead of failing on get(0).
+        if (kvs.isEmpty()) {
+            return null;
+        }
+        KeyValue kv = kvs.get(0);
+        return KeyValueUtil.getColumnLatest(kvs, kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(), family, 0, family.length, qualifier, 0, qualifier.length);
+    }
+
+    /**
+     * Binary search for latest column value without allocating memory in the process.
+     * Builds the search key with {@code KeyValue.createFirstOnRow} from the given row,
+     * family and qualifier slices and delegates to
+     * {@link #getColumnLatest(List, KeyValue)}.
+     */
+    public static KeyValue getColumnLatest(List<KeyValue>kvs, byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength, byte[] qualifier, int qoffset, int qlength) {
+        KeyValue searchTerm = KeyValue.createFirstOnRow(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, qlength);
+        return getColumnLatest(kvs,searchTerm);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/MetaDataUtil.java b/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
new file mode 100644
index 0000000..b778b2a
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
@@ -0,0 +1,197 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import static org.apache.phoenix.util.SchemaUtil.getVarChars;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PDataType;
+
+
+public class MetaDataUtil {
+
+    /**
+     * Decodes the Phoenix version from the encoded version value and checks it against
+     * {@link MetaDataProtocol#PHOENIX_MAJOR_VERSION} and
+     * {@link MetaDataProtocol#PHOENIX_MINOR_VERSION}. A server and client with the same
+     * major and minor version number must be compatible, so one of those constants has to
+     * be rolled whenever an incompatible change is made.
+     */
+    public static boolean areClientAndServerCompatible(long version) {
+        int phoenixVersion = MetaDataUtil.decodePhoenixVersion(version);
+        return areClientAndServerCompatible(phoenixVersion, MetaDataProtocol.PHOENIX_MAJOR_VERSION, MetaDataProtocol.PHOENIX_MINOR_VERSION);
+    }
+
+    // For testing
+    static boolean areClientAndServerCompatible(int version, int pMajor, int pMinor) {
+        // Compatible when the version falls between the lowest and highest patch
+        // encodings of the given major.minor, whatever its patch byte is.
+        int lowestPatch = MetaDataUtil.encodeMinPatchVersion(pMajor, pMinor);
+        int highestPatch = MetaDataUtil.encodeMaxPatchVersion(pMajor, pMinor);
+        return lowestPatch <= version && version <= highestPatch;
+    }
+
+    /**
+     * Extracts the Phoenix version from the encoded version value. In the returned int,
+     * the second byte is the major version, the third byte the minor version and the
+     * fourth byte the patch version.
+     */
+    public static int decodePhoenixVersion(long version) {
+        // Push the top three bytes out, then shift back down past the lowest byte.
+        long topBytesDropped = version << (Byte.SIZE * 3);
+        return (int) (topBytesDropped >>> (Byte.SIZE * 4));
+    }
+    
+    // TODO: generalize this to use two bytes to return a SQL error code instead
+    /**
+     * Sets the lowest bit of the encoded version value when {@code isValid} is false;
+     * returns the value unchanged otherwise.
+     */
+    public static long encodeMutableIndexConfiguredProperly(long version, boolean isValid) {
+        return isValid ? version : (version | 1);
+    }
+    
+    /**
+     * Returns true when the lowest four bits of the encoded version value are all zero.
+     */
+    public static boolean decodeMutableIndexConfiguredProperly(long version) {
+        long flagBits = version & 0xF;
+        return flagBits == 0;
+    }
+
+    /**
+     * Extracts the HBase version from the encoded version value. In the returned int,
+     * the second byte is the major version, the third byte the minor version and the
+     * fourth byte the patch version.
+     */
+    public static int decodeHBaseVersion(long version) {
+        long hbaseBits = version >>> (Byte.SIZE * 5);
+        return (int) hbaseBits;
+    }
+
+    /**
+     * Formats an encoded version int as "major.minor.patch".
+     */
+    public static String decodeHBaseVersionAsString(int version) {
+        int patch = version & 0xFF;
+        int minor = (version >>> Byte.SIZE) & 0xFF;
+        int major = (version >>> (Byte.SIZE * 2)) & 0xFF;
+        return new StringBuilder().append(major).append('.').append(minor).append('.').append(patch).toString();
+    }
+
+    /**
+     * Packs the given HBase version string together with this build's Phoenix version
+     * into one long: the encoded HBase version is shifted left by five bytes and the
+     * encoded Phoenix version by one byte.
+     */
+    public static long encodeHBaseAndPhoenixVersions(String hbaseVersion) {
+        long hbaseBits = ((long) encodeVersion(hbaseVersion)) << (Byte.SIZE * 5);
+        long phoenixBits = ((long) encodeVersion(MetaDataProtocol.PHOENIX_MAJOR_VERSION, MetaDataProtocol.PHOENIX_MINOR_VERSION,
+                MetaDataProtocol.PHOENIX_PATCH_NUMBER)) << Byte.SIZE;
+        return hbaseBits | phoenixBits;
+    }
+
+    /**
+     * Encodes a version string in the format of "major.minor.patch" into an integer.
+     * The string is split on '-' and '.'; missing minor or patch parts count as zero.
+     */
+    public static int encodeVersion(String version) {
+        String[] parts = version.split("[-\\.]");
+        String major = parts[0];
+        String minor = parts.length > 1 ? parts[1] : null;
+        String patch = parts.length > 2 ? parts[2] : null;
+        return encodeVersion(major, minor, patch);
+    }
+
+    /**
+     * Parses the three parts (null counts as zero) and encodes them with the major as the
+     * second byte of the int, the minor as the third byte and the patch as the last byte.
+     */
+    public static int encodeVersion(String major, String minor, String patch) {
+        int majorNumber = parseOrZero(major);
+        int minorNumber = parseOrZero(minor);
+        int patchNumber = parseOrZero(patch);
+        return encodeVersion(majorNumber, minorNumber, patchNumber);
+    }
+
+    // Parses a version part, treating a missing (null) part as zero.
+    private static int parseOrZero(String part) {
+        return part == null ? 0 : Integer.parseInt(part);
+    }
+
+    /**
+     * Encodes the major as the second byte of the int, the minor as the third byte and
+     * the patch as the last byte.
+     */
+    public static int encodeVersion(int major, int minor, int patch) {
+        return (major << (Byte.SIZE * 2)) | (minor << Byte.SIZE) | patch;
+    }
+
+    /**
+     * Encodes major.minor with every bit of the patch byte set.
+     */
+    public static int encodeMaxPatchVersion(int major, int minor) {
+        return encodeMinPatchVersion(major, minor) | 0xFF;
+    }
+
+    /**
+     * Encodes major.minor with a zero patch byte.
+     */
+    public static int encodeMinPatchVersion(int major, int minor) {
+        return (major << (Byte.SIZE * 2)) | (minor << Byte.SIZE);
+    }
+
+    /**
+     * Passes the row key of the table header row to {@code SchemaUtil.getVarChars},
+     * which populates {@code rowKeyMetaData}.
+     */
+    public static void getSchemaAndTableName(List<Mutation> tableMetadata, byte[][] rowKeyMetaData) {
+        byte[] headerRowKey = getTableHeaderRow(tableMetadata).getRow();
+        getVarChars(headerRowKey, 2, rowKeyMetaData);
+    }
+    
+    /**
+     * Returns the table name read from the parent table header row, or null when the
+     * list holds a single mutation or when that name equals the table's own name.
+     */
+    public static byte[] getParentTableName(List<Mutation> tableMetadata) {
+        if (tableMetadata.size() == 1) {
+            return null;
+        }
+        byte[][] rowKeyMetaData = new byte[2][];
+        getSchemaAndTableName(tableMetadata, rowKeyMetaData);
+        byte[] tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
+        // The same array is refilled from the parent row, so the name was captured first.
+        byte[] parentRowKey = getParentTableHeaderRow(tableMetadata).getRow();
+        getVarChars(parentRowKey, 2, rowKeyMetaData);
+        byte[] parentTableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
+        return Bytes.compareTo(tableName, parentTableName) == 0 ? null : parentTableName;
+    }
+    
+    /**
+     * Finds the KeyValue in the {@link PhoenixDatabaseMetaData#TABLE_FAMILY_BYTES} family
+     * whose qualifier equals {@link PhoenixDatabaseMetaData#TABLE_SEQ_NUM_BYTES} and
+     * decodes its value as a long.
+     * @throws IllegalStateException if the mutation has no such KeyValue
+     */
+    public static long getSequenceNumber(Mutation tableMutation) {
+        List<KeyValue> kvs = tableMutation.getFamilyMap().get(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES);
+        if (kvs == null) {
+            throw new IllegalStateException();
+        }
+        byte[] seqNumQualifier = PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES;
+        for (KeyValue kv : kvs) { // list is not ordered, so search. TODO: we could potentially assume the position
+            boolean isSeqNum = Bytes.compareTo(kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength(), seqNumQualifier, 0, seqNumQualifier.length) == 0;
+            if (isSeqNum) {
+                return PDataType.LONG.getCodec().decodeLong(kv.getBuffer(), kv.getValueOffset(), null);
+            }
+        }
+        throw new IllegalStateException();
+    }
+    
+    /**
+     * Returns the sequence number carried by the table header row.
+     */
+    public static long getSequenceNumber(List<Mutation> tableMetaData) {
+        Mutation headerRow = getTableHeaderRow(tableMetaData);
+        return getSequenceNumber(headerRow);
+    }
+    
+    /**
+     * Returns the sequence number carried by the parent table header row.
+     */
+    public static long getParentSequenceNumber(List<Mutation> tableMetaData) {
+        Mutation parentHeaderRow = getParentTableHeaderRow(tableMetaData);
+        return getSequenceNumber(parentHeaderRow);
+    }
+    
+    /**
+     * Returns the first mutation in the list.
+     */
+    public static Mutation getTableHeaderRow(List<Mutation> tableMetaData) {
+        return tableMetaData.get(0);
+    }
+
+    /**
+     * Returns the last mutation in the list.
+     */
+    public static Mutation getParentTableHeaderRow(List<Mutation> tableMetaData) {
+        int lastIndex = tableMetaData.size() - 1;
+        return tableMetaData.get(lastIndex);
+    }
+
+    /**
+     * Returns the client timestamp of the first mutation in the list.
+     */
+    public static long getClientTimeStamp(List<Mutation> tableMetadata) {
+        return getClientTimeStamp(tableMetadata.get(0));
+    }    
+
+    /**
+     * Returns the timestamp of the first KeyValue of the first family in the mutation,
+     * or the mutation's own timestamp when it has no families.
+     */
+    public static long getClientTimeStamp(Mutation m) {
+        Collection<List<KeyValue>> familyCells = m.getFamilyMap().values();
+        // Empty if Mutation is a Delete
+        // TODO: confirm that Delete timestamp is reset like Put
+        if (familyCells.isEmpty()) {
+            return m.getTimeStamp();
+        }
+        List<KeyValue> firstFamily = familyCells.iterator().next();
+        return firstFamily.get(0).getTimestamp();
+    }    
+
+    /**
+     * Builds the parent link key from names given as strings; a null schema name is
+     * treated as empty. See the byte[] overload for the layout.
+     */
+    public static byte[] getParentLinkKey(String schemaName, String tableName, String indexName) {
+        byte[] schemaBytes = schemaName == null ? null : Bytes.toBytes(schemaName);
+        return getParentLinkKey(schemaBytes, Bytes.toBytes(tableName), Bytes.toBytes(indexName));
+    }
+
+    /**
+     * Builds the parent link key by concatenating: schema name (empty when null),
+     * separator, table name, two separators, index name.
+     */
+    public static byte[] getParentLinkKey(byte[] schemaName, byte[] tableName, byte[] indexName) {
+        byte[] schemaPart = schemaName == null ? ByteUtil.EMPTY_BYTE_ARRAY : schemaName;
+        return ByteUtil.concat(schemaPart, QueryConstants.SEPARATOR_BYTE_ARRAY, tableName, QueryConstants.SEPARATOR_BYTE_ARRAY, QueryConstants.SEPARATOR_BYTE_ARRAY, indexName);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/NumberUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/NumberUtil.java b/src/main/java/org/apache/phoenix/util/NumberUtil.java
new file mode 100644
index 0000000..ece1104
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/NumberUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+
+import org.apache.phoenix.schema.PDataType;
+
+/**
+ * Utility methods for numbers like decimal, long, etc.
+ *
+ * @author elevine
+ * @since 0.1
+ */
+public class NumberUtil {
+    
+    public static final String DEFAULT_NUMBER_FORMAT = "#,##0.###";
+
+    /**
+     * Strip all trailing zeros to ensure that no digit will be zero and
+     * round using our default context to ensure precision doesn't exceed max allowed.
+     * @return new {@link BigDecimal} instance
+     */
+    public static BigDecimal normalize(BigDecimal bigDecimal) {
+        return bigDecimal.stripTrailingZeros().round(PDataType.DEFAULT_MATH_CONTEXT);
+    }
+
+    public static BigDecimal setDecimalWidthAndScale(BigDecimal decimal, int precision, int scale) {
+        // If we could not fit all the digits before decimal point into the new desired precision and
+        // scale, return null and the caller method should handle the error.
+        if (((precision - scale) < (decimal.precision() - decimal.scale()))){
+            return null;
+        }
+        decimal = decimal.setScale(scale, BigDecimal.ROUND_DOWN);
+        return decimal;
+    }
+}