Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 20:23:08 UTC

[07/51] [partial] Initial commit

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
new file mode 100644
index 0000000..b81af60
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -0,0 +1,320 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Reader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.collect.Lists;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+
+/**
+ * 
+ * Collection of non-JDBC-compliant utility methods
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class PhoenixRuntime {
+    /**
+     * Use this connection property to control HBase timestamps
+     * by specifying your own long timestamp value at connection time. All
+     * queries will use this as the upper bound of the time range for scans
+     * and DDL, and DML will use this as the timestamp for key values.
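+     * <p>
+     * A minimal sketch of how this property might be set (the host and
+     * timestamp below are illustrative, not part of this commit):
+     * <pre>
+     * Properties props = new Properties();
+     * props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(1000L));
+     * Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props);
+     * </pre>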
+     */
+    public static final String CURRENT_SCN_ATTRIB = "CurrentSCN";
+
+    /**
+     * Root for the JDBC URL that Phoenix accepts.
+     */
+    public final static String JDBC_PROTOCOL = "jdbc:phoenix";
+    public final static char JDBC_PROTOCOL_TERMINATOR = ';';
+    public final static char JDBC_PROTOCOL_SEPARATOR = ':';
+    
+    @Deprecated
+    public final static String EMBEDDED_JDBC_PROTOCOL = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+    
+    /**
+     * Use this connection property to control the number of rows that are
+     * batched together on an UPSERT INTO table1... SELECT ... FROM table2.
+     * It's only used when autoCommit is true and your source table is
+     * different from your target table or your SELECT statement has a 
+     * GROUP BY clause.
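+     * <p>
+     * For example (the batch size, URL, and table names are illustrative):
+     * <pre>
+     * Properties props = new Properties();
+     * props.setProperty(PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB, "10000");
+     * Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props);
+     * conn.setAutoCommit(true);
+     * conn.createStatement().executeUpdate("UPSERT INTO target SELECT * FROM source");
+     * </pre>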
+     */
+    public final static String UPSERT_BATCH_SIZE_ATTRIB = "UpsertBatchSize";
+    
+    /**
+     * Use this connection property to help with fairness of resource allocation
+     * for the client and server. The value of the attribute determines the
+     * bucket used to roll up resource usage for a particular tenant/organization. Each tenant
+     * may only use a percentage of total resources, governed by the {@link org.apache.phoenix.query.QueryServices}
+     * configuration properties.
+     */
+    public static final String TENANT_ID_ATTRIB = "TenantId";
+
+    /**
+     * Use this as the zookeeper quorum name to have a connection-less connection. This enables
+     * Phoenix-compatible HFiles to be created in a map/reduce job by creating tables,
+     * upserting data into them, and getting the uncommitted state through {@link #getUncommittedData(Connection)}
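+     * <p>
+     * A minimal sketch (the table and row are illustrative):
+     * <pre>
+     * Connection conn = DriverManager.getConnection("jdbc:phoenix:none");
+     * conn.createStatement().execute("CREATE TABLE t (k VARCHAR PRIMARY KEY)");
+     * conn.createStatement().executeUpdate("UPSERT INTO t VALUES ('a')");
+     * List&lt;KeyValue&gt; data = PhoenixRuntime.getUncommittedData(conn);
+     * </pre>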
+     */
+    public final static String CONNECTIONLESS = "none";
+    
+    private static final String UPGRADE_OPTION = "-u";
+    private static final String TABLE_OPTION = "-t";
+    private static final String HEADER_OPTION = "-h";
+    private static final String STRICT_OPTION = "-s";
+    private static final String CSV_OPTION = "-d";
+    private static final String HEADER_IN_LINE = "in-line";
+    private static final String SQL_FILE_EXT = ".sql";
+    private static final String CSV_FILE_EXT = ".csv";
+    
+    private static void usageError() {
+        System.err.println("Usage: psql [-t table-name] [-h comma-separated-column-names | in-line] [-d field-delimiter-char quote-char escape-char] <zookeeper> <path-to-sql-or-csv-file>...\n" +
+                "  By default, the name of the CSV file is used to determine the Phoenix table into which the CSV data is loaded\n" +
+                "  and the ordinal value of the columns determines the mapping.\n" +
+                "  -t overrides the table into which the CSV data is loaded\n" +
+                "  -h overrides the column names to which the CSV data maps\n" +
+                "     A special value of in-line indicating that the first line of the CSV file\n" +
+                "     determines the column to which the data maps.\n" +
+                "  -s uses strict mode by throwing an exception if a column name doesn't match during CSV loading.\n" +
+                "  -d uses custom delimiters for CSV loader, need to specify single char for field delimiter, phrase delimiter, and escape char.\n" +
+                "     number is NOT usually a delimiter and shall be taken as 1 -> ctrl A, 2 -> ctrl B ... 9 -> ctrl I. \n" +
+                "Examples:\n" +
+                "  psql localhost my_ddl.sql\n" +
+                "  psql localhost my_ddl.sql my_table.csv\n" +
+                "  psql -t my_table my_cluster:1825 my_table2012-Q3.csv\n" +
+                "  psql -t my_table -h col1,col2,col3 my_cluster:1825 my_table2012-Q3.csv\n" +
+                "  psql -t my_table -h col1,col2,col3 -d 1 2 3 my_cluster:1825 my_table2012-Q3.csv\n"
+        );
+        System.exit(-1);
+    }
+    /**
+     * Provides a mechanism to run SQL scripts and load CSV files against a Phoenix cluster, where the arguments are:
+     * 1) connection URL string
+     * 2) one or more paths to either SQL scripts or CSV files
+     * If a CurrentSCN property is set on the connection URL, then it is incremented
+     * between files, with each file being processed by a new connection at the
+     * incremented timestamp value.
+     */
+    public static void main(String [] args) {
+        if (args.length < 2) {
+            usageError();
+        }
+        PhoenixConnection conn = null;
+        try {
+            String tableName = null;
+            List<String> columns = null;
+            boolean isStrict = false;
+            boolean isUpgrade = false;
+            List<String> delimiter = new ArrayList<String>();
+
+            int i = 0;
+            for (; i < args.length; i++) {
+                if (TABLE_OPTION.equals(args[i])) {
+                    if (++i == args.length || tableName != null) {
+                        usageError();
+                    }
+                    tableName = args[i];
+                } else if (HEADER_OPTION.equals(args[i])) {
+                    if (++i >= args.length || columns != null) {
+                        usageError();
+                    }
+                    String header = args[i];
+                    if (HEADER_IN_LINE.equals(header)) {
+                        columns = Collections.emptyList();
+                    } else {
+                        columns = Lists.newArrayList();
+                        StringTokenizer tokenizer = new StringTokenizer(header,",");
+                        while(tokenizer.hasMoreTokens()) {
+                            columns.add(tokenizer.nextToken());
+                        }
+                    }
+                } else if (STRICT_OPTION.equals(args[i])) {
+                    isStrict = true;
+                } else if (UPGRADE_OPTION.equals(args[i])) {
+                    isUpgrade = true;
+                } else if (CSV_OPTION.equals(args[i])) {
+                    // Expect exactly three single-character arguments following -d
+                    for (int j = 0; j < 3; j++) {
+                        if (++i == args.length || args[i].length() != 1) {
+                            usageError();
+                        }
+                        delimiter.add(args[i]);
+                    }
+                } else {
+                    break;
+                }
+            }
+            if (i == args.length) {
+                usageError();
+            }
+            
+            Properties props = new Properties();
+            if (isUpgrade) {
+                props.setProperty(SchemaUtil.UPGRADE_TO_2_0, Integer.toString(SchemaUtil.SYSTEM_TABLE_NULLABLE_VAR_LENGTH_COLUMNS));
+            }
+            String connectionUrl = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + args[i++];
+            conn = DriverManager.getConnection(connectionUrl, props).unwrap(PhoenixConnection.class);
+            
+            if (SchemaUtil.upgradeColumnCount(connectionUrl, props) > 0) {
+                SchemaUtil.upgradeTo2(conn);
+                return;
+            }
+            
+            for (; i < args.length; i++) {
+                String fileName = args[i];
+                if (fileName.endsWith(SQL_FILE_EXT)) {
+                    PhoenixRuntime.executeStatements(conn, new FileReader(args[i]), Collections.emptyList());
+                } else if (fileName.endsWith(CSV_FILE_EXT)) {
+                    if (tableName == null) {
+                        tableName = fileName.substring(fileName.lastIndexOf(File.separatorChar) + 1, fileName.length()-CSV_FILE_EXT.length());
+                    }
+                    CSVLoader csvLoader = new CSVLoader(conn, tableName, columns, isStrict, delimiter);
+                    csvLoader.upsert(fileName);
+                } else {
+                    usageError();
+                }
+                Long scn = conn.getSCN();
+                // If specifying SCN, increment it between processing files to allow
+                // for later files to see the tables created by earlier files.
+                if (scn != null) {
+                    scn++;
+                    props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn.toString());
+                    conn.close();
+                    conn = DriverManager.getConnection(connectionUrl, props).unwrap(PhoenixConnection.class);
+                }
+            }
+        } catch (Throwable t) {
+            t.printStackTrace();
+        } finally {
+            if(conn != null) {
+                try {
+                    conn.close();
+                } catch (SQLException e) {
+                    // Going to shut the JVM down anyway, so swallow the exception.
+                }
+            }
+            System.exit(0);
+        }
+    }
+
+    private PhoenixRuntime() {
+    }
+    
+    /**
+     * Runs a series of semicolon-terminated SQL statements using the connection provided, returning
+     * the number of SQL statements executed. Note that if the connection has specified an SCN through
+     * the {@link org.apache.phoenix.util.PhoenixRuntime#CURRENT_SCN_ATTRIB} connection property, then the timestamp
+     * is bumped up by one after each statement execution.
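+     * <p>
+     * For example (the file name is illustrative):
+     * <pre>
+     * Reader reader = new FileReader("my_ddl.sql");
+     * int count = PhoenixRuntime.executeStatements(conn, reader, Collections.emptyList());
+     * </pre>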
+     * @param conn an open JDBC connection
+     * @param reader a reader over semicolon-separated SQL statements
+     * @param binds the binds for all statements
+     * @return the number of SQL statements that were executed
+     * @throws IOException
+     * @throws SQLException
+     */
+    public static int executeStatements(Connection conn, Reader reader, List<Object> binds) throws IOException,SQLException {
+        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+        // Turn auto commit to true when running scripts in case there's DML
+        pconn.setAutoCommit(true);
+        return pconn.executeStatements(reader, binds, System.out);
+    }
+    
+    /**
+     * Get the list of uncommitted KeyValues for the connection. Currently used to write a
+     * Phoenix-compliant HFile from a map/reduce job.
+     * @param conn an open JDBC connection
+     * @return the list of HBase mutations for uncommitted data
+     * @throws SQLException 
+     */
+    @Deprecated
+    public static List<KeyValue> getUncommittedData(Connection conn) throws SQLException {
+        Iterator<Pair<byte[],List<KeyValue>>> iterator = getUncommittedDataIterator(conn);
+        if (iterator.hasNext()) {
+            return iterator.next().getSecond();
+        }
+        return Collections.emptyList();
+    }
+    
+    /**
+     * Get the list of uncommitted KeyValues for the connection. Currently used to write a
+     * Phoenix-compliant HFile from a map/reduce job.
+     * @param conn an open JDBC connection
+     * @return the list of HBase mutations for uncommitted data
+     * @throws SQLException 
+     */
+    public static Iterator<Pair<byte[],List<KeyValue>>> getUncommittedDataIterator(Connection conn) throws SQLException {
+        return getUncommittedDataIterator(conn, false);
+    }
+    
+    /**
+     * Get the list of uncommitted KeyValues for the connection. Currently used to write a
+     * Phoenix-compliant HFile from a map/reduce job.
+     * @param conn an open JDBC connection
+     * @return the list of HBase mutations for uncommitted data
+     * @throws SQLException 
+     */
+    public static Iterator<Pair<byte[],List<KeyValue>>> getUncommittedDataIterator(Connection conn, boolean includeMutableIndexes) throws SQLException {
+        final Iterator<Pair<byte[],List<Mutation>>> iterator = conn.unwrap(PhoenixConnection.class).getMutationState().toMutations(includeMutableIndexes);
+        return new Iterator<Pair<byte[],List<KeyValue>>>() {
+
+            @Override
+            public boolean hasNext() {
+                return iterator.hasNext();
+            }
+
+            @Override
+            public Pair<byte[], List<KeyValue>> next() {
+                Pair<byte[],List<Mutation>> pair = iterator.next();
+                List<KeyValue> keyValues = Lists.newArrayListWithExpectedSize(pair.getSecond().size() * 5); // Guess-timate 5 key values per row
+                for (Mutation mutation : pair.getSecond()) {
+                    for (List<KeyValue> keyValueList : mutation.getFamilyMap().values()) {
+                        for (KeyValue keyValue : keyValueList) {
+                            keyValues.add(keyValue);
+                        }
+                    }
+                }
+                Collections.sort(keyValues, KeyValue.COMPARATOR);
+                return new Pair<byte[], List<KeyValue>>(pair.getFirst(),keyValues);
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+            
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/QueryUtil.java b/src/main/java/org/apache/phoenix/util/QueryUtil.java
new file mode 100644
index 0000000..c932a16
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -0,0 +1,68 @@
+package org.apache.phoenix.util;
+
+import java.sql.*;
+
+public class QueryUtil {
+	
+	/**
+	 *  Column name index within ResultSet resulting from {@link DatabaseMetaData#getColumns(String, String, String, String)}
+	 */
+	public static final int COLUMN_NAME_POSITION = 4;
+	/**
+	 * Data type index within ResultSet resulting from {@link DatabaseMetaData#getColumns(String, String, String, String)}
+	 */
+	public static final int DATA_TYPE_POSITION = 5;
+
+	/**
+	 * Generates the UPSERT statement based on the ColumnInfo array. If the
+	 * ColumnInfo array is unavailable, it produces a generic UPSERT statement
+	 * without column information, using the number of columns.
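+	 * <p>
+	 * For example, with a null ColumnInfo array, a table named "T", and a
+	 * numColumns of 3, this produces:
+	 * <pre>
+	 * UPSERT INTO T
+	 * VALUES (?,?,?)
+	 * </pre>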
+	 * 
+	 * @return Upsert Statement
+	 */
+	public static String constructUpsertStatement(ColumnInfo[] columnTypes,
+			String tableName, int numColumns) {
+		if(numColumns <= 0) {
+			throw new RuntimeException("Number of columns in HBase table cannot be less than 1");
+		}
+		StringBuilder sb = new StringBuilder();
+		sb.append("UPSERT INTO ");
+		sb.append(tableName);
+		if (columnTypes != null) {
+			sb.append("(");
+			for (ColumnInfo columnType : columnTypes) {
+				if (columnType != null) {
+					sb.append(columnType.getColumnName());
+					sb.append(",");
+				}
+			}
+			// Remove the trailing comma
+			sb.setLength(sb.length() - 1);
+			sb.append(") ");
+		}
+		sb.append("\n");
+		sb.append("VALUES (");
+		for (short i = 0; i < numColumns - 1; i++) {
+			sb.append("?,");
+		}
+		sb.append("?)");
+
+		return sb.toString();
+	}
+
+	public static String getUrl(String server) {
+		return PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + server;
+	}
+
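+    /**
+     * Collects an EXPLAIN plan into a single string. For example (the query
+     * shown is illustrative):
+     * <pre>
+     * ResultSet rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM t");
+     * String plan = QueryUtil.getExplainPlan(rs);
+     * </pre>
+     */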
+    public static String getExplainPlan(ResultSet rs) throws SQLException {
+        StringBuilder buf = new StringBuilder();
+        while (rs.next()) {
+            buf.append(rs.getString(1));
+            buf.append('\n');
+        }
+        if (buf.length() > 0) {
+            buf.setLength(buf.length()-1);
+        }
+        return buf.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/ReadOnlyProps.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/ReadOnlyProps.java b/src/main/java/org/apache/phoenix/util/ReadOnlyProps.java
new file mode 100644
index 0000000..6fcbc3d
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/ReadOnlyProps.java
@@ -0,0 +1,239 @@
+package org.apache.phoenix.util;
+
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.collect.*;
+
+/**
+ * 
+ * A read-only Properties implementation that avoids the unnecessary
+ * synchronization in java.util.Properties.
+ *
+ * @author jtaylor
+ * @since 1.2.2
+ */
+public class ReadOnlyProps implements Iterable<Entry<String, String>> {
+    public static final ReadOnlyProps EMPTY_PROPS = new ReadOnlyProps(Iterators.<Entry<String, String>>emptyIterator());
+    private final Map<String, String> props;
+    
+    public ReadOnlyProps(Iterator<Entry<String, String>> iterator) {
+        Map<String, String> map = Maps.newHashMap();
+        while (iterator.hasNext()) {
+            Entry<String,String> entry = iterator.next();
+            map.put(entry.getKey(), entry.getValue());
+        }
+        this.props = ImmutableMap.copyOf(map);
+    }
+
+    public ReadOnlyProps(Map<String, String> props) {
+        this.props = ImmutableMap.copyOf(props);
+    }
+
+    private static Pattern varPat = Pattern.compile("\\$\\{[^\\}\\$\u0020]+\\}");
+    private static int MAX_SUBST = 20;
+
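+    // Expands ${var} references in a value, checking system properties first
+    // and then this property set, up to MAX_SUBST levels deep. For example
+    // (illustrative): with a "dir" property of "/tmp", a raw value of
+    // "${dir}/log" expands to "/tmp/log".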
+    private String substituteVars(String expr) {
+        if (expr == null) {
+          return null;
+        }
+        Matcher match = varPat.matcher("");
+        String eval = expr;
+        for(int s=0; s<MAX_SUBST; s++) {
+          match.reset(eval);
+          if (!match.find()) {
+            return eval;
+          }
+          String var = match.group();
+          var = var.substring(2, var.length()-1); // remove ${ .. }
+          String val = null;
+          try {
+            val = System.getProperty(var);
+          } catch(SecurityException se) {
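+            // Ignore and fall back to looking the variable up in this property set.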
+          }
+          if (val == null) {
+            val = getRaw(var);
+          }
+          if (val == null) {
+            return eval; // return literal ${var}: var is unbound
+          }
+          // substitute
+          eval = eval.substring(0, match.start())+val+eval.substring(match.end());
+        }
+        throw new IllegalStateException("Variable substitution depth too large: " 
+                                        + MAX_SUBST + " " + expr);
+      }
+      
+    /**
+     * Get the value of the <code>name</code> property, without doing
+     * <a href="#VariableExpansion">variable expansion</a>.
+     * 
+     * @param name the property name.
+     * @return the value of the <code>name</code> property, 
+     *         or null if no such property exists.
+     */
+    public String getRaw(String name) {
+      return props.get(name);
+    }
+
+    public String getRaw(String name, String defaultValue) {
+        String value = getRaw(name);
+        if (value == null) {
+            return defaultValue;
+        }
+        return value;
+      }
+
+    /** 
+     * Get the value of the <code>name</code> property. If no such property 
+     * exists, then <code>defaultValue</code> is returned.
+     * 
+     * @param name property name.
+     * @param defaultValue default value.
+     * @return property value, or <code>defaultValue</code> if the property 
+     *         doesn't exist.                    
+     */
+    public String get(String name, String defaultValue) {
+      return substituteVars(getRaw(name, defaultValue));
+    }
+      
+    /**
+     * Get the value of the <code>name</code> property, <code>null</code> if
+     * no such property exists.
+     * 
+     * Values are processed for <a href="#VariableExpansion">variable expansion</a> 
+     * before being returned. 
+     * 
+     * @param name the property name.
+     * @return the value of the <code>name</code> property, 
+     *         or null if no such property exists.
+     */
+    public String get(String name) {
+      return substituteVars(getRaw(name));
+    }
+
+    private String getHexDigits(String value) {
+        boolean negative = false;
+        String str = value;
+        String hexString = null;
+        if (value.startsWith("-")) {
+          negative = true;
+          str = value.substring(1);
+        }
+        if (str.startsWith("0x") || str.startsWith("0X")) {
+          hexString = str.substring(2);
+          if (negative) {
+            hexString = "-" + hexString;
+          }
+          return hexString;
+        }
+        return null;
+      }
+      
+    /** 
+     * Get the value of the <code>name</code> property as a <code>boolean</code>.  
+     * If no such property is specified, or if the specified value is not a valid
+     * <code>boolean</code>, then <code>defaultValue</code> is returned.
+     * 
+     * @param name property name.
+     * @param defaultValue default value.
+     * @return property value as a <code>boolean</code>, 
+     *         or <code>defaultValue</code>. 
+     */
+    public boolean getBoolean(String name, boolean defaultValue) {
+      String valueString = get(name);
+      if ("true".equals(valueString))
+        return true;
+      else if ("false".equals(valueString))
+        return false;
+      else return defaultValue;
+    }
+
+    /** 
+     * Get the value of the <code>name</code> property as an <code>int</code>.
+     *   
+     * If no such property exists, or if the specified value is not a valid
+     * <code>int</code>, then <code>defaultValue</code> is returned.
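+     * <p>
+     * Hexadecimal strings are accepted as well; for example (illustrative), a
+     * property value of "0x10" yields 16 and "-0x10" yields -16.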
+     * 
+     * @param name property name.
+     * @param defaultValue default value.
+     * @return property value as an <code>int</code>, 
+     *         or <code>defaultValue</code>. 
+     */
+    public int getInt(String name, int defaultValue) {
+      String valueString = get(name);
+      if (valueString == null)
+        return defaultValue;
+      try {
+        String hexString = getHexDigits(valueString);
+        if (hexString != null) {
+          return Integer.parseInt(hexString, 16);
+        }
+        return Integer.parseInt(valueString);
+      } catch (NumberFormatException e) {
+        return defaultValue;
+      }
+    }
+    
+    /** 
+     * Get the value of the <code>name</code> property as a <code>long</code>.  
+     * If no such property is specified, or if the specified value is not a valid
+     * <code>long</code>, then <code>defaultValue</code> is returned.
+     * 
+     * @param name property name.
+     * @param defaultValue default value.
+     * @return property value as a <code>long</code>, 
+     *         or <code>defaultValue</code>. 
+     */
+    public long getLong(String name, long defaultValue) {
+      String valueString = get(name);
+      if (valueString == null)
+        return defaultValue;
+      try {
+        String hexString = getHexDigits(valueString);
+        if (hexString != null) {
+          return Long.parseLong(hexString, 16);
+        }
+        return Long.parseLong(valueString);
+      } catch (NumberFormatException e) {
+        return defaultValue;
+      }
+    }
+
+    /** 
+     * Get the value of the <code>name</code> property as a <code>float</code>.  
+     * If no such property is specified, or if the specified value is not a valid
+     * <code>float</code>, then <code>defaultValue</code> is returned.
+     * 
+     * @param name property name.
+     * @param defaultValue default value.
+     * @return property value as a <code>float</code>, 
+     *         or <code>defaultValue</code>. 
+     */
+    public float getFloat(String name, float defaultValue) {
+      String valueString = get(name);
+      if (valueString == null)
+        return defaultValue;
+      try {
+        return Float.parseFloat(valueString);
+      } catch (NumberFormatException e) {
+        return defaultValue;
+      }
+    }
+
+    /**
+     * Get the properties as a <code>Map&lt;String,String&gt;</code>.
+     * 
+     * @return an immutable <code>Map&lt;String,String&gt;</code> of the properties
+     */
+    public Map<String,String> asMap() {
+        return props;
+    }
+    
+    @Override
+    public Iterator<Entry<String, String>> iterator() {
+        return props.entrySet().iterator();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/ResultUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/ResultUtil.java b/src/main/java/org/apache/phoenix/util/ResultUtil.java
new file mode 100644
index 0000000..f9f2f5a
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/ResultUtil.java
@@ -0,0 +1,172 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.util.Arrays;
+import java.util.Comparator;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Static class for various methods that would be nice to have added to {@link org.apache.hadoop.hbase.client.Result}.
+ * These methods work off of the raw bytes, avoiding the expansion of each Result into object form.
+ * 
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ResultUtil {
+    private ResultUtil() {
+    }
+    
+    /**
+     * Return a pointer into a potentially much bigger byte buffer that points to the key of a Result.
+     * @param r
+     */
+    public static ImmutableBytesWritable getKey(Result r) {
+        return getKey(r, 0);
+    }
+    
+    public static void getKey(Result r, ImmutableBytesWritable key) {
+        key.set(r.getRow());
+        //key.set(getRawBytes(r), getKeyOffset(r), getKeyLength(r));
+    }
+    
+    public static void getKey(KeyValue value, ImmutableBytesWritable key) {
+        key.set(value.getBuffer(), value.getRowOffset(), value.getRowLength());
+    }
+    
+    /**
+     * Return a pointer into a potentially much bigger byte buffer that points to the key of a Result.
+     * Use offset to return a subset of the key bytes, for example to skip the organization ID embedded
+     * in all of our keys.
+     * @param r
+     * @param offset offset added to start of key and subtracted from key length (to select subset of key bytes)
+     */
+    public static ImmutableBytesWritable getKey(Result r, int offset) {
+        return new ImmutableBytesWritable(getRawBytes(r), getKeyOffset(r) + offset, getKeyLength(r) - offset);
+    }
+
+    public static void getKey(Result r, int offset, int length, ImmutableBytesWritable key) {
+        key.set(getRawBytes(r), getKeyOffset(r) + offset, length);
+    }
+
+    /**
+     * Comparator for comparing the keys from two Results in-place, without allocating new byte arrays
+     */
+    public static final Comparator<Result> KEY_COMPARATOR = new Comparator<Result>() {
+
+        @Override
+        public int compare(Result r1, Result r2) {
+            byte[] r1Bytes = getRawBytes(r1);
+            byte[] r2Bytes = getRawBytes(r2);
+            return Bytes.compareTo(r1Bytes, getKeyOffset(r1), getKeyLength(r1), r2Bytes, getKeyOffset(r2), getKeyLength(r2));
+        }
+        
+    };
+    
+    /**
+     * Get the offset into the Result byte array to the key.
+     * @param r
+     * @return the offset of the key within the raw byte array of the Result
+     */
+    static int getKeyOffset(Result r) {
+        // Special case for when Result was instantiated via KeyValue array (no bytes in that case) versus returned from a scanner
+        return (r.getBytes() == null ? r.raw()[0].getOffset() : (r.getBytes().getOffset() + Bytes.SIZEOF_INT /* KV length in Result */)) + KeyValue.ROW_OFFSET /* key offset in KV */ + Bytes.SIZEOF_SHORT /* key length */;
+    }
+    
+    static int getKeyLength(Result r) {
+        // Key length stored right before key as a short
+        return Bytes.toShort(getRawBytes(r), getKeyOffset(r) - Bytes.SIZEOF_SHORT);
+    }
+    
+    static byte[] getRawBytes(Result r) {
+        // Handle special case for when Result was instantiated via KeyValue array (no bytes in that case) versus returned from a scanner
+        ImmutableBytesWritable rPtr = r.getBytes();
+        if (rPtr != null)
+            return rPtr.get();
+        return r.raw()[0].getBuffer();
+    }
+
+    public static int compareKeys(Result r1, Result r2) {
+        return Bytes.compareTo(getRawBytes(r1), getKeyOffset(r1), getKeyLength(r1), getRawBytes(r2), getKeyOffset(r2), getKeyLength(r2));
+    }
+
+    /**
+     * Binary search for latest column value without allocating memory in the process
+     */
+    public static KeyValue getColumnLatest(Result r, byte[] family, byte[] qualifier) {
+        byte[] rbytes = getRawBytes(r);
+        int roffset = getKeyOffset(r);
+        int rlength = getKeyLength(r);
+        return getColumnLatest(r, rbytes, roffset, rlength, family, 0, family.length, qualifier, 0, qualifier.length);
+    }
+
+    public static KeyValue getSearchTerm(Result r, byte[] family, byte[] qualifier) {
+        byte[] rbytes = getRawBytes(r);
+        int roffset = getKeyOffset(r);
+        int rlength = getKeyLength(r);
+        return KeyValue.createFirstOnRow(rbytes, roffset, rlength, family, 0, family.length, qualifier, 0, qualifier.length);
+    }
+    /**
+     * Binary search for latest column value without allocating memory in the process
+     */
+    public static KeyValue getColumnLatest(Result r, byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength, byte[] qualifier, int qoffset, int qlength) {
+        KeyValue searchTerm = KeyValue.createFirstOnRow(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, qlength);
+        return getColumnLatest(r,searchTerm);
+        
+    }
+
+     /**
+     * Binary search for latest column value without allocating memory in the process
+     * @param r
+     * @param searchTerm
+     */
+    public static KeyValue getColumnLatest(Result r, KeyValue searchTerm) {
+        KeyValue [] kvs = r.raw(); // may have the side effect of parsing the Result's backing bytes
+        if (kvs == null || kvs.length == 0) {
+          return null;
+        }
+        
+        // pos === ( -(insertion point) - 1)
+        int pos = Arrays.binarySearch(kvs, searchTerm, KeyValue.COMPARATOR);
+        // An exact match will never occur, since the search term is a "first on row" KeyValue
+        if (pos < 0) {
+          pos = (pos+1) * -1;
+          // pos is now insertion point
+        }
+        if (pos == kvs.length) {
+          return null; // doesn't exist
+        }
+
+        KeyValue kv = kvs[pos];
+        if (Bytes.compareTo(kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(),
+                searchTerm.getBuffer(), searchTerm.getFamilyOffset(), searchTerm.getFamilyLength()) != 0) {
+            return null;
+        }
+        if (Bytes.compareTo(kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength(),
+                searchTerm.getBuffer(), searchTerm.getQualifierOffset(), searchTerm.getQualifierLength()) != 0) {
+            return null;
+        }
+        return kv;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/SQLCloseable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/SQLCloseable.java b/src/main/java/org/apache/phoenix/util/SQLCloseable.java
new file mode 100644
index 0000000..11cf02f
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/SQLCloseable.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.sql.SQLException;
+
+/**
+ * 
+ * Interface for a SQL resource that should be closed
+ * after it is no longer in use.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface SQLCloseable {
+    void close() throws SQLException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/SQLCloseables.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/SQLCloseables.java b/src/main/java/org/apache/phoenix/util/SQLCloseables.java
new file mode 100644
index 0000000..c255c1d
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/SQLCloseables.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.sql.SQLException;
+import java.util.*;
+
+import com.google.common.collect.Iterables;
+
+
+/**
+ * Utilities for operating on {@link SQLCloseable}s.
+ * 
+ * @author jtaylor
+ * @since 0.1
+ */
+public class SQLCloseables {
+    /** Not constructed */
+    private SQLCloseables() { }
+    
+    /**
+     * Allows you to close as many of the {@link SQLCloseable}s as possible.
+     * 
+     * If any of the closes fail with a SQLException, those exception(s) will
+     * be thrown after attempting to close all of the inputs.
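+     * <p>
+     * Typical usage (the resource list is illustrative):
+     * <pre>
+     * List&lt;SQLCloseable&gt; resources = Arrays.asList(stmt1, stmt2); // hypothetical resources
+     * SQLCloseables.closeAll(resources);
+     * </pre>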
+     */
+    public static void closeAll(Iterable<? extends SQLCloseable> iterable) throws SQLException {
+        SQLException ex = closeAllQuietly(iterable);
+        if (ex != null) throw ex;
+    }
+ 
+    public static SQLException closeAllQuietly(Iterable<? extends SQLCloseable> iterable) {
+        if (iterable == null) return null;
+        
+        LinkedList<SQLException> exceptions = null;
+        for (SQLCloseable closeable : iterable) {
+            try {
+                closeable.close();
+            } catch (SQLException x) {
+                if (exceptions == null) exceptions = new LinkedList<SQLException>();
+                exceptions.add(x);
+            }
+        }
+        
+        SQLException ex = MultipleCausesSQLException.fromSQLExceptions(exceptions);
+        return ex;
+    }
+
+    /**
+     * A subclass of {@link SQLException} that allows you to chain multiple 
+     * causes together.
+     * 
+     * @author jtaylor
+     * @since 0.1
+     * @see SQLCloseables
+     */
+    static private class MultipleCausesSQLException extends SQLException {
+		private static final long serialVersionUID = 1L;
+
+		static SQLException fromSQLExceptions(Collection<? extends SQLException> exceptions) {
+            if (exceptions == null || exceptions.isEmpty()) return null;
+            if (exceptions.size() == 1) return Iterables.getOnlyElement(exceptions);
+            
+            return new MultipleCausesSQLException(exceptions);
+        }
+        
+        private final Collection<? extends SQLException> exceptions;
+        private boolean hasSetStackTrace;
+        
+        /**
+         * Use the {@link #fromSQLExceptions(Collection) factory}.
+         */
+        private MultipleCausesSQLException(Collection<? extends SQLException> exceptions) {
+            this.exceptions = exceptions;
+        }
+
+        @Override
+        public String getMessage() {
+            StringBuilder sb = new StringBuilder(this.exceptions.size() * 50);
+            int exceptionNum = 0;
+            for (SQLException ex : this.exceptions) {
+                sb.append("Cause Number " + exceptionNum + ": " + ex.getMessage() + "\n");
+                exceptionNum++;
+            }
+            return sb.toString();
+        }
+        
+        @Override
+        public StackTraceElement[] getStackTrace() {
+            if (!this.hasSetStackTrace) {
+                ArrayList<StackTraceElement> frames = new ArrayList<StackTraceElement>(this.exceptions.size() * 20);
+                
+                int exceptionNum = 0;
+                for (SQLException exception : this.exceptions) {
+                    StackTraceElement header = new StackTraceElement(MultipleCausesSQLException.class.getName(), 
+                            "Exception Number " + exceptionNum, 
+                            "<no file>",
+                            0);
+                    
+                    frames.add(header);
+                    for (StackTraceElement ste : exception.getStackTrace()) {
+                        frames.add(ste);
+                    }
+                    exceptionNum++;
+                }
+                
+                setStackTrace(frames.toArray(new StackTraceElement[frames.size()]));
+                this.hasSetStackTrace = true;
+            }        
+            
+            return super.getStackTrace();
+        }
+
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/ScanUtil.java b/src/main/java/org/apache/phoenix/util/ScanUtil.java
new file mode 100644
index 0000000..05337b7
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -0,0 +1,425 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.collect.Lists;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.KeyRange.Bound;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.RowKeySchema;
+
+
+/**
+ * 
+ * Various utilities for scans
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ScanUtil {
+
+    private ScanUtil() {
+    }
+
+    public static void setTenantId(Scan scan, byte[] tenantId) {
+        scan.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+    }
+
+    // Use getTenantId and pass in column name to match against
+    // in as PSchema attribute. If column name matches in 
+    // KeyExpressions, set on scan as attribute
+    public static ImmutableBytesWritable getTenantId(Scan scan) {
+        // Create Scan with special aggregation column over which to aggregate
+        byte[] tenantId = scan.getAttribute(PhoenixRuntime.TENANT_ID_ATTRIB);
+        if (tenantId == null) {
+            return null;
+        }
+        return new ImmutableBytesWritable(tenantId);
+    }
+
+    public static Scan newScan(Scan scan) {
+        try {
+            Scan newScan = new Scan(scan);
+            // Clone the underlying family map instead of sharing it between
+            // the existing and cloned Scan (which is the unfortunate default
+            // behavior).
+            TreeMap<byte [], NavigableSet<byte []>> existingMap = (TreeMap<byte[], NavigableSet<byte[]>>)scan.getFamilyMap();
+            Map<byte [], NavigableSet<byte []>> clonedMap = new TreeMap<byte [], NavigableSet<byte []>>(existingMap);
+            newScan.setFamilyMap(clonedMap);
+            return newScan;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+    /**
+     * Intersects the scan start/stop row with the startKey and stopKey
+     * @param scan
+     * @param startKey
+     * @param stopKey
+     * @return false if the Scan cannot possibly return rows and true otherwise
+     */
+    public static boolean intersectScanRange(Scan scan, byte[] startKey, byte[] stopKey) {
+        return intersectScanRange(scan, startKey, stopKey, false);
+    }
+
+    public static boolean intersectScanRange(Scan scan, byte[] startKey, byte[] stopKey, boolean useSkipScan) {
+        boolean mayHaveRows = false;
+        byte[] existingStartKey = scan.getStartRow();
+        byte[] existingStopKey = scan.getStopRow();
+        if (existingStartKey.length > 0) {
+            if (startKey.length == 0 || Bytes.compareTo(existingStartKey, startKey) > 0) {
+                startKey = existingStartKey;
+            }
+        } else {
+            mayHaveRows = true;
+        }
+        if (existingStopKey.length > 0) {
+            if (stopKey.length == 0 || Bytes.compareTo(existingStopKey, stopKey) < 0) {
+                stopKey = existingStopKey;
+            }
+        } else {
+            mayHaveRows = true;
+        }
+        scan.setStartRow(startKey);
+        scan.setStopRow(stopKey);
+        
+        mayHaveRows = mayHaveRows || Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) < 0;
+        
+        // If the scan is using skip scan filter, intersect and replace the filter.
+        if (mayHaveRows && useSkipScan) {
+            Filter filter = scan.getFilter();
+            if (filter instanceof SkipScanFilter) {
+                SkipScanFilter oldFilter = (SkipScanFilter)filter;
+                SkipScanFilter newFilter = oldFilter.intersect(startKey, stopKey);
+                if (newFilter == null) {
+                    return false;
+                }
+                // Intersect found: replace skip scan with intersected one
+                scan.setFilter(newFilter);
+            } else if (filter instanceof FilterList) {
+                FilterList filterList = (FilterList)filter;
+                Filter firstFilter = filterList.getFilters().get(0);
+                if (firstFilter instanceof SkipScanFilter) {
+                    SkipScanFilter oldFilter = (SkipScanFilter)firstFilter;
+                    SkipScanFilter newFilter = oldFilter.intersect(startKey, stopKey);
+                    if (newFilter == null) {
+                        return false;
+                    }
+                    // Intersect found: replace skip scan with intersected one
+                    List<Filter> allFilters = new ArrayList<Filter>(filterList.getFilters().size());
+                    allFilters.addAll(filterList.getFilters());
+                    allFilters.set(0, newFilter);
+                    scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,allFilters));
+                }
+            }
+        }
+        return mayHaveRows;
+    }
+
+    public static void andFilterAtBeginning(Scan scan, Filter andWithFilter) {
+        if (andWithFilter == null) {
+            return;
+        }
+        Filter filter = scan.getFilter();
+        if (filter == null) {
+            scan.setFilter(andWithFilter); 
+        } else if (filter instanceof FilterList && ((FilterList)filter).getOperator() == FilterList.Operator.MUST_PASS_ALL) {
+            FilterList filterList = (FilterList)filter;
+            List<Filter> allFilters = new ArrayList<Filter>(filterList.getFilters().size() + 1);
+            allFilters.add(andWithFilter);
+            allFilters.addAll(filterList.getFilters());
+            scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,allFilters));
+        } else {
+            scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,Arrays.asList(andWithFilter, filter)));
+        }
+    }
+
+    public static void andFilterAtEnd(Scan scan, Filter andWithFilter) {
+        if (andWithFilter == null) {
+            return;
+        }
+        Filter filter = scan.getFilter();
+        if (filter == null) {
+            scan.setFilter(andWithFilter); 
+        } else if (filter instanceof FilterList && ((FilterList)filter).getOperator() == FilterList.Operator.MUST_PASS_ALL) {
+            FilterList filterList = (FilterList)filter;
+            List<Filter> allFilters = new ArrayList<Filter>(filterList.getFilters().size() + 1);
+            allFilters.addAll(filterList.getFilters());
+            allFilters.add(andWithFilter);
+            scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,allFilters));
+        } else {
+            scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,Arrays.asList(filter, andWithFilter)));
+        }
+    }
+
+    public static void setTimeRange(Scan scan, long ts) {
+        try {
+            scan.setTimeRange(MetaDataProtocol.MIN_TABLE_TIMESTAMP, ts);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static byte[] getMinKey(RowKeySchema schema, List<List<KeyRange>> slots) {
+        return getKey(schema, slots, Bound.LOWER);
+    }
+
+    public static byte[] getMaxKey(RowKeySchema schema, List<List<KeyRange>> slots) {
+        return getKey(schema, slots, Bound.UPPER);
+    }
+
+    private static byte[] getKey(RowKeySchema schema, List<List<KeyRange>> slots, Bound bound) {
+        if (slots.isEmpty()) {
+            return null;
+        }
+        int[] position = new int[slots.size()];
+        int maxLength = 0;
+        for (int i = 0; i < position.length; i++) {
+            position[i] = bound == Bound.LOWER ? 0 : slots.get(i).size()-1;
+            KeyRange range = slots.get(i).get(position[i]);
+            maxLength += range.getRange(bound).length + (schema.getField(i).getDataType().isFixedWidth() ? 0 : 1);
+        }
+        byte[] key = new byte[maxLength];
+        int length = setKey(schema, slots, position, bound, key, 0, 0, position.length);
+        if (length == 0) {
+            return null;
+        }
+        if (length == maxLength) {
+            return key;
+        }
+        byte[] keyCopy = new byte[length];
+        System.arraycopy(key, 0, keyCopy, 0, length);
+        return keyCopy;
+    }
+
+    public static int estimateMaximumKeyLength(RowKeySchema schema, int schemaStartIndex, List<List<KeyRange>> slots) {
+        int maxLowerKeyLength = 0, maxUpperKeyLength = 0;
+        for (int i = 0; i < slots.size(); i++) {
+            int maxLowerRangeLength = 0, maxUpperRangeLength = 0;
+            for (KeyRange range: slots.get(i)) {
+                maxLowerRangeLength = Math.max(maxLowerRangeLength, range.getLowerRange().length); 
+                maxUpperRangeLength = Math.max(maxUpperRangeLength, range.getUpperRange().length);
+            }
+            int trailingByte = (schema.getField(schemaStartIndex).getDataType().isFixedWidth() ||
+                    schemaStartIndex == schema.getFieldCount() - 1 ? 0 : 1);
+            maxLowerKeyLength += maxLowerRangeLength + trailingByte;
+            maxUpperKeyLength += maxUpperRangeLength + trailingByte;
+            schemaStartIndex++;
+        }
+        return Math.max(maxLowerKeyLength, maxUpperKeyLength);
+    }
+
+    /*
+     * Set the key by appending the keyRanges inside slots at positions as specified by the position array.
+     * 
+     * We need to increment part of the key range, or increment the whole key at the end, depending on the
+     * bound we are setting and whether the key range is inclusive or exclusive. The logic for determining
+     * whether to increment or not is:
+     * range/single    boundary       bound      increment
+     *  range          inclusive      lower         no
+     *  range          inclusive      upper         yes, at the end if it occurs at any slot.
+     *  range          exclusive      lower         yes
+     *  range          exclusive      upper         no
+     *  single         inclusive      lower         no
+     *  single         inclusive      upper         yes, at the end if it is the last slot.
+     */
+    public static int setKey(RowKeySchema schema, List<List<KeyRange>> slots, int[] position, Bound bound,
+            byte[] key, int byteOffset, int slotStartIndex, int slotEndIndex) {
+        return setKey(schema, slots, position, bound, key, byteOffset, slotStartIndex, slotEndIndex, slotStartIndex);
+    }
+
+    public static int setKey(RowKeySchema schema, List<List<KeyRange>> slots, int[] position, Bound bound,
+            byte[] key, int byteOffset, int slotStartIndex, int slotEndIndex, int schemaStartIndex) {
+        int offset = byteOffset;
+        boolean lastInclusiveUpperSingleKey = false;
+        boolean anyInclusiveUpperRangeKey = false;
+        for (int i = slotStartIndex; i < slotEndIndex; i++) {
+            // Build up the key by appending the bound of each key range
+            // from the current position of each slot. 
+            KeyRange range = slots.get(i).get(position[i]);
+            boolean isFixedWidth = schema.getField(schemaStartIndex++).getDataType().isFixedWidth();
+            /*
+             * If the current slot is unbound then stop if:
+             * 1) setting the upper bound. There's no value in
+             *    continuing because nothing will be filtered.
+             * 2) setting the lower bound when the type is fixed length
+             *    for the same reason. However, if the type is variable width
+             *    continue building the key because null values will be filtered
+             *    since our separator byte will be appended and incremented.
+             */
+            if (  range.isUnbound(bound) &&
+                ( bound == Bound.UPPER || isFixedWidth) ){
+                break;
+            }
+            byte[] bytes = range.getRange(bound);
+            System.arraycopy(bytes, 0, key, offset, bytes.length);
+            offset += bytes.length;
+            /*
+             * We must add a terminator to a variable length key even for the last PK column if
+             * the lower key is non inclusive or the upper key is inclusive. Otherwise, we'd be
+             * incrementing the key value itself, and thus bumping it up too much.
+             */
+            boolean inclusiveUpper = range.isInclusive(bound) && bound == Bound.UPPER;
+            boolean exclusiveLower = !range.isInclusive(bound) && bound == Bound.LOWER;
+            if (!isFixedWidth && ( i < schema.getMaxFields()-1 || inclusiveUpper || exclusiveLower)) {
+                key[offset++] = QueryConstants.SEPARATOR_BYTE;
+            }
+            // If we are setting the upper bound using an inclusive single key, we remember
+            // to increment the key if we exit the loop after this iteration.
+            // 
+            // We remember to increment the last slot if we are setting the upper bound with an
+            // inclusive range key.
+            //
+            // We cannot combine the two flags, in case a single-inclusive key is followed
+            // by a range-exclusive key. In that case, we do not need to increment at the
+            // end; but if we combined the two flags, a single inclusive key in the middle of
+            // the key slots would cause the flag to become true.
+            lastInclusiveUpperSingleKey = range.isSingleKey() && inclusiveUpper;
+            anyInclusiveUpperRangeKey |= !range.isSingleKey() && inclusiveUpper;
+            // If we are setting the lower bound with an exclusive range key, we need to bump the
+            // slot up for each key part. For an upper bound, we bump up an inclusive key, but
+            // only after the last key part.
+            if (!range.isSingleKey() && exclusiveLower) {
+                if (!ByteUtil.nextKey(key, offset)) {
+                    // Special case for not being able to increment.
+                    // In this case we return a negative byteOffset to
+                    // remove this part from the key being formed. Since the
+                    // key has overflowed, this means that we should not
+                    // have an end key specified.
+                    return -byteOffset;
+                }
+            }
+        }
+        if (lastInclusiveUpperSingleKey || anyInclusiveUpperRangeKey) {
+            if (!ByteUtil.nextKey(key, offset)) {
+                // Special case for not being able to increment.
+                // In this case we return a negative byteOffset to
+                // remove this part from the key being formed. Since the
+                // key has overflowed, this means that we should not
+                // have an end key specified.
+                return -byteOffset;
+            }
+        }
+        // Remove trailing separator bytes, since the columns may have been added
+        // after the table has data, in which case there won't be a separator
+        // byte.
+        if (bound == Bound.LOWER) {
+            while (schemaStartIndex > 0 && offset > byteOffset && 
+                    !schema.getField(--schemaStartIndex).getDataType().isFixedWidth() && 
+                    key[offset-1] == QueryConstants.SEPARATOR_BYTE) {
+                offset--;
+            }
+        }
+        return offset - byteOffset;
+    }
+
+    public static boolean isAllSingleRowScan(List<List<KeyRange>> ranges, RowKeySchema schema) {
+        if (ranges.size() < schema.getMaxFields()) {
+            return false;
+        }
+        for (int i = 0; i < ranges.size(); i++) {
+            List<KeyRange> orRanges = ranges.get(i);
+            for (KeyRange range: orRanges) {
+                if (!range.isSingleKey()) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Perform a binary search on the list of KeyRanges for the first slot whose
+     * upper bound is higher than or equal to the key pointed to by ptr.
+     * @return the index of the first slot whose upper bound equals or exceeds the
+     *         key pointed to by ptr, or slots.size() if no such slot can be found.
+     */
+    public static int searchClosestKeyRangeWithUpperHigherThanPtr(List<KeyRange> slots, ImmutableBytesWritable ptr, int lower) {
+        int upper = slots.size() - 1;
+        int mid;
+        while (lower <= upper) {
+            mid = (lower + upper) / 2;
+            int cmp = slots.get(mid).compareUpperToLowerBound(ptr, true);
+            if (cmp < 0) {
+                lower = mid + 1;
+            } else if (cmp > 0) {
+                upper = mid - 1;
+            } else {
+                return mid;
+            }
+        }
+        mid = (lower + upper) / 2;
+        if (mid == 0 && slots.get(mid).compareUpperToLowerBound(ptr, true) > 0) {
+            return mid;
+        } else {
+            return ++mid;
+        }
+    }
+    
+    public static ScanRanges newScanRanges(List<Mutation> mutations) throws SQLException {
+        List<KeyRange> keys = Lists.newArrayListWithExpectedSize(mutations.size());
+        for (Mutation m : mutations) {
+            keys.add(PDataType.VARBINARY.getKeyRange(m.getRow()));
+        }
+        ScanRanges keyRanges = ScanRanges.create(Collections.singletonList(keys), SchemaUtil.VAR_BINARY_SCHEMA);
+        return keyRanges;
+    }
+
+    public static byte[] nextKey(byte[] key, PTable table, ImmutableBytesWritable ptr) {
+        int pos = 0;
+        RowKeySchema schema = table.getRowKeySchema();
+        int maxOffset = schema.iterator(key, ptr);
+        while (schema.next(ptr, pos, maxOffset) != null) {
+            pos++;
+        }
+        if (!schema.getField(pos-1).getDataType().isFixedWidth()) {
+            byte[] newLowerRange = new byte[key.length + 1];
+            System.arraycopy(key, 0, newLowerRange, 0, key.length);
+            newLowerRange[key.length] = QueryConstants.SEPARATOR_BYTE;
+            key = newLowerRange;
+        } else {
+            key = Arrays.copyOf(key, key.length);
+        }
+        ByteUtil.nextKey(key, key.length);
+        return key;
+    }
+}