Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 20:23:09 UTC
[08/51] [partial] Initial commit
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/ByteUtil.java b/src/main/java/org/apache/phoenix/util/ByteUtil.java
new file mode 100644
index 0000000..f654c9e
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/ByteUtil.java
@@ -0,0 +1,559 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+
+
+/**
+ *
+ * Byte utilities
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ByteUtil {
+ public static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+
+ public static final Comparator<ImmutableBytesPtr> BYTES_PTR_COMPARATOR = new Comparator<ImmutableBytesPtr>() {
+
+ @Override
+ public int compare(ImmutableBytesPtr o1, ImmutableBytesPtr o2) {
+ return Bytes.compareTo(o1.get(), o1.getOffset(), o1.getLength(), o2.get(), o2.getOffset(), o2.getLength());
+ }
+
+ };
+
+ /**
+ * Serialize an array of byte arrays into a single byte array. Used
+ * to pass through a set of byte arrays as an attribute of a Scan.
+ * Use {@link #toByteArrays(byte[], int)} to convert the serialized
+ * byte array back to the array of byte arrays.
+ * @param byteArrays the array of byte arrays to serialize
+ * @return the byte array
+ */
+ public static byte[] toBytes(byte[][] byteArrays) {
+ int size = 0;
+ for (byte[] b : byteArrays) {
+ if (b == null) {
+ size++;
+ } else {
+ size += b.length;
+ size += WritableUtils.getVIntSize(b.length);
+ }
+ }
+ TrustedByteArrayOutputStream bytesOut = new TrustedByteArrayOutputStream(size);
+ DataOutputStream out = new DataOutputStream(bytesOut);
+ try {
+ for (byte[] b : byteArrays) {
+ if (b == null) {
+ WritableUtils.writeVInt(out, 0);
+ } else {
+ WritableUtils.writeVInt(out, b.length);
+ out.write(b);
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e); // not possible
+ } finally {
+ try {
+ out.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e); // not possible
+ }
+ }
+ return bytesOut.getBuffer();
+ }
+
+ /**
+ * Deserialize a byte array into a set of byte arrays. Used in
+ * coprocessor to reconstruct byte arrays from attribute value
+ * passed through the Scan.
+ * @param b byte array containing serialized byte arrays (created by {@link #toBytes(byte[][])}).
+ * @param length number of byte arrays that were serialized
+ * @return array of now deserialized byte arrays
+ * @throws IllegalStateException if the serialized data contains more than length byte arrays
+ */
+ public static byte[][] toByteArrays(byte[] b, int length) {
+ return toByteArrays(b, 0, length);
+ }
+
+ public static byte[][] toByteArrays(byte[] b, int offset, int length) {
+ ByteArrayInputStream bytesIn = new ByteArrayInputStream(b, offset, b.length - offset);
+ DataInputStream in = new DataInputStream(bytesIn);
+ byte[][] byteArrays = new byte[length][];
+ try {
+ for (int i = 0; i < length; i++) {
+ int bLength = WritableUtils.readVInt(in);
+ if (bLength == 0) {
+ byteArrays[i] = null;
+ } else {
+ byteArrays[i] = new byte[bLength];
+ int rLength = in.read(byteArrays[i], 0, bLength);
+ assert (rLength == bLength); // For FindBugs
+ }
+ }
+ if (in.read() != -1) {
+ throw new IllegalStateException("Expected only " + length + " byte arrays, but found more");
+ }
+ return byteArrays;
+ } catch (IOException e) {
+ throw new RuntimeException(e); // not possible
+ } finally {
+ try {
+ in.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e); // not possible
+ }
+ }
+ }
+
+ public static byte[] serializeVIntArray(int[] intArray) {
+ return serializeVIntArray(intArray,intArray.length);
+ }
+
+ public static byte[] serializeVIntArray(int[] intArray, int encodedLength) {
+ int size = WritableUtils.getVIntSize(encodedLength);
+ for (int i = 0; i < intArray.length; i++) {
+ size += WritableUtils.getVIntSize(intArray[i]);
+ }
+ int offset = 0;
+ byte[] out = new byte[size];
+ offset += ByteUtil.vintToBytes(out, offset, size);
+ for (int i = 0; i < intArray.length; i++) {
+ offset += ByteUtil.vintToBytes(out, offset, intArray[i]);
+ }
+ return out;
+ }
+
+ public static void serializeVIntArray(DataOutput output, int[] intArray) throws IOException {
+ serializeVIntArray(output, intArray, intArray.length);
+ }
+
+ /**
+ * Serialize an int array as vints, writing encodedLength as the leading vint so that additional information can be encoded in the length
+ * @param output
+ * @param intArray
+ * @param encodedLength
+ * @throws IOException
+ */
+ public static void serializeVIntArray(DataOutput output, int[] intArray, int encodedLength) throws IOException {
+ WritableUtils.writeVInt(output, encodedLength);
+ for (int i = 0; i < intArray.length; i++) {
+ WritableUtils.writeVInt(output, intArray[i]);
+ }
+ }
+
+ public static long[] readFixedLengthLongArray(DataInput input, int length) throws IOException {
+ long[] longArray = new long[length];
+ for (int i = 0; i < length; i++) {
+ longArray[i] = input.readLong();
+ }
+ return longArray;
+ }
+
+ public static void writeFixedLengthLongArray(DataOutput output, long[] longArray) throws IOException {
+ for (int i = 0; i < longArray.length; i++) {
+ output.writeLong(longArray[i]);
+ }
+ }
+
+ /**
+ * Deserialize a byte array into an int array.
+ * @param b byte array storing serialized vints
+ * @return int array
+ */
+ public static int[] deserializeVIntArray(byte[] b) {
+ ByteArrayInputStream bytesIn = new ByteArrayInputStream(b);
+ DataInputStream in = new DataInputStream(bytesIn);
+ try {
+ int length = WritableUtils.readVInt(in);
+ return deserializeVIntArray(in, length);
+ } catch (IOException e) {
+ throw new RuntimeException(e); // not possible
+ } finally {
+ try {
+ in.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e); // not possible
+ }
+ }
+ }
+
+ public static int[] deserializeVIntArray(DataInput in) throws IOException {
+ return deserializeVIntArray(in, WritableUtils.readVInt(in));
+ }
+
+ public static int[] deserializeVIntArray(DataInput in, int length) throws IOException {
+ int i = 0;
+ int[] intArray = new int[length];
+ while (i < length) {
+ intArray[i++] = WritableUtils.readVInt(in);
+ }
+ return intArray;
+ }
+
+ /**
+ * Deserialize a byte array into an int array.
+ * @param b byte array storing serialized vints
+ * @param length number of serialized vints
+ * @return int array
+ */
+ public static int[] deserializeVIntArray(byte[] b, int length) {
+ ByteArrayInputStream bytesIn = new ByteArrayInputStream(b);
+ DataInputStream in = new DataInputStream(bytesIn);
+ try {
+ return deserializeVIntArray(in,length);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Concatenate together one or more byte arrays
+ * @param first first byte array
+ * @param rest rest of byte arrays
+ * @return newly allocated byte array that is a concatenation of all the byte arrays passed in
+ */
+ public static byte[] concat(byte[] first, byte[]... rest) {
+ int totalLength = first.length;
+ for (byte[] array : rest) {
+ totalLength += array.length;
+ }
+ byte[] result = Arrays.copyOf(first, totalLength);
+ int offset = first.length;
+ for (byte[] array : rest) {
+ System.arraycopy(array, 0, result, offset, array.length);
+ offset += array.length;
+ }
+ return result;
+ }
+
+ public static <T> T[] concat(T[] first, T[]... rest) {
+ int totalLength = first.length;
+ for (T[] array : rest) {
+ totalLength += array.length;
+ }
+ T[] result = Arrays.copyOf(first, totalLength);
+ int offset = first.length;
+ for (T[] array : rest) {
+ System.arraycopy(array, 0, result, offset, array.length);
+ offset += array.length;
+ }
+ return result;
+ }
+
+ public static byte[] concat(ColumnModifier columnModifier, ImmutableBytesWritable... writables) {
+ int totalLength = 0;
+ for (ImmutableBytesWritable writable : writables) {
+ totalLength += writable.getLength();
+ }
+ byte[] result = new byte[totalLength];
+ int offset = 0;
+ for (ImmutableBytesWritable array : writables) {
+ byte[] bytes = array.get();
+ if (columnModifier != null) {
+ bytes = columnModifier.apply(bytes, array.getOffset(), new byte[array.getLength()], 0, array.getLength());
+ }
+ System.arraycopy(bytes, array.getOffset(), result, offset, array.getLength());
+ offset += array.getLength();
+ }
+ return result;
+ }
+
+ public static int vintFromBytes(byte[] buffer, int offset) {
+ try {
+ return (int)Bytes.readVLong(buffer, offset);
+ } catch (IOException e) { // Impossible
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Decode a vint from the buffer pointed to by ptr and
+ * increment the offset of the ptr by the length of the
+ * vint.
+ * @param ptr a pointer to a byte array buffer
+ * @return the decoded vint value as an int
+ */
+ public static int vintFromBytes(ImmutableBytesWritable ptr) {
+ return (int) vlongFromBytes(ptr);
+ }
+
+ /**
+ * Decode a vint from the buffer pointed to by ptr and
+ * increment the offset of the ptr by the length of the
+ * vint.
+ * @param ptr a pointer to a byte array buffer
+ * @return the decoded vint value as a long
+ */
+ public static long vlongFromBytes(ImmutableBytesWritable ptr) {
+ final byte [] buffer = ptr.get();
+ final int offset = ptr.getOffset();
+ byte firstByte = buffer[offset];
+ int len = WritableUtils.decodeVIntSize(firstByte);
+ if (len == 1) {
+ ptr.set(buffer, offset+1, ptr.getLength());
+ return firstByte;
+ }
+ long i = 0;
+ for (int idx = 0; idx < len-1; idx++) {
+ byte b = buffer[offset + 1 + idx];
+ i = i << 8;
+ i = i | (b & 0xFF);
+ }
+ ptr.set(buffer, offset+len, ptr.getLength());
+ return (WritableUtils.isNegativeVInt(firstByte) ? ~i : i);
+ }
+
+
+ /**
+ * Write a long as a variable-length encoded number at the given offset in the result byte array
+ * @param result buffer to write the vint into
+ * @param offset offset in the buffer at which to start writing
+ * @param vint the long value to encode as a vint
+ * @return the length of the vint in bytes
+ */
+ public static int vintToBytes(byte[] result, int offset, final long vint) {
+ long i = vint;
+ if (i >= -112 && i <= 127) {
+ result[offset] = (byte) i;
+ return 1;
+ }
+
+ int len = -112;
+ if (i < 0) {
+ i ^= -1L; // take one's complement
+ len = -120;
+ }
+
+ long tmp = i;
+ while (tmp != 0) {
+ tmp = tmp >> 8;
+ len--;
+ }
+
+ result[offset++] = (byte) len;
+
+ len = (len < -120) ? -(len + 120) : -(len + 112);
+
+ for (int idx = len; idx != 0; idx--) {
+ int shiftbits = (idx - 1) * 8;
+ long mask = 0xFFL << shiftbits;
+ result[offset++] = (byte)((i & mask) >> shiftbits);
+ }
+ return len + 1;
+ }
+
+ /**
+ * Increment the key to the next key
+ * @param key the key to increment
+ * @return a new byte array with the next key or null
+ * if the key could not be incremented because it's
+ * already at its max value.
+ */
+ public static byte[] nextKey(byte[] key) {
+ byte[] nextStartRow = new byte[key.length];
+ System.arraycopy(key, 0, nextStartRow, 0, key.length);
+ if (!nextKey(nextStartRow, nextStartRow.length)) {
+ return null;
+ }
+ return nextStartRow;
+ }
+
+ /**
+ * Increment the key in-place to the next key
+ * @param key the key to increment
+ * @param length the length of the key
+ * @return true if the key can be incremented and
+ * false otherwise if the key is at its max
+ * value.
+ */
+ public static boolean nextKey(byte[] key, int length) {
+ return nextKey(key, 0, length);
+ }
+
+ public static boolean nextKey(byte[] key, int offset, int length) {
+ if (length == 0) {
+ return false;
+ }
+ int i = offset + length - 1;
+ while (key[i] == -1) {
+ key[i] = 0;
+ i--;
+ if (i < offset) {
+ // Change bytes back to the way they were
+ do {
+ key[++i] = -1;
+ } while (i < offset + length - 1);
+ return false;
+ }
+ }
+ key[i] = (byte)(key[i] + 1);
+ return true;
+ }
+
+ public static byte[] previousKey(byte[] key) {
+ byte[] previousKey = new byte[key.length];
+ System.arraycopy(key, 0, previousKey, 0, key.length);
+ if (!previousKey(previousKey, previousKey.length)) {
+ return null;
+ }
+ return previousKey;
+ }
+
+ public static boolean previousKey(byte[] key, int length) {
+ return previousKey(key, 0, length);
+ }
+
+ public static boolean previousKey(byte[] key, int offset, int length) {
+ if (length == 0) {
+ return false;
+ }
+ int i = offset + length - 1;
+ while (key[i] == 0) {
+ key[i] = -1;
+ i--;
+ if (i < offset) {
+ // Change bytes back to the way they were
+ do {
+ key[++i] = 0;
+ } while (i < offset + length - 1);
+ return false;
+ }
+ }
+ key[i] = (byte)(key[i] - 1);
+ return true;
+ }
+
+ /**
+ * Expand the key to length bytes, zero-filling the
+ * bytes beyond the current key length.
+ */
+ public static byte[] fillKey(byte[] key, int length) {
+ if(key.length > length) {
+ throw new IllegalStateException();
+ }
+ if (key.length == length) {
+ return key;
+ }
+ byte[] newBound = new byte[length];
+ System.arraycopy(key, 0, newBound, 0, key.length);
+ return newBound;
+ }
+
+ /**
+ * Get the size in bytes of the UTF-8 encoded CharSequence
+ * @param sequence the CharSequence
+ */
+ public static int getSize(CharSequence sequence) {
+ int count = 0;
+ for (int i = 0, len = sequence.length(); i < len; i++) {
+ char ch = sequence.charAt(i);
+ if (ch <= 0x7F) {
+ count++;
+ } else if (ch <= 0x7FF) {
+ count += 2;
+ } else if (Character.isHighSurrogate(ch)) {
+ count += 4;
+ ++i;
+ } else {
+ count += 3;
+ }
+ }
+ return count;
+ }
+
+ public static boolean isInclusive(CompareOp op) {
+ switch (op) {
+ case LESS:
+ case GREATER:
+ return false;
+ case EQUAL:
+ case NOT_EQUAL:
+ case LESS_OR_EQUAL:
+ case GREATER_OR_EQUAL:
+ return true;
+ default:
+ throw new RuntimeException("Unknown Compare op " + op.name());
+ }
+ }
+ public static boolean compare(CompareOp op, int compareResult) {
+ switch (op) {
+ case LESS:
+ return compareResult < 0;
+ case LESS_OR_EQUAL:
+ return compareResult <= 0;
+ case EQUAL:
+ return compareResult == 0;
+ case NOT_EQUAL:
+ return compareResult != 0;
+ case GREATER_OR_EQUAL:
+ return compareResult >= 0;
+ case GREATER:
+ return compareResult > 0;
+ default:
+ throw new RuntimeException("Unknown Compare op " + op.name());
+ }
+ }
+
+ /**
+ * Given an ImmutableBytesWritable, returns the payload part of the argument as a byte array.
+ */
+ public static byte[] copyKeyBytesIfNecessary(ImmutableBytesWritable ptr) {
+ if (ptr.getOffset() == 0 && ptr.getLength() == ptr.get().length) {
+ return ptr.get();
+ }
+ return ptr.copyBytes();
+ }
+
+ public static KeyRange getKeyRange(byte[] key, CompareOp op, PDataType type) {
+ switch (op) {
+ case EQUAL:
+ return type.getKeyRange(key, true, key, true);
+ case GREATER:
+ return type.getKeyRange(key, false, KeyRange.UNBOUND, false);
+ case GREATER_OR_EQUAL:
+ return type.getKeyRange(key, true, KeyRange.UNBOUND, false);
+ case LESS:
+ return type.getKeyRange(KeyRange.UNBOUND, false, key, false);
+ case LESS_OR_EQUAL:
+ return type.getKeyRange(KeyRange.UNBOUND, false, key, true);
+ default:
+ throw new IllegalArgumentException("Unknown operator " + op);
+ }
+ }
+}
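
For readers skimming the diff, here is a minimal usage sketch (not part of the commit) of the serialization and key helpers in the ByteUtil file above; the values are illustrative:

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.util.ByteUtil;

    public class ByteUtilExample {
        public static void main(String[] args) {
            // Serialize several byte arrays into one array, e.g. to pass as a Scan attribute
            byte[][] arrays = new byte[][] { Bytes.toBytes("a"), null, Bytes.toBytes("bc") };
            byte[] serialized = ByteUtil.toBytes(arrays);
            // The caller must know how many arrays were written (3 here); nulls round-trip as nulls
            byte[][] roundTripped = ByteUtil.toByteArrays(serialized, 3);
            // nextKey returns a copy of the key incremented by one, or null if already at the max value
            byte[] next = ByteUtil.nextKey(Bytes.toBytes("row1")); // "row2"
            System.out.println(Bytes.toString(roundTripped[2]) + " / " + Bytes.toString(next));
        }
    }
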
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/CSVLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/CSVLoader.java b/src/main/java/org/apache/phoenix/util/CSVLoader.java
new file mode 100644
index 0000000..57c28ec
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/CSVLoader.java
@@ -0,0 +1,250 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import au.com.bytecode.opencsv.CSVReader;
+import com.google.common.collect.Maps;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PDataType;
+
+import java.io.FileReader;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Upserts CSV data using Phoenix JDBC connection
+ *
+ * @author mchohan
+ *
+ */
+public class CSVLoader {
+
+ private final PhoenixConnection conn;
+ private final String tableName;
+ private final List<String> columns;
+ private final boolean isStrict;
+ private final List<String> delimiter;
+ private final Map<String,Character> ctrlTable = new HashMap<String,Character>() {
+ { put("1",'\u0001');
+ put("2",'\u0002');
+ put("3",'\u0003');
+ put("4",'\u0004');
+ put("5",'\u0005');
+ put("6",'\u0006');
+ put("7",'\u0007');
+ put("8",'\u0008');
+ put("9",'\u0009');}};
+
+ private int unfoundColumnCount;
+
+ public CSVLoader(PhoenixConnection conn, String tableName, List<String> columns, boolean isStrict,List<String> delimiter) {
+ this.conn = conn;
+ this.tableName = tableName;
+ this.columns = columns;
+ this.isStrict = isStrict;
+ this.delimiter = delimiter;
+ }
+
+ public CSVLoader(PhoenixConnection conn, String tableName, List<String> columns, boolean isStrict) {
+ this(conn,tableName,columns,isStrict,null);
+ }
+
+
+ /**
+ * Upserts data from a CSV file. Data is batched based on the connection's batch
+ * size. The column PDataType is read from metadata and used to convert each
+ * column value to the correct type before upsert. Note: column names are
+ * expected as the first line of the CSV file.
+ *
+ * @param fileName
+ * @throws Exception
+ */
+ public void upsert(String fileName) throws Exception {
+ List<String> delimiter = this.delimiter;
+ CSVReader reader;
+ if ((delimiter != null) && (delimiter.size() == 3)) {
+ reader = new CSVReader(new FileReader(fileName),
+ getCSVCustomField(this.delimiter.get(0)),
+ getCSVCustomField(this.delimiter.get(1)),
+ getCSVCustomField(this.delimiter.get(2)));
+ } else {
+ reader = new CSVReader(new FileReader(fileName));
+ }
+ upsert(reader);
+ }
+
+
+ public char getCSVCustomField(String field) {
+ if(this.ctrlTable.containsKey(field)) {
+ return this.ctrlTable.get(field);
+ } else {
+ return field.charAt(0);
+ }
+ }
+
+ /**
+ * Upserts data from a CSV file. Data is batched based on the connection's batch
+ * size. The column PDataType is read from metadata and used to convert each
+ * column value to the correct type before upsert. Note: column names are
+ * expected as the first line of the CSV file.
+ *
+ * @param reader CSVReader instance
+ * @throws Exception
+ */
+ public void upsert(CSVReader reader) throws Exception {
+ List<String> columns = this.columns;
+ if (columns != null && columns.isEmpty()) {
+ columns = Arrays.asList(reader.readNext());
+ }
+ ColumnInfo[] columnInfo = generateColumnInfo(columns);
+ PreparedStatement stmt = null;
+ PreparedStatement[] stmtCache = null;
+ if (columns == null) {
+ stmtCache = new PreparedStatement[columnInfo.length];
+ } else {
+ String upsertStatement = QueryUtil.constructUpsertStatement(columnInfo, tableName, columnInfo.length - unfoundColumnCount);
+ stmt = conn.prepareStatement(upsertStatement);
+ }
+ String[] nextLine;
+ int rowCount = 0;
+ int upsertBatchSize = conn.getMutateBatchSize();
+ boolean wasAutoCommit = conn.getAutoCommit();
+ try {
+ conn.setAutoCommit(false);
+ Object upsertValue = null;
+ long start = System.currentTimeMillis();
+
+ // Upsert data based on SqlType of each column
+ while ((nextLine = reader.readNext()) != null) {
+ if (columns == null) {
+ stmt = stmtCache[nextLine.length-1];
+ if (stmt == null) {
+ String upsertStatement = QueryUtil.constructUpsertStatement(columnInfo, tableName, nextLine.length);
+ stmt = conn.prepareStatement(upsertStatement);
+ stmtCache[nextLine.length-1] = stmt;
+ }
+ }
+ for (int index = 0; index < columnInfo.length; index++) {
+ if (columnInfo[index] == null) {
+ continue;
+ }
+ String line = nextLine[index];
+ Integer info = columnInfo[index].getSqlType();
+ upsertValue = convertTypeSpecificValue(line, info);
+ if (upsertValue != null) {
+ stmt.setObject(index + 1, upsertValue, columnInfo[index].getSqlType());
+ } else {
+ stmt.setNull(index + 1, columnInfo[index].getSqlType());
+ }
+ }
+ stmt.execute();
+
+ // Commit when batch size is reached
+ if (++rowCount % upsertBatchSize == 0) {
+ conn.commit();
+ System.out.println("Rows upserted: " + rowCount);
+ }
+ }
+ conn.commit();
+ double elapsedDuration = ((System.currentTimeMillis() - start) / 1000.0);
+ System.out.println("CSV Upsert complete. " + rowCount + " rows upserted");
+ System.out.println("Time: " + elapsedDuration + " sec(s)\n");
+ } finally {
+ if(stmt != null) {
+ stmt.close();
+ }
+ if (wasAutoCommit) conn.setAutoCommit(true);
+ }
+ }
+
+ /**
+ * Gets the CSV string input converted to the correct type
+ */
+ private Object convertTypeSpecificValue(String s, Integer sqlType) throws Exception {
+ return PDataType.fromSqlType(sqlType).toObject(s);
+ }
+
+ /**
+ * Get array of ColumnInfos that contain Column Name and its associated
+ * PDataType
+ *
+ * @param columns
+ * @return
+ * @throws SQLException
+ */
+ private ColumnInfo[] generateColumnInfo(List<String> columns)
+ throws SQLException {
+ Map<String,Integer> columnNameToTypeMap = Maps.newLinkedHashMap();
+ DatabaseMetaData dbmd = conn.getMetaData();
+ // TODO: escape wildcard characters, since we don't want wildcard matching behavior here
+ String escapedTableName = StringUtil.escapeLike(tableName);
+ String[] schemaAndTable = escapedTableName.split("\\.");
+ ResultSet rs = null;
+ try {
+ rs = dbmd.getColumns(null, (schemaAndTable.length == 1 ? "" : schemaAndTable[0]),
+ (schemaAndTable.length == 1 ? escapedTableName : schemaAndTable[1]),
+ null);
+ while (rs.next()) {
+ columnNameToTypeMap.put(rs.getString(QueryUtil.COLUMN_NAME_POSITION), rs.getInt(QueryUtil.DATA_TYPE_POSITION));
+ }
+ } finally {
+ if(rs != null) {
+ rs.close();
+ }
+ }
+ ColumnInfo[] columnType;
+ if (columns == null) {
+ int i = 0;
+ columnType = new ColumnInfo[columnNameToTypeMap.size()];
+ for (Map.Entry<String, Integer> entry : columnNameToTypeMap.entrySet()) {
+ columnType[i++] = new ColumnInfo(entry.getKey(),entry.getValue());
+ }
+ } else {
+ // Leave null as an indication to skip the column because it doesn't exist
+ columnType = new ColumnInfo[columns.size()];
+ for (int i = 0; i < columns.size(); i++) {
+ String columnName = SchemaUtil.normalizeIdentifier(columns.get(i).trim());
+ Integer sqlType = columnNameToTypeMap.get(columnName);
+ if (sqlType == null) {
+ if (isStrict) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.COLUMN_NOT_FOUND)
+ .setColumnName(columnName).setTableName(tableName).build().buildException();
+ }
+ unfoundColumnCount++;
+ } else {
+ columnType[i] = new ColumnInfo(columnName, sqlType);
+ }
+ }
+ if (unfoundColumnCount == columns.size()) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.COLUMN_NOT_FOUND)
+ .setColumnName(Arrays.toString(columns.toArray(new String[0]))).setTableName(tableName).build().buildException();
+ }
+ }
+ return columnType;
+ }
+}
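
A minimal sketch (not part of the commit) of driving the CSVLoader above. It assumes the Phoenix JDBC driver is registered and that the returned Connection can be cast to PhoenixConnection; the URL, table name, and file path are hypothetical:

    import java.sql.DriverManager;
    import java.util.Collections;
    import org.apache.phoenix.jdbc.PhoenixConnection;
    import org.apache.phoenix.util.CSVLoader;

    public class CsvLoadExample {
        public static void main(String[] args) throws Exception {
            PhoenixConnection conn =
                (PhoenixConnection) DriverManager.getConnection("jdbc:phoenix:localhost");
            try {
                // An empty column list tells upsert() to read column names from the CSV header line
                CSVLoader loader = new CSVLoader(conn, "MY_TABLE", Collections.<String>emptyList(), true);
                loader.upsert("/tmp/data.csv");
            } finally {
                conn.close();
            }
        }
    }
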
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/Closeables.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/Closeables.java b/src/main/java/org/apache/phoenix/util/Closeables.java
new file mode 100644
index 0000000..b09ebe0
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/Closeables.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.*;
+
+import com.google.common.collect.Iterables;
+
+
+/**
+ * Utilities for operating on {@link Closeable}s.
+ *
+ */
+public class Closeables {
+ /** Not constructed */
+ private Closeables() { }
+
+ /**
+ * Allows you to close as many of the {@link Closeable}s as possible.
+ *
+ * If any of the closes fail with an IOException, those exception(s) will
+ * be thrown after attempting to close all of the inputs.
+ */
+ public static void closeAll(Iterable<? extends Closeable> iterable) throws IOException {
+ IOException ex = closeAllQuietly(iterable);
+ if (ex != null) throw ex;
+ }
+
+ public static IOException closeAllQuietly(Iterable<? extends Closeable> iterable) {
+ if (iterable == null) return null;
+
+ LinkedList<IOException> exceptions = null;
+ for (Closeable closeable : iterable) {
+ try {
+ closeable.close();
+ } catch (IOException x) {
+ if (exceptions == null) exceptions = new LinkedList<IOException>();
+ exceptions.add(x);
+ }
+ }
+
+ IOException ex = MultipleCausesIOException.fromIOExceptions(exceptions);
+ return ex;
+ }
+
+ static private class MultipleCausesIOException extends IOException {
+ private static final long serialVersionUID = 1L;
+
+ static IOException fromIOExceptions(Collection<? extends IOException> exceptions) {
+ if (exceptions == null || exceptions.isEmpty()) return null;
+ if (exceptions.size() == 1) return Iterables.getOnlyElement(exceptions);
+
+ return new MultipleCausesIOException(exceptions);
+ }
+
+ private final Collection<? extends IOException> exceptions;
+ private boolean hasSetStackTrace;
+
+ /**
+ * Use the {@link #fromIOExceptions(Collection) factory}.
+ */
+ private MultipleCausesIOException(Collection<? extends IOException> exceptions) {
+ this.exceptions = exceptions;
+ }
+
+ @Override
+ public String getMessage() {
+ StringBuilder sb = new StringBuilder(this.exceptions.size() * 50);
+ int exceptionNum = 0;
+ for (IOException ex : this.exceptions) {
+ sb.append("Cause Number " + exceptionNum + ": " + ex.getMessage() + "\n");
+ exceptionNum++;
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public StackTraceElement[] getStackTrace() {
+ if (!this.hasSetStackTrace) {
+ ArrayList<StackTraceElement> frames = new ArrayList<StackTraceElement>(this.exceptions.size() * 20);
+
+ int exceptionNum = 0;
+ for (IOException exception : this.exceptions) {
+ StackTraceElement header = new StackTraceElement(MultipleCausesIOException.class.getName(),
+ "Exception Number " + exceptionNum,
+ "<no file>",
+ 0);
+
+ frames.add(header);
+ for (StackTraceElement ste : exception.getStackTrace()) {
+ frames.add(ste);
+ }
+ exceptionNum++;
+ }
+
+ setStackTrace(frames.toArray(new StackTraceElement[frames.size()]));
+ this.hasSetStackTrace = true;
+ }
+
+ return super.getStackTrace();
+ }
+
+ }
+
+}
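
A small sketch (not part of the commit) showing the two Closeables entry points above; ByteArrayInputStream is used only so the example needs no real files:

    import java.io.ByteArrayInputStream;
    import java.io.Closeable;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.phoenix.util.Closeables;

    public class CloseablesExample {
        public static void main(String[] args) throws IOException {
            List<Closeable> resources = Arrays.<Closeable>asList(
                new ByteArrayInputStream(new byte[0]),
                new ByteArrayInputStream(new byte[0]));
            // Attempts every close(); any IOExceptions are collected and rethrown
            // (possibly combined into a MultipleCausesIOException) only at the end
            Closeables.closeAll(resources);
            // Or collect instead of throwing, and inspect the result
            IOException failure = Closeables.closeAllQuietly(resources);
            if (failure != null) {
                failure.printStackTrace();
            }
        }
    }
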
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/ColumnInfo.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/ColumnInfo.java b/src/main/java/org/apache/phoenix/util/ColumnInfo.java
new file mode 100644
index 0000000..9ca05e7
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/ColumnInfo.java
@@ -0,0 +1,22 @@
+package org.apache.phoenix.util;
+
+/**
+ * ColumnInfo used to store a column name and its associated SQL type
+ */
+public class ColumnInfo {
+ private String columnName;
+ private Integer sqlType;
+
+ public ColumnInfo(String columnName, Integer sqlType) {
+ this.columnName = columnName;
+ this.sqlType = sqlType;
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+
+ public Integer getSqlType() {
+ return sqlType;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/DateUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/DateUtil.java b/src/main/java/org/apache/phoenix/util/DateUtil.java
new file mode 100644
index 0000000..e68ae48
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/DateUtil.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.text.Format;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.TimeZone;
+
+import org.apache.commons.lang.time.FastDateFormat;
+
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.IllegalDataException;
+
+
+
+@SuppressWarnings("serial")
+public class DateUtil {
+ public static final TimeZone DATE_TIME_ZONE = TimeZone.getTimeZone("GMT");
+ public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; // This is the format the app sets in NLS settings for every connection.
+ public static final Format DEFAULT_DATE_FORMATTER = FastDateFormat.getInstance(DEFAULT_DATE_FORMAT, DATE_TIME_ZONE);
+
+ public static final String DEFAULT_MS_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
+ public static final Format DEFAULT_MS_DATE_FORMATTER = FastDateFormat.getInstance(DEFAULT_MS_DATE_FORMAT, DATE_TIME_ZONE);
+
+ private DateUtil() {
+ }
+
+ public static Format getDateParser(String pattern) {
+ SimpleDateFormat format = new SimpleDateFormat(pattern) {
+ @Override
+ public java.util.Date parseObject(String source) throws ParseException {
+ java.util.Date date = super.parse(source);
+ return new java.sql.Date(date.getTime());
+ }
+ };
+ format.setTimeZone(DateUtil.DATE_TIME_ZONE);
+ return format;
+ }
+
+ public static Format getTimeParser(String pattern) {
+ SimpleDateFormat format = new SimpleDateFormat(pattern) {
+ @Override
+ public java.util.Date parseObject(String source) throws ParseException {
+ java.util.Date date = super.parse(source);
+ return new java.sql.Time(date.getTime());
+ }
+ };
+ format.setTimeZone(DateUtil.DATE_TIME_ZONE);
+ return format;
+ }
+
+ public static Format getTimestampParser(String pattern) {
+ SimpleDateFormat format = new SimpleDateFormat(pattern) {
+ @Override
+ public java.util.Date parseObject(String source) throws ParseException {
+ java.util.Date date = super.parse(source);
+ return new java.sql.Timestamp(date.getTime());
+ }
+ };
+ format.setTimeZone(DateUtil.DATE_TIME_ZONE);
+ return format;
+ }
+
+ public static Format getDateFormatter(String pattern) {
+ return DateUtil.DEFAULT_DATE_FORMAT.equals(pattern) ? DateUtil.DEFAULT_DATE_FORMATTER : FastDateFormat.getInstance(pattern, DateUtil.DATE_TIME_ZONE);
+ }
+
+ private static ThreadLocal<Format> dateFormat =
+ new ThreadLocal<Format>() {
+ @Override protected Format initialValue() {
+ return getDateParser(DEFAULT_DATE_FORMAT);
+ }
+ };
+
+ public static Date parseDate(String dateValue) {
+ try {
+ return (Date)dateFormat.get().parseObject(dateValue);
+ } catch (ParseException e) {
+ throw new IllegalDataException(e);
+ }
+ }
+
+ private static ThreadLocal<Format> timeFormat =
+ new ThreadLocal<Format>() {
+ @Override protected Format initialValue() {
+ return getTimeParser(DEFAULT_DATE_FORMAT);
+ }
+ };
+
+ public static Time parseTime(String timeValue) {
+ try {
+ return (Time)timeFormat.get().parseObject(timeValue);
+ } catch (ParseException e) {
+ throw new IllegalDataException(e);
+ }
+ }
+
+ private static ThreadLocal<Format> timestampFormat =
+ new ThreadLocal<Format>() {
+ @Override protected Format initialValue() {
+ return getTimestampParser(DEFAULT_DATE_FORMAT);
+ }
+ };
+
+ public static Timestamp parseTimestamp(String timeValue) {
+ try {
+ return (Timestamp)timestampFormat.get().parseObject(timeValue);
+ } catch (ParseException e) {
+ throw new IllegalDataException(e);
+ }
+ }
+
+ /**
+ * Utility function to work around the semantics of the {@link Timestamp} constructor:
+ * the milliseconds that spill over into the nanos part are preserved when
+ * constructing the {@link Timestamp} object.
+ * If we simply set the nanos part of the timestamp to the nanos passed in, we
+ * would lose the sub-second part already carried by the milliseconds.
+ */
+ public static Timestamp getTimestamp(long millis, int nanos) {
+ Timestamp ts = new Timestamp(millis);
+ ts.setNanos(ts.getNanos() + nanos);
+ return ts;
+ }
+
+ /**
+ * Utility function to convert a {@link BigDecimal} value to {@link Timestamp}.
+ */
+ public static Timestamp getTimestamp(BigDecimal bd) {
+ return DateUtil.getTimestamp(bd.longValue(), ((bd.remainder(BigDecimal.ONE).multiply(BigDecimal.valueOf(QueryConstants.MILLIS_TO_NANOS_CONVERTOR))).intValue()));
+ }
+}
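
A short sketch (not part of the commit) of the DateUtil parsing and Timestamp helpers above; note that the default format parses in GMT and that getTimestamp adds the nanos on top of the millisecond already carried by the Timestamp:

    import java.sql.Date;
    import java.sql.Timestamp;
    import org.apache.phoenix.util.DateUtil;

    public class DateUtilExample {
        public static void main(String[] args) {
            // DEFAULT_DATE_FORMAT is "yyyy-MM-dd HH:mm:ss", interpreted in GMT
            Date d = DateUtil.parseDate("2013-01-01 00:00:00");
            // 1 ms plus 500 ns: the millisecond is preserved, so getNanos() is 1000500
            Timestamp ts = DateUtil.getTimestamp(1L, 500);
            System.out.println(d + " / nanos=" + ts.getNanos());
        }
    }
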
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/IndexUtil.java b/src/main/java/org/apache/phoenix/util/IndexUtil.java
new file mode 100644
index 0000000..7336d5c
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.collect.Lists;
+import org.apache.hbase.index.ValueGetter;
+import org.apache.hbase.index.covered.update.ColumnReference;
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
+import org.apache.phoenix.schema.ColumnNotFoundException;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnFamily;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PTable;
+
+public class IndexUtil {
+ public static final String INDEX_COLUMN_NAME_SEP = ":";
+ public static final byte[] INDEX_COLUMN_NAME_SEP_BYTES = Bytes.toBytes(INDEX_COLUMN_NAME_SEP);
+
+ private IndexUtil() {
+ }
+
+ // Since we cannot have nullable fixed length in a row key
+ // we need to translate to variable length.
+ public static PDataType getIndexColumnDataType(PColumn dataColumn) throws SQLException {
+ PDataType type = getIndexColumnDataType(dataColumn.isNullable(),dataColumn.getDataType());
+ if (type == null) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_INDEX_COLUMN_ON_TYPE).setColumnName(dataColumn.getName().getString())
+ .setMessage("Type="+dataColumn.getDataType()).build().buildException();
+ }
+ return type;
+ }
+
+ // Since we cannot have nullable fixed length in a row key
+ // we need to translate to variable length. The verification that we have a valid index
+ // row key was already done, so here we just need to convert from one built-in type to
+ // another.
+ public static PDataType getIndexColumnDataType(boolean isNullable, PDataType dataType) {
+ if (dataType == null || !isNullable || !dataType.isFixedWidth() || dataType == PDataType.BINARY) {
+ return dataType;
+ }
+ // for INT, BIGINT
+ if (dataType.isCoercibleTo(PDataType.TIMESTAMP) || dataType.isCoercibleTo(PDataType.DECIMAL)) {
+ return PDataType.DECIMAL;
+ }
+ // for CHAR
+ if (dataType.isCoercibleTo(PDataType.VARCHAR)) {
+ return PDataType.VARCHAR;
+ }
+ throw new IllegalArgumentException("Unsupported non nullable index type " + dataType);
+ }
+
+
+ public static String getDataColumnName(String name) {
+ return name.substring(name.indexOf(INDEX_COLUMN_NAME_SEP) + 1);
+ }
+
+ public static String getDataColumnFamilyName(String name) {
+ return name.substring(0,name.indexOf(INDEX_COLUMN_NAME_SEP));
+ }
+
+ public static String getDataColumnFullName(String name) {
+ int index = name.indexOf(INDEX_COLUMN_NAME_SEP) ;
+ if (index == 0) {
+ return name.substring(index+1);
+ }
+ return SchemaUtil.getColumnDisplayName(name.substring(0, index), name.substring(index+1));
+ }
+
+ public static String getIndexColumnName(String dataColumnFamilyName, String dataColumnName) {
+ return (dataColumnFamilyName == null ? "" : dataColumnFamilyName) + INDEX_COLUMN_NAME_SEP + dataColumnName;
+ }
+
+ public static byte[] getIndexColumnName(byte[] dataColumnFamilyName, byte[] dataColumnName) {
+ return ByteUtil.concat(dataColumnFamilyName == null ? ByteUtil.EMPTY_BYTE_ARRAY : dataColumnFamilyName, INDEX_COLUMN_NAME_SEP_BYTES, dataColumnName);
+ }
+
+ public static String getIndexColumnName(PColumn dataColumn) {
+ String dataColumnFamilyName = SchemaUtil.isPKColumn(dataColumn) ? null : dataColumn.getFamilyName().getString();
+ return getIndexColumnName(dataColumnFamilyName, dataColumn.getName().getString());
+ }
+
+ public static PColumn getDataColumn(PTable dataTable, String indexColumnName) {
+ int pos = indexColumnName.indexOf(INDEX_COLUMN_NAME_SEP);
+ if (pos < 0) {
+ throw new IllegalArgumentException("Could not find expected '" + INDEX_COLUMN_NAME_SEP + "' separator in index column name of \"" + indexColumnName + "\"");
+ }
+ if (pos == 0) {
+ try {
+ return dataTable.getPKColumn(indexColumnName.substring(1));
+ } catch (ColumnNotFoundException e) {
+ throw new IllegalArgumentException("Could not find PK column \"" + indexColumnName.substring(pos+1) + "\" in index column name of \"" + indexColumnName + "\"", e);
+ }
+ }
+ PColumnFamily family;
+ try {
+ family = dataTable.getColumnFamily(indexColumnName.substring(0, pos));
+ } catch (ColumnFamilyNotFoundException e) {
+ throw new IllegalArgumentException("Could not find column family \"" + indexColumnName.substring(0, pos) + "\" in index column name of \"" + indexColumnName + "\"", e);
+ }
+ try {
+ return family.getColumn(indexColumnName.substring(pos+1));
+ } catch (ColumnNotFoundException e) {
+ throw new IllegalArgumentException("Could not find column \"" + indexColumnName.substring(pos+1) + "\" in index column name of \"" + indexColumnName + "\"", e);
+ }
+ }
+
+ private static boolean isEmptyKeyValue(PTable table, ColumnReference ref) {
+ byte[] emptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(table.getColumnFamilies());
+ return (Bytes.compareTo(emptyKeyValueCF, ref.getFamily()) == 0 &&
+ Bytes.compareTo(QueryConstants.EMPTY_COLUMN_BYTES, ref.getQualifier()) == 0);
+ }
+ public static List<Mutation> generateIndexData(final PTable table, PTable index, List<Mutation> dataMutations, ImmutableBytesWritable ptr) throws SQLException {
+ try {
+ IndexMaintainer maintainer = index.getIndexMaintainer(table);
+ List<Mutation> indexMutations = Lists.newArrayListWithExpectedSize(dataMutations.size());
+ for (final Mutation dataMutation : dataMutations) {
+ long ts = MetaDataUtil.getClientTimeStamp(dataMutation);
+ ptr.set(dataMutation.getRow());
+ if (dataMutation instanceof Put) {
+ // TODO: is this more efficient than looking in our mutation map
+ // using the key plus finding the PColumn?
+ ValueGetter valueGetter = new ValueGetter() {
+
+ @Override
+ public ImmutableBytesPtr getLatestValue(ColumnReference ref) {
+ // Always return null for our empty key value, as this will cause the index
+ // maintainer to always treat this Put as a new row.
+ if (isEmptyKeyValue(table, ref)) {
+ return null;
+ }
+ Map<byte [], List<KeyValue>> familyMap = dataMutation.getFamilyMap();
+ byte[] family = ref.getFamily();
+ List<KeyValue> kvs = familyMap.get(family);
+ if (kvs == null) {
+ return null;
+ }
+ byte[] qualifier = ref.getQualifier();
+ for (KeyValue kv : kvs) {
+ if (Bytes.compareTo(kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(), family, 0, family.length) == 0 &&
+ Bytes.compareTo(kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength(), qualifier, 0, qualifier.length) == 0) {
+ return new ImmutableBytesPtr(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
+ }
+ }
+ return null;
+ }
+
+ };
+ indexMutations.add(maintainer.buildUpdateMutation(valueGetter, ptr, ts));
+ } else {
+ if (!maintainer.getIndexedColumns().isEmpty()) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.NO_DELETE_IF_IMMUTABLE_INDEX).setSchemaName(table.getSchemaName().getString())
+ .setTableName(table.getTableName().getString()).build().buildException();
+ }
+ indexMutations.add(maintainer.buildDeleteMutation(ptr, ts));
+ }
+ }
+ return indexMutations;
+ } catch (IOException e) {
+ throw new SQLException(e);
+ }
+ }
+
+ public static boolean isDataPKColumn(PColumn column) {
+ return column.getName().getString().startsWith(INDEX_COLUMN_NAME_SEP);
+ }
+}
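
A tiny sketch (not part of the commit) of the index column naming convention implemented by IndexUtil above, where the data column family and column name are joined by ":" and PK columns have an empty family; the column names are illustrative:

    import org.apache.phoenix.util.IndexUtil;

    public class IndexUtilExample {
        public static void main(String[] args) {
            // PK columns have no family, so the index column name starts with the separator
            String pkIndexCol = IndexUtil.getIndexColumnName((String) null, "ID");  // ":ID"
            String kvIndexCol = IndexUtil.getIndexColumnName("CF", "NAME");         // "CF:NAME"
            System.out.println(pkIndexCol);
            System.out.println(IndexUtil.getDataColumnFamilyName(kvIndexCol));      // "CF"
            System.out.println(IndexUtil.getDataColumnName(kvIndexCol));            // "NAME"
        }
    }
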
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/InstanceResolver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/InstanceResolver.java b/src/main/java/org/apache/phoenix/util/InstanceResolver.java
new file mode 100644
index 0000000..85cf54d
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/InstanceResolver.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.util.ServiceLoader;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Resolves object instances registered using the JDK 6+ {@link java.util.ServiceLoader}.
+ *
+ * @author aaraujo
+ * @since 2.0
+ */
+public class InstanceResolver {
+ private static final ConcurrentHashMap<Class, Object> RESOLVED_SINGLETONS = new ConcurrentHashMap<Class, Object>();
+
+ private InstanceResolver() {/* not allowed */}
+
+ /**
+ * Resolves an instance of the specified class if it has not already been resolved.
+ * @param clazz The type of instance to resolve
+ * @param defaultInstance The instance to use if a custom instance has not been registered
+ * @return The resolved instance or the default instance provided.
+ * {@code null} if an instance is not registered and a default is not provided.
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> T getSingleton(Class<T> clazz, T defaultInstance) {
+ Object obj = RESOLVED_SINGLETONS.get(clazz);
+ if(obj != null) {
+ return (T)obj;
+ }
+ if (defaultInstance != null && !clazz.isInstance(defaultInstance)) throw new IllegalArgumentException("defaultInstance is not of type " + clazz.getName());
+ final Object o = resolveSingleton(clazz, defaultInstance);
+ obj = RESOLVED_SINGLETONS.putIfAbsent(clazz, o);
+ if(obj == null) {
+ obj = o;
+ }
+ return (T)obj;
+ }
+
+ private synchronized static <T> T resolveSingleton(Class<T> clazz, T defaultInstance) {
+ ServiceLoader<T> loader = ServiceLoader.load(clazz);
+ // returns the first registered instance found
+ for (T singleton : loader) {
+ return singleton;
+ }
+ return defaultInstance;
+ }
+}
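
A minimal sketch (not part of the commit) of resolving a pluggable implementation through InstanceResolver above; MyService and DefaultMyService are hypothetical placeholders for a real service interface:

    import org.apache.phoenix.util.InstanceResolver;

    public class InstanceResolverExample {
        public interface MyService { String name(); }
        public static class DefaultMyService implements MyService {
            public String name() { return "default"; }
        }

        public static void main(String[] args) {
            // Returns the first implementation registered via ServiceLoader
            // (META-INF/services), or the supplied default if none is registered.
            // The resolved instance is cached, so later calls return the same object.
            MyService svc = InstanceResolver.getSingleton(MyService.class, new DefaultMyService());
            System.out.println(svc.name());
        }
    }
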
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/JDBCUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/JDBCUtil.java b/src/main/java/org/apache/phoenix/util/JDBCUtil.java
new file mode 100644
index 0000000..b3adb85
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/JDBCUtil.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.sql.SQLException;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+
+
+
+/**
+ * Utilities for JDBC
+ *
+ * @author jtaylor
+ * @since 178
+ */
+public class JDBCUtil {
+
+ private JDBCUtil() {
+ }
+
+ /**
+ * Find the propName by first looking in the info properties and, if not found,
+ * next in the url string. If not found in either, null is returned.
+ * @param url JDBC connection URL
+ * @param info JDBC connection properties
+ * @param propName the name of the property to find
+ * @return the property value or null if not found
+ */
+ public static String findProperty(String url, Properties info, String propName) {
+ String urlPropName = ";" + propName + "=";
+ String propValue = info.getProperty(propName);
+ if (propValue == null) {
+ int begIndex = url.indexOf(urlPropName);
+ if (begIndex >= 0) {
+ int endIndex = url.indexOf(';',begIndex + urlPropName.length());
+ if (endIndex < 0) {
+ endIndex = url.length();
+ }
+ propValue = url.substring(begIndex + urlPropName.length(), endIndex);
+ }
+ }
+ return propValue;
+ }
+
+ public static Long getCurrentSCN(String url, Properties info) throws SQLException {
+ String scnStr = findProperty(url, info, PhoenixRuntime.CURRENT_SCN_ATTRIB);
+ return (scnStr == null ? null : Long.parseLong(scnStr));
+ }
+
+ public static int getMutateBatchSize(String url, Properties info, ReadOnlyProps props) throws SQLException {
+ String batchSizeStr = findProperty(url, info, PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB);
+ return (batchSizeStr == null ? props.getInt(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE) : Integer.parseInt(batchSizeStr));
+ }
+
+ public static byte[] getTenantId(String url, Properties info) throws SQLException {
+ String tenantId = findProperty(url, info, PhoenixRuntime.TENANT_ID_ATTRIB);
+ return (tenantId == null ? null : Bytes.toBytes(tenantId));
+ }
+}
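
A short sketch (not part of the commit) of the lookup order implemented by JDBCUtil.findProperty above: the connection properties are consulted first, then the ";name=value" pairs embedded in the URL. The property name and values are illustrative:

    import java.util.Properties;
    import org.apache.phoenix.util.JDBCUtil;

    public class JdbcUtilExample {
        public static void main(String[] args) {
            Properties info = new Properties();
            info.setProperty("TenantId", "acme");
            // Found in the Properties, so the URL is never parsed
            String fromProps = JDBCUtil.findProperty("jdbc:phoenix:localhost", info, "TenantId");
            // Not in the (empty) Properties, so the ";TenantId=..." URL fragment is used
            String fromUrl = JDBCUtil.findProperty(
                    "jdbc:phoenix:localhost;TenantId=acme", new Properties(), "TenantId");
            System.out.println(fromProps + " / " + fromUrl); // acme / acme
        }
    }
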
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/KeyValueUtil.java b/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
new file mode 100644
index 0000000..ea6f28d
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ *
+ * Utilities for KeyValue. Where there's duplication with KeyValue methods,
+ * these avoid creating new objects when not necessary (primarily preventing
+ * byte array copying).
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class KeyValueUtil {
+ private KeyValueUtil() {
+ }
+
+ public static KeyValue newKeyValue(Result r, byte[] cf, byte[] cq, long ts, byte[] value, int valueOffset, int valueLength) {
+ byte[] bytes = ResultUtil.getRawBytes(r);
+ return new KeyValue(bytes, ResultUtil.getKeyOffset(r), ResultUtil.getKeyLength(r),
+ cf, 0, cf.length,
+ cq, 0, cq.length,
+ ts, Type.Put,
+ value, valueOffset, valueLength);
+ }
+
+ public static KeyValue newKeyValue(byte[] key, byte[] cf, byte[] cq, long ts, byte[] value, int valueOffset, int valueLength) {
+ return new KeyValue(key, 0, key.length,
+ cf, 0, cf.length,
+ cq, 0, cq.length,
+ ts, Type.Put,
+ value, valueOffset, valueLength);
+ }
+
+ public static KeyValue newKeyValue(ImmutableBytesWritable key, byte[] cf, byte[] cq, long ts, byte[] value, int valueOffset, int valueLength) {
+ return new KeyValue(key.get(), key.getOffset(), key.getLength(),
+ cf, 0, cf.length,
+ cq, 0, cq.length,
+ ts, Type.Put,
+ value, valueOffset, valueLength);
+ }
+
+ public static KeyValue newKeyValue(byte[] key, int keyOffset, int keyLength, byte[] cf, byte[] cq, long ts, byte[] value, int valueOffset, int valueLength) {
+ return new KeyValue(key, keyOffset, keyLength,
+ cf, 0, cf.length,
+ cq, 0, cq.length,
+ ts, Type.Put,
+ value, valueOffset, valueLength);
+ }
+
+ public static KeyValue newKeyValue(byte[] key, byte[] cf, byte[] cq, long ts, byte[] value) {
+ return newKeyValue(key,cf,cq,ts,value,0,value.length);
+ }
+
+ public static KeyValue newKeyValue(Result r, byte[] cf, byte[] cq, long ts, byte[] value) {
+ return newKeyValue(r,cf,cq,ts,value,0,value.length);
+ }
+
+ /**
+ * Binary search for latest column value without allocating memory in the process
+ * @param kvs
+ * @param searchTerm
+ */
+ public static KeyValue getColumnLatest(List<KeyValue> kvs, KeyValue searchTerm) {
+ if (kvs.size() == 0) {
+ return null;
+ }
+
+ // pos === ( -(insertion point) - 1)
+ int pos = Collections.binarySearch(kvs, searchTerm, KeyValue.COMPARATOR);
+ // will never be an exact match
+ if (pos < 0) {
+ pos = (pos+1) * -1;
+ // pos is now insertion point
+ }
+ if (pos == kvs.size()) {
+ return null; // doesn't exist
+ }
+
+ KeyValue kv = kvs.get(pos);
+ if (Bytes.compareTo(kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(),
+ searchTerm.getBuffer(), searchTerm.getFamilyOffset(), searchTerm.getFamilyLength()) != 0) {
+ return null;
+ }
+ if (Bytes.compareTo(kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength(),
+ searchTerm.getBuffer(), searchTerm.getQualifierOffset(), searchTerm.getQualifierLength()) != 0) {
+ return null;
+ }
+ return kv;
+ }
+
+ /**
+ * Binary search for latest column value without allocating memory in the process
+ */
+ public static KeyValue getColumnLatest(List<KeyValue> kvs, byte[] family, byte[] qualifier) {
+ KeyValue kv = kvs.get(0);
+ return KeyValueUtil.getColumnLatest(kvs, kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(), family, 0, family.length, qualifier, 0, qualifier.length);
+ }
+
+ /**
+ * Binary search for latest column value without allocating memory in the process
+ */
+ public static KeyValue getColumnLatest(List<KeyValue> kvs, byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength, byte[] qualifier, int qoffset, int qlength) {
+ KeyValue searchTerm = KeyValue.createFirstOnRow(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, qlength);
+ return getColumnLatest(kvs,searchTerm);
+
+ }
+}
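
To make the factory and lookup helpers above more concrete, here is a minimal usage sketch. It assumes the HBase 0.94-era KeyValue API used throughout this commit and that the Phoenix classes above are on the classpath; the row, family, qualifier, and value bytes are hypothetical and not part of the committed code.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    import org.apache.phoenix.util.KeyValueUtil;

    public class KeyValueUtilExample {
        public static void main(String[] args) {
            byte[] row = Bytes.toBytes("row1");     // hypothetical row key
            byte[] cf = Bytes.toBytes("0");         // hypothetical column family
            byte[] cq = Bytes.toBytes("NAME");      // hypothetical column qualifier
            byte[] value = Bytes.toBytes("foo");

            // Build a Put-typed KeyValue from the given components.
            KeyValue kv = KeyValueUtil.newKeyValue(row, cf, cq, 1L, value);

            // getColumnLatest expects the list to be sorted by KeyValue.COMPARATOR;
            // a single-element list satisfies that trivially.
            List<KeyValue> kvs = Arrays.asList(kv);
            KeyValue latest = KeyValueUtil.getColumnLatest(kvs, cf, cq);
            System.out.println(latest == null ? "not found" : Bytes.toString(latest.getValue()));
        }
    }
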
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/MetaDataUtil.java b/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
new file mode 100644
index 0000000..b778b2a
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
@@ -0,0 +1,197 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import static org.apache.phoenix.util.SchemaUtil.getVarChars;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PDataType;
+
+
+public class MetaDataUtil {
+
+ public static boolean areClientAndServerCompatible(long version) {
+ // A server and client with the same major and minor version number must be compatible.
+ // So it's important that we roll the PHOENIX_MAJOR_VERSION or PHOENIX_MINOR_VERSION
+ // when we make an incompatible change.
+ return areClientAndServerCompatible(MetaDataUtil.decodePhoenixVersion(version), MetaDataProtocol.PHOENIX_MAJOR_VERSION, MetaDataProtocol.PHOENIX_MINOR_VERSION);
+ }
+
+ // For testing
+ static boolean areClientAndServerCompatible(int version, int pMajor, int pMinor) {
+ // A server and client with the same major and minor version number must be compatible.
+ // So it's important that we roll the PHOENIX_MAJOR_VERSION or PHOENIX_MINOR_VERSION
+ // when we make an incompatible change.
+ return MetaDataUtil.encodeMaxPatchVersion(pMajor, pMinor) >= version && MetaDataUtil.encodeMinPatchVersion(pMajor, pMinor) <= version;
+ }
+
+ // Extracts the encoded Phoenix version from the combined version value. In the returned int,
+ // the second byte is the major version, the third byte the minor version, and the fourth
+ // (lowest) byte the patch version.
+ public static int decodePhoenixVersion(long version) {
+ return (int) ((version << Byte.SIZE * 3) >>> Byte.SIZE * 4);
+ }
+
+ // TODO: generalize this to use two bytes to return a SQL error code instead
+ public static long encodeMutableIndexConfiguredProperly(long version, boolean isValid) {
+ if (!isValid) {
+ return version | 1;
+ }
+ return version;
+ }
+
+ public static boolean decodeMutableIndexConfiguredProperly(long version) {
+ return (version & 0xF) == 0;
+ }
+
+ // Extracts the encoded client HBase version from the combined version value. In the returned int,
+ // the second byte is the major version, the third byte the minor version, and the fourth
+ // (lowest) byte the patch version.
+ public static int decodeHBaseVersion(long version) {
+ return (int) (version >>> Byte.SIZE * 5);
+ }
+
+ public static String decodeHBaseVersionAsString(int version) {
+ int major = (version >>> Byte.SIZE * 2) & 0xFF;
+ int minor = (version >>> Byte.SIZE * 1) & 0xFF;
+ int patch = version & 0xFF;
+ return major + "." + minor + "." + patch;
+ }
+
+ public static long encodeHBaseAndPhoenixVersions(String hbaseVersion) {
+ return (((long) encodeVersion(hbaseVersion)) << (Byte.SIZE * 5)) |
+ (((long) encodeVersion(MetaDataProtocol.PHOENIX_MAJOR_VERSION, MetaDataProtocol.PHOENIX_MINOR_VERSION,
+ MetaDataProtocol.PHOENIX_PATCH_NUMBER)) << (Byte.SIZE * 1));
+ }
+
+ // Encode a version string in the format of "major.minor.patch" into an integer.
+ public static int encodeVersion(String version) {
+ String[] versionParts = version.split("[-\\.]");
+ return encodeVersion(versionParts[0], versionParts.length > 1 ? versionParts[1] : null, versionParts.length > 2 ? versionParts[2] : null);
+ }
+
+ // Encode the major version in the second byte of the int, the minor version in the third byte, and the patch version in the last byte.
+ public static int encodeVersion(String major, String minor, String patch) {
+ return encodeVersion(major == null ? 0 : Integer.parseInt(major), minor == null ? 0 : Integer.parseInt(minor),
+ patch == null ? 0 : Integer.parseInt(patch));
+ }
+
+ public static int encodeVersion(int major, int minor, int patch) {
+ int version = 0;
+ version |= (major << Byte.SIZE * 2);
+ version |= (minor << Byte.SIZE);
+ version |= patch;
+ return version;
+ }
+
+ public static int encodeMaxPatchVersion(int major, int minor) {
+ int version = 0;
+ version |= (major << Byte.SIZE * 2);
+ version |= (minor << Byte.SIZE);
+ version |= 0xFF;
+ return version;
+ }
+
+ public static int encodeMinPatchVersion(int major, int minor) {
+ int version = 0;
+ version |= (major << Byte.SIZE * 2);
+ version |= (minor << Byte.SIZE);
+ return version;
+ }
+
+ public static void getSchemaAndTableName(List<Mutation> tableMetadata, byte[][] rowKeyMetaData) {
+ Mutation m = getTableHeaderRow(tableMetadata);
+ getVarChars(m.getRow(), 2, rowKeyMetaData);
+ }
+
+ public static byte[] getParentTableName(List<Mutation> tableMetadata) {
+ if (tableMetadata.size() == 1) {
+ return null;
+ }
+ byte[][] rowKeyMetaData = new byte[2][];
+ getSchemaAndTableName(tableMetadata, rowKeyMetaData);
+ byte[] tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
+ Mutation m = getParentTableHeaderRow(tableMetadata);
+ getVarChars(m.getRow(), 2, rowKeyMetaData);
+ if (Bytes.compareTo(tableName, rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]) == 0) {
+ return null;
+ }
+ return rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
+ }
+
+ public static long getSequenceNumber(Mutation tableMutation) {
+ List<KeyValue> kvs = tableMutation.getFamilyMap().get(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES);
+ if (kvs != null) {
+ for (KeyValue kv : kvs) { // list is not ordered, so search. TODO: we could potentially assume the position
+ if (Bytes.compareTo(kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength(), PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES, 0, PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES.length) == 0) {
+ return PDataType.LONG.getCodec().decodeLong(kv.getBuffer(), kv.getValueOffset(), null);
+ }
+ }
+ }
+ throw new IllegalStateException();
+ }
+
+ public static long getSequenceNumber(List<Mutation> tableMetaData) {
+ return getSequenceNumber(getTableHeaderRow(tableMetaData));
+ }
+
+ public static long getParentSequenceNumber(List<Mutation> tableMetaData) {
+ return getSequenceNumber(getParentTableHeaderRow(tableMetaData));
+ }
+
+ public static Mutation getTableHeaderRow(List<Mutation> tableMetaData) {
+ return tableMetaData.get(0);
+ }
+
+ public static Mutation getParentTableHeaderRow(List<Mutation> tableMetaData) {
+ return tableMetaData.get(tableMetaData.size()-1);
+ }
+
+ public static long getClientTimeStamp(List<Mutation> tableMetadata) {
+ Mutation m = tableMetadata.get(0);
+ return getClientTimeStamp(m);
+ }
+
+ public static long getClientTimeStamp(Mutation m) {
+ Collection<List<KeyValue>> kvs = m.getFamilyMap().values();
+ // Empty if Mutation is a Delete
+ // TODO: confirm that Delete timestamp is reset like Put
+ return kvs.isEmpty() ? m.getTimeStamp() : kvs.iterator().next().get(0).getTimestamp();
+ }
+
+ public static byte[] getParentLinkKey(String schemaName, String tableName, String indexName) {
+ return ByteUtil.concat(schemaName == null ? ByteUtil.EMPTY_BYTE_ARRAY : Bytes.toBytes(schemaName), QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes(tableName), QueryConstants.SEPARATOR_BYTE_ARRAY, QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes(indexName));
+ }
+
+ public static byte[] getParentLinkKey(byte[] schemaName, byte[] tableName, byte[] indexName) {
+ return ByteUtil.concat(schemaName == null ? ByteUtil.EMPTY_BYTE_ARRAY : schemaName, QueryConstants.SEPARATOR_BYTE_ARRAY, tableName, QueryConstants.SEPARATOR_BYTE_ARRAY, QueryConstants.SEPARATOR_BYTE_ARRAY, indexName);
+ }
+
+
+}
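
As a reader aid for the bit-packed version scheme above, the following is a small sketch of how encodeVersion and the patch-range helpers behave, assuming the MetaDataUtil class from this commit is on the classpath; the version numbers are arbitrary examples, not values used by Phoenix itself.

    import org.apache.phoenix.util.MetaDataUtil;

    public class MetaDataUtilExample {
        public static void main(String[] args) {
            // "2.1.0" packs as (2 << 16) | (1 << 8) | 0 = 0x020100 = 131328
            int encoded = MetaDataUtil.encodeVersion("2.1.0");
            System.out.println(encoded);                                          // 131328
            System.out.println(MetaDataUtil.decodeHBaseVersionAsString(encoded)); // 2.1.0

            // Compatibility checks ignore the patch byte: a version is compatible with a
            // given major.minor if it falls between the min and max patch encodings.
            System.out.println(MetaDataUtil.encodeMinPatchVersion(2, 1)); // 131328 (0x020100)
            System.out.println(MetaDataUtil.encodeMaxPatchVersion(2, 1)); // 131583 (0x0201FF)
        }
    }
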
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/NumberUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/NumberUtil.java b/src/main/java/org/apache/phoenix/util/NumberUtil.java
new file mode 100644
index 0000000..ece1104
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/NumberUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.math.BigDecimal;
+
+import org.apache.phoenix.schema.PDataType;
+
+/**
+ * Utility methods for numbers like decimal, long, etc.
+ *
+ * @author elevine
+ * @since 0.1
+ */
+public class NumberUtil {
+
+ public static final String DEFAULT_NUMBER_FORMAT = "#,##0.###";
+
+ /**
+ * Strip all trailing zeros so that the last digit is never zero, and
+ * round using our default math context so that precision doesn't exceed the maximum allowed.
+ * @return new {@link BigDecimal} instance
+ */
+ public static BigDecimal normalize(BigDecimal bigDecimal) {
+ return bigDecimal.stripTrailingZeros().round(PDataType.DEFAULT_MATH_CONTEXT);
+ }
+
+ public static BigDecimal setDecimalWidthAndScale(BigDecimal decimal, int precision, int scale) {
+ // If the digits before the decimal point cannot fit into the desired precision and
+ // scale, return null; the caller is expected to handle the error.
+ if ((precision - scale) < (decimal.precision() - decimal.scale())) {
+ return null;
+ }
+ decimal = decimal.setScale(scale, BigDecimal.ROUND_DOWN);
+ return decimal;
+ }
+}
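
A brief sketch of how the two helpers above behave, assuming rounding is governed by PDataType.DEFAULT_MATH_CONTEXT as referenced in normalize; the literal values are illustrative only.

    import java.math.BigDecimal;

    import org.apache.phoenix.util.NumberUtil;

    public class NumberUtilExample {
        public static void main(String[] args) {
            // Trailing zeros are stripped: "1.2300" normalizes to "1.23".
            System.out.println(NumberUtil.normalize(new BigDecimal("1.2300")));

            // DECIMAL(4,2) can hold 12.345, truncated down to 12.34 ...
            System.out.println(NumberUtil.setDecimalWidthAndScale(new BigDecimal("12.345"), 4, 2));

            // ... but not 123.45: three integer digits do not fit in precision 4, scale 2.
            System.out.println(NumberUtil.setDecimalWidthAndScale(new BigDecimal("123.45"), 4, 2)); // null
        }
    }
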