You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 20:23:08 UTC
[07/51] [partial] Initial commit
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
new file mode 100644
index 0000000..b81af60
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -0,0 +1,320 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Reader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.collect.Lists;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+
+/**
+ *
+ * Collection of non JDBC compliant utility methods
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class PhoenixRuntime {
+ /**
+ * Use this connection property to control HBase timestamps
+ * by specifying your own long timestamp value at connection time. All
+ * queries will use this as the upper bound of the time range for scans
+ * and DDL, and DML will use this as t he timestamp for key values.
+ */
+ public static final String CURRENT_SCN_ATTRIB = "CurrentSCN";
+
+ /**
+ * Root for the JDBC URL that the Phoenix accepts accepts.
+ */
+ public final static String JDBC_PROTOCOL = "jdbc:phoenix";
+ public final static char JDBC_PROTOCOL_TERMINATOR = ';';
+ public final static char JDBC_PROTOCOL_SEPARATOR = ':';
+
+ @Deprecated
+ public final static String EMBEDDED_JDBC_PROTOCOL = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+
+ /**
+ * Use this connection property to control the number of rows that are
+ * batched together on an UPSERT INTO table1... SELECT ... FROM table2.
+ * It's only used when autoCommit is true and your source table is
+ * different than your target table or your SELECT statement has a
+ * GROUP BY clause.
+ */
+ public final static String UPSERT_BATCH_SIZE_ATTRIB = "UpsertBatchSize";
+
+ /**
+ * Use this connection property to help with fairness of resource allocation
+ * for the client and server. The value of the attribute determines the
+ * bucket used to rollup resource usage for a particular tenant/organization. Each tenant
+ * may only use a percentage of total resources, governed by the {@link org.apache.phoenix.query.QueryServices}
+ * configuration properties
+ */
+ public static final String TENANT_ID_ATTRIB = "TenantId";
+
+ /**
+ * Use this as the zookeeper quorum name to have a connection-less connection. This enables
+ * Phoenix-compatible HFiles to be created in a map/reduce job by creating tables,
+ * upserting data into them, and getting the uncommitted state through {@link #getUncommittedData(Connection)}
+ */
+ public final static String CONNECTIONLESS = "none";
+
+ private static final String UPGRADE_OPTION = "-u";
+ private static final String TABLE_OPTION = "-t";
+ private static final String HEADER_OPTION = "-h";
+ private static final String STRICT_OPTION = "-s";
+ private static final String CSV_OPTION = "-d";
+ private static final String HEADER_IN_LINE = "in-line";
+ private static final String SQL_FILE_EXT = ".sql";
+ private static final String CSV_FILE_EXT = ".csv";
+
+ private static void usageError() {
+ System.err.println("Usage: psql [-t table-name] [-h comma-separated-column-names | in-line] [-d field-delimiter-char quote-char escape-char]<zookeeper> <path-to-sql-or-csv-file>...\n" +
+ " By default, the name of the CSV file is used to determine the Phoenix table into which the CSV data is loaded\n" +
+ " and the ordinal value of the columns determines the mapping.\n" +
+ " -t overrides the table into which the CSV data is loaded\n" +
+ " -h overrides the column names to which the CSV data maps\n" +
+ " A special value of in-line indicating that the first line of the CSV file\n" +
+ " determines the column to which the data maps.\n" +
+ " -s uses strict mode by throwing an exception if a column name doesn't match during CSV loading.\n" +
+ " -d uses custom delimiters for CSV loader, need to specify single char for field delimiter, phrase delimiter, and escape char.\n" +
+ " number is NOT usually a delimiter and shall be taken as 1 -> ctrl A, 2 -> ctrl B ... 9 -> ctrl I. \n" +
+ "Examples:\n" +
+ " psql localhost my_ddl.sql\n" +
+ " psql localhost my_ddl.sql my_table.csv\n" +
+ " psql -t my_table my_cluster:1825 my_table2012-Q3.csv\n" +
+ " psql -t my_table -h col1,col2,col3 my_cluster:1825 my_table2012-Q3.csv\n" +
+ " psql -t my_table -h col1,col2,col3 -d 1 2 3 my_cluster:1825 my_table2012-Q3.csv\n"
+ );
+ System.exit(-1);
+ }
+ /**
+ * Provides a mechanism to run SQL scripts against, where the arguments are:
+ * 1) connection URL string
+ * 2) one or more paths to either SQL scripts or CSV files
+ * If a CurrentSCN property is set on the connection URL, then it is incremented
+ * between processing, with each file being processed by a new connection at the
+ * increment timestamp value.
+ */
+ public static void main(String [] args) {
+ if (args.length < 2) {
+ usageError();
+ }
+ PhoenixConnection conn = null;
+ try {
+ String tableName = null;
+ List<String> columns = null;
+ boolean isStrict = false;
+ boolean isUpgrade = false;
+ List<String> delimiter = new ArrayList<String>();
+
+ int i = 0;
+ for (; i < args.length; i++) {
+ if (TABLE_OPTION.equals(args[i])) {
+ if (++i == args.length || tableName != null) {
+ usageError();
+ }
+ tableName = args[i];
+ } else if (HEADER_OPTION.equals(args[i])) {
+ if (++i >= args.length || columns != null) {
+ usageError();
+ }
+ String header = args[i];
+ if (HEADER_IN_LINE.equals(header)) {
+ columns = Collections.emptyList();
+ } else {
+ columns = Lists.newArrayList();
+ StringTokenizer tokenizer = new StringTokenizer(header,",");
+ while(tokenizer.hasMoreTokens()) {
+ columns.add(tokenizer.nextToken());
+ }
+ }
+ } else if (STRICT_OPTION.equals(args[i])) {
+ isStrict = true;
+ } else if (UPGRADE_OPTION.equals(args[i])) {
+ isUpgrade = true;
+ } else if (CSV_OPTION.equals(args[i])) {
+ for(int j=0; j < 3; j++) {
+ if(args[++i].length()==1){
+ delimiter.add(args[i]);
+ } else {
+ usageError();
+ }
+ }
+ } else {
+ break;
+ }
+ }
+ if (i == args.length) {
+ usageError();
+ }
+
+ Properties props = new Properties();
+ if (isUpgrade) {
+ props.setProperty(SchemaUtil.UPGRADE_TO_2_0, Integer.toString(SchemaUtil.SYSTEM_TABLE_NULLABLE_VAR_LENGTH_COLUMNS));
+ }
+ String connectionUrl = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + args[i++];
+ conn = DriverManager.getConnection(connectionUrl, props).unwrap(PhoenixConnection.class);
+
+ if (SchemaUtil.upgradeColumnCount(connectionUrl, props) > 0) {
+ SchemaUtil.upgradeTo2(conn);
+ return;
+ }
+
+ for (; i < args.length; i++) {
+ String fileName = args[i];
+ if (fileName.endsWith(SQL_FILE_EXT)) {
+ PhoenixRuntime.executeStatements(conn, new FileReader(args[i]), Collections.emptyList());
+ } else if (fileName.endsWith(CSV_FILE_EXT)) {
+ if (tableName == null) {
+ tableName = fileName.substring(fileName.lastIndexOf(File.separatorChar) + 1, fileName.length()-CSV_FILE_EXT.length());
+ }
+ CSVLoader csvLoader = new CSVLoader(conn, tableName, columns, isStrict, delimiter);
+ csvLoader.upsert(fileName);
+ } else {
+ usageError();
+ }
+ Long scn = conn.getSCN();
+ // If specifying SCN, increment it between processing files to allow
+ // for later files to see earlier files tables.
+ if (scn != null) {
+ scn++;
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn.toString());
+ conn.close();
+ conn = DriverManager.getConnection(connectionUrl, props).unwrap(PhoenixConnection.class);
+ }
+ }
+ } catch (Throwable t) {
+ t.printStackTrace();
+ } finally {
+ if(conn != null) {
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ //going to shut jvm down anyway. So might as well feast on it.
+ }
+ }
+ System.exit(0);
+ }
+ }
+
+ private PhoenixRuntime() {
+ }
+
+ /**
+ * Runs a series of semicolon-terminated SQL statements using the connection provided, returning
+ * the number of SQL statements executed. Note that if the connection has specified an SCN through
+ * the {@link org.apache.phoenix.util.PhoenixRuntime#CURRENT_SCN_ATTRIB} connection property, then the timestamp
+ * is bumped up by one after each statement execution.
+ * @param conn an open JDBC connection
+ * @param reader a reader for semicolumn separated SQL statements
+ * @param binds the binds for all statements
+ * @return the number of SQL statements that were executed
+ * @throws IOException
+ * @throws SQLException
+ */
+ public static int executeStatements(Connection conn, Reader reader, List<Object> binds) throws IOException,SQLException {
+ PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+ // Turn auto commit to true when running scripts in case there's DML
+ pconn.setAutoCommit(true);
+ return pconn.executeStatements(reader, binds, System.out);
+ }
+
+ /**
+ * Get the list of uncommitted KeyValues for the connection. Currently used to write an
+ * Phoenix-compliant HFile from a map/reduce job.
+ * @param conn an open JDBC connection
+ * @return the list of HBase mutations for uncommitted data
+ * @throws SQLException
+ */
+ @Deprecated
+ public static List<KeyValue> getUncommittedData(Connection conn) throws SQLException {
+ Iterator<Pair<byte[],List<KeyValue>>> iterator = getUncommittedDataIterator(conn);
+ if (iterator.hasNext()) {
+ return iterator.next().getSecond();
+ }
+ return Collections.emptyList();
+ }
+
+ /**
+ * Get the list of uncommitted KeyValues for the connection. Currently used to write an
+ * Phoenix-compliant HFile from a map/reduce job.
+ * @param conn an open JDBC connection
+ * @return the list of HBase mutations for uncommitted data
+ * @throws SQLException
+ */
+ public static Iterator<Pair<byte[],List<KeyValue>>> getUncommittedDataIterator(Connection conn) throws SQLException {
+ return getUncommittedDataIterator(conn, false);
+ }
+
+ /**
+ * Get the list of uncommitted KeyValues for the connection. Currently used to write an
+ * Phoenix-compliant HFile from a map/reduce job.
+ * @param conn an open JDBC connection
+ * @return the list of HBase mutations for uncommitted data
+ * @throws SQLException
+ */
+ public static Iterator<Pair<byte[],List<KeyValue>>> getUncommittedDataIterator(Connection conn, boolean includeMutableIndexes) throws SQLException {
+ final Iterator<Pair<byte[],List<Mutation>>> iterator = conn.unwrap(PhoenixConnection.class).getMutationState().toMutations(includeMutableIndexes);
+ return new Iterator<Pair<byte[],List<KeyValue>>>() {
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public Pair<byte[], List<KeyValue>> next() {
+ Pair<byte[],List<Mutation>> pair = iterator.next();
+ List<KeyValue> keyValues = Lists.newArrayListWithExpectedSize(pair.getSecond().size() * 5); // Guess-timate 5 key values per row
+ for (Mutation mutation : pair.getSecond()) {
+ for (List<KeyValue> keyValueList : mutation.getFamilyMap().values()) {
+ for (KeyValue keyValue : keyValueList) {
+ keyValues.add(keyValue);
+ }
+ }
+ }
+ Collections.sort(keyValues, KeyValue.COMPARATOR);
+ return new Pair<byte[], List<KeyValue>>(pair.getFirst(),keyValues);
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/QueryUtil.java b/src/main/java/org/apache/phoenix/util/QueryUtil.java
new file mode 100644
index 0000000..c932a16
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -0,0 +1,68 @@
+package org.apache.phoenix.util;
+
+import java.sql.*;
+
+public class QueryUtil {
+
+ /**
+ * Column name index within ResultSet resulting from {@link DatabaseMetaData#getColumns(String, String, String, String)}
+ */
+ public static final int COLUMN_NAME_POSITION = 4;
+ /**
+ * Data type index within ResultSet resulting from {@link DatabaseMetaData#getColumns(String, String, String, String)}
+ */
+ public static final int DATA_TYPE_POSITION = 5;
+
+ /**
+ * Generates the upsert statement based on number of ColumnInfo. If
+ * ColumnInfo is unavailable, it produces a generic UPSERT query without
+ * columns information using number of columns.
+ *
+ * @return Upsert Statement
+ */
+ public static String constructUpsertStatement(ColumnInfo[] columnTypes,
+ String tableName, int numColumns) {
+ if(numColumns <= 0) {
+ throw new RuntimeException("Number of columns in HBase table cannot be less than 1");
+ }
+ StringBuilder sb = new StringBuilder();
+ sb.append("UPSERT INTO ");
+ sb.append(tableName);
+ if (columnTypes != null) {
+ sb.append("(");
+ for (ColumnInfo columnType : columnTypes) {
+ if (columnType != null) {
+ sb.append(columnType.getColumnName());
+ sb.append(",");
+ }
+ }
+ // Remove the trailing comma
+ sb.setLength(sb.length() - 1);
+ sb.append(") ");
+ }
+ sb.append("\n");
+ sb.append("VALUES (");
+ for (short i = 0; i < numColumns - 1; i++) {
+ sb.append("?,");
+ }
+ sb.append("?)");
+
+ return sb.toString();
+ }
+
+ public static String getUrl(String server) {
+ return PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + server;
+ }
+
+ public static String getExplainPlan(ResultSet rs) throws SQLException {
+ StringBuilder buf = new StringBuilder();
+ while (rs.next()) {
+ buf.append(rs.getString(1));
+ buf.append('\n');
+ }
+ if (buf.length() > 0) {
+ buf.setLength(buf.length()-1);
+ }
+ return buf.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/ReadOnlyProps.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/ReadOnlyProps.java b/src/main/java/org/apache/phoenix/util/ReadOnlyProps.java
new file mode 100644
index 0000000..6fcbc3d
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/ReadOnlyProps.java
@@ -0,0 +1,239 @@
+package org.apache.phoenix.util;
+
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.collect.*;
+
+/**
+ *
+ * Read-only properties that avoids unnecessary synchronization in
+ * java.util.Properties.
+ *
+ * @author jtaylor
+ * @since 1.2.2
+ */
+public class ReadOnlyProps implements Iterable<Entry<String, String>> {
+ public static final ReadOnlyProps EMPTY_PROPS = new ReadOnlyProps(Iterators.<Entry<String, String>>emptyIterator());
+ private final Map<String, String> props;
+
+ public ReadOnlyProps(Iterator<Entry<String, String>> iterator) {
+ Map<String, String> map = Maps.newHashMap();
+ while (iterator.hasNext()) {
+ Entry<String,String> entry = iterator.next();
+ map.put(entry.getKey(), entry.getValue());
+ }
+ this.props = ImmutableMap.copyOf(map);
+ }
+
+ public ReadOnlyProps(Map<String, String> props) {
+ this.props = ImmutableMap.copyOf(props);
+ }
+
+ private static Pattern varPat = Pattern.compile("\\$\\{[^\\}\\$\u0020]+\\}");
+ private static int MAX_SUBST = 20;
+
+ private String substituteVars(String expr) {
+ if (expr == null) {
+ return null;
+ }
+ Matcher match = varPat.matcher("");
+ String eval = expr;
+ for(int s=0; s<MAX_SUBST; s++) {
+ match.reset(eval);
+ if (!match.find()) {
+ return eval;
+ }
+ String var = match.group();
+ var = var.substring(2, var.length()-1); // remove ${ .. }
+ String val = null;
+ try {
+ val = System.getProperty(var);
+ } catch(SecurityException se) {
+ }
+ if (val == null) {
+ val = getRaw(var);
+ }
+ if (val == null) {
+ return eval; // return literal ${var}: var is unbound
+ }
+ // substitute
+ eval = eval.substring(0, match.start())+val+eval.substring(match.end());
+ }
+ throw new IllegalStateException("Variable substitution depth too large: "
+ + MAX_SUBST + " " + expr);
+ }
+
+ /**
+ * Get the value of the <code>name</code> property, without doing
+ * <a href="#VariableExpansion">variable expansion</a>.
+ *
+ * @param name the property name.
+ * @return the value of the <code>name</code> property,
+ * or null if no such property exists.
+ */
+ public String getRaw(String name) {
+ return props.get(name);
+ }
+
+ public String getRaw(String name, String defaultValue) {
+ String value = getRaw(name);
+ if (value == null) {
+ return defaultValue;
+ }
+ return value;
+ }
+
+ /**
+ * Get the value of the <code>name</code> property. If no such property
+ * exists, then <code>defaultValue</code> is returned.
+ *
+ * @param name property name.
+ * @param defaultValue default value.
+ * @return property value, or <code>defaultValue</code> if the property
+ * doesn't exist.
+ */
+ public String get(String name, String defaultValue) {
+ return substituteVars(getRaw(name, defaultValue));
+ }
+
+ /**
+ * Get the value of the <code>name</code> property, <code>null</code> if
+ * no such property exists.
+ *
+ * Values are processed for <a href="#VariableExpansion">variable expansion</a>
+ * before being returned.
+ *
+ * @param name the property name.
+ * @return the value of the <code>name</code> property,
+ * or null if no such property exists.
+ */
+ public String get(String name) {
+ return substituteVars(getRaw(name));
+ }
+
+ private String getHexDigits(String value) {
+ boolean negative = false;
+ String str = value;
+ String hexString = null;
+ if (value.startsWith("-")) {
+ negative = true;
+ str = value.substring(1);
+ }
+ if (str.startsWith("0x") || str.startsWith("0X")) {
+ hexString = str.substring(2);
+ if (negative) {
+ hexString = "-" + hexString;
+ }
+ return hexString;
+ }
+ return null;
+ }
+
+ /**
+ * Get the value of the <code>name</code> property as a <code>boolean</code>.
+ * If no such property is specified, or if the specified value is not a valid
+ * <code>boolean</code>, then <code>defaultValue</code> is returned.
+ *
+ * @param name property name.
+ * @param defaultValue default value.
+ * @return property value as a <code>boolean</code>,
+ * or <code>defaultValue</code>.
+ */
+ public boolean getBoolean(String name, boolean defaultValue) {
+ String valueString = get(name);
+ if ("true".equals(valueString))
+ return true;
+ else if ("false".equals(valueString))
+ return false;
+ else return defaultValue;
+ }
+
+ /**
+ * Get the value of the <code>name</code> property as an <code>int</code>.
+ *
+ * If no such property exists, or if the specified value is not a valid
+ * <code>int</code>, then <code>defaultValue</code> is returned.
+ *
+ * @param name property name.
+ * @param defaultValue default value.
+ * @return property value as an <code>int</code>,
+ * or <code>defaultValue</code>.
+ */
+ public int getInt(String name, int defaultValue) {
+ String valueString = get(name);
+ if (valueString == null)
+ return defaultValue;
+ try {
+ String hexString = getHexDigits(valueString);
+ if (hexString != null) {
+ return Integer.parseInt(hexString, 16);
+ }
+ return Integer.parseInt(valueString);
+ } catch (NumberFormatException e) {
+ return defaultValue;
+ }
+ }
+
+ /**
+ * Get the value of the <code>name</code> property as a <code>long</code>.
+ * If no such property is specified, or if the specified value is not a valid
+ * <code>long</code>, then <code>defaultValue</code> is returned.
+ *
+ * @param name property name.
+ * @param defaultValue default value.
+ * @return property value as a <code>long</code>,
+ * or <code>defaultValue</code>.
+ */
+ public long getLong(String name, long defaultValue) {
+ String valueString = get(name);
+ if (valueString == null)
+ return defaultValue;
+ try {
+ String hexString = getHexDigits(valueString);
+ if (hexString != null) {
+ return Long.parseLong(hexString, 16);
+ }
+ return Long.parseLong(valueString);
+ } catch (NumberFormatException e) {
+ return defaultValue;
+ }
+ }
+
+ /**
+ * Get the value of the <code>name</code> property as a <code>float</code>.
+ * If no such property is specified, or if the specified value is not a valid
+ * <code>float</code>, then <code>defaultValue</code> is returned.
+ *
+ * @param name property name.
+ * @param defaultValue default value.
+ * @return property value as a <code>float</code>,
+ * or <code>defaultValue</code>.
+ */
+ public float getFloat(String name, float defaultValue) {
+ String valueString = get(name);
+ if (valueString == null)
+ return defaultValue;
+ try {
+ return Float.parseFloat(valueString);
+ } catch (NumberFormatException e) {
+ return defaultValue;
+ }
+ }
+
+ /**
+ * Get the properties as a <code>Map<String,String></code>
+ *
+ * @return Map<String,String>
+ */
+ public Map<String,String> asMap() {
+ return props;
+ }
+
+ @Override
+ public Iterator<Entry<String, String>> iterator() {
+ return props.entrySet().iterator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/ResultUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/ResultUtil.java b/src/main/java/org/apache/phoenix/util/ResultUtil.java
new file mode 100644
index 0000000..f9f2f5a
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/ResultUtil.java
@@ -0,0 +1,172 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.util.Arrays;
+import java.util.Comparator;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Static class for various methods that would be nice to have added to {@link org.apache.hadoop.hbase.client.Result}.
+ * These methods work off of the raw bytes preventing the explosion of Result into object form.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ResultUtil {
+ private ResultUtil() {
+ }
+
+ /**
+ * Return a pointer into a potentially much bigger byte buffer that points to the key of a Result.
+ * @param r
+ */
+ public static ImmutableBytesWritable getKey(Result r) {
+ return getKey(r, 0);
+ }
+
+ public static void getKey(Result r, ImmutableBytesWritable key) {
+ key.set(r.getRow());
+ //key.set(getRawBytes(r), getKeyOffset(r), getKeyLength(r));
+ }
+
+ public static void getKey(KeyValue value, ImmutableBytesWritable key) {
+ key.set(value.getBuffer(), value.getRowOffset(), value.getRowLength());
+ }
+
+ /**
+ * Return a pointer into a potentially much bigger byte buffer that points to the key of a Result.
+ * Use offset to return a subset of the key bytes, for example to skip the organization ID embedded
+ * in all of our keys.
+ * @param r
+ * @param offset offset added to start of key and subtracted from key length (to select subset of key bytes)
+ */
+ public static ImmutableBytesWritable getKey(Result r, int offset) {
+ return new ImmutableBytesWritable(getRawBytes(r), getKeyOffset(r) + offset, getKeyLength(r) - offset);
+ }
+
+ public static void getKey(Result r, int offset, int length, ImmutableBytesWritable key) {
+ key.set(getRawBytes(r), getKeyOffset(r) + offset, length);
+ }
+
+ /**
+ * Comparator for comparing the keys from two Results in-place, without allocating new byte arrays
+ */
+ public static final Comparator<Result> KEY_COMPARATOR = new Comparator<Result>() {
+
+ @Override
+ public int compare(Result r1, Result r2) {
+ byte[] r1Bytes = getRawBytes(r1);
+ byte[] r2Bytes = getRawBytes(r2);
+ return Bytes.compareTo(r1Bytes, getKeyOffset(r1), getKeyLength(r1), r2Bytes, getKeyOffset(r2), getKeyLength(r2));
+ }
+
+ };
+
+ /**
+ * Get the offset into the Result byte array to the key.
+ * @param r
+ * @return
+ */
+ static int getKeyOffset(Result r) {
+ // Special case for when Result was instantiated via KeyValue array (no bytes in that case) versus returned from a scanner
+ return (r.getBytes() == null ? r.raw()[0].getOffset() : (r.getBytes().getOffset() + Bytes.SIZEOF_INT /* KV length in Result */)) + KeyValue.ROW_OFFSET /* key offset in KV */ + Bytes.SIZEOF_SHORT /* key length */;
+ }
+
+ static int getKeyLength(Result r) {
+ // Key length stored right before key as a short
+ return Bytes.toShort(getRawBytes(r), getKeyOffset(r) - Bytes.SIZEOF_SHORT);
+ }
+
+ static byte[] getRawBytes(Result r) {
+ // Handle special case for when Result was instantiated via KeyValue array (no bytes in that case) versus returned from a scanner
+ ImmutableBytesWritable rPtr = r.getBytes();
+ if (rPtr != null)
+ return rPtr.get();
+ return r.raw()[0].getBuffer();
+ }
+
+ public static int compareKeys(Result r1, Result r2) {
+ return Bytes.compareTo(getRawBytes(r1), getKeyOffset(r1), getKeyLength(r1), getRawBytes(r2), getKeyOffset(r2), getKeyLength(r2));
+ }
+
+ /**
+ * Binary search for latest column value without allocating memory in the process
+ */
+ public static KeyValue getColumnLatest(Result r, byte[] family, byte[] qualifier) {
+ byte[] rbytes = getRawBytes(r);
+ int roffset = getKeyOffset(r);
+ int rlength = getKeyLength(r);
+ return getColumnLatest(r, rbytes, roffset, rlength, family, 0, family.length, qualifier, 0, qualifier.length);
+ }
+
+ public static KeyValue getSearchTerm(Result r, byte[] family, byte[] qualifier) {
+ byte[] rbytes = getRawBytes(r);
+ int roffset = getKeyOffset(r);
+ int rlength = getKeyLength(r);
+ return KeyValue.createFirstOnRow(rbytes, roffset, rlength, family, 0, family.length, qualifier, 0, qualifier.length);
+ }
+ /**
+ * Binary search for latest column value without allocating memory in the process
+ */
+ public static KeyValue getColumnLatest(Result r, byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength, byte[] qualifier, int qoffset, int qlength) {
+ KeyValue searchTerm = KeyValue.createFirstOnRow(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, qlength);
+ return getColumnLatest(r,searchTerm);
+
+ }
+
+ /**
+ * Binary search for latest column value without allocating memory in the process
+ * @param r
+ * @param searchTerm
+ */
+ public static KeyValue getColumnLatest(Result r, KeyValue searchTerm) {
+ KeyValue [] kvs = r.raw(); // side effect possibly.
+ if (kvs == null || kvs.length == 0) {
+ return null;
+ }
+
+ // pos === ( -(insertion point) - 1)
+ int pos = Arrays.binarySearch(kvs, searchTerm, KeyValue.COMPARATOR);
+ // never will exact match
+ if (pos < 0) {
+ pos = (pos+1) * -1;
+ // pos is now insertion point
+ }
+ if (pos == kvs.length) {
+ return null; // doesn't exist
+ }
+
+ KeyValue kv = kvs[pos];
+ if (Bytes.compareTo(kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(),
+ searchTerm.getBuffer(), searchTerm.getFamilyOffset(), searchTerm.getFamilyLength()) != 0) {
+ return null;
+ }
+ if (Bytes.compareTo(kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength(),
+ searchTerm.getBuffer(), searchTerm.getQualifierOffset(), searchTerm.getQualifierLength()) != 0) {
+ return null;
+ }
+ return kv;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/SQLCloseable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/SQLCloseable.java b/src/main/java/org/apache/phoenix/util/SQLCloseable.java
new file mode 100644
index 0000000..11cf02f
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/SQLCloseable.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.sql.SQLException;
+
+/**
+ *
+ * Interface for a SQL resource that should be closed
+ * after it is no longer in use.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface SQLCloseable {
+ void close() throws SQLException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/SQLCloseables.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/SQLCloseables.java b/src/main/java/org/apache/phoenix/util/SQLCloseables.java
new file mode 100644
index 0000000..c255c1d
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/SQLCloseables.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.sql.SQLException;
+import java.util.*;
+
+import com.google.common.collect.Iterables;
+
+
+/**
+ * Utilities for operating on {@link SQLCloseable}s.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class SQLCloseables {
+ /** Not constructed */
+ private SQLCloseables() { }
+
+ /**
+ * Allows you to close as many of the {@link SQLCloseable}s as possible.
+ *
+ * If any of the close's fail with an IOException, those exception(s) will
+ * be thrown after attempting to close all of the inputs.
+ */
+ public static void closeAll(Iterable<? extends SQLCloseable> iterable) throws SQLException {
+ SQLException ex = closeAllQuietly(iterable);
+ if (ex != null) throw ex;
+ }
+
+ public static SQLException closeAllQuietly(Iterable<? extends SQLCloseable> iterable) {
+ if (iterable == null) return null;
+
+ LinkedList<SQLException> exceptions = null;
+ for (SQLCloseable closeable : iterable) {
+ try {
+ closeable.close();
+ } catch (SQLException x) {
+ if (exceptions == null) exceptions = new LinkedList<SQLException>();
+ exceptions.add(x);
+ }
+ }
+
+ SQLException ex = MultipleCausesSQLException.fromSQLExceptions(exceptions);
+ return ex;
+ }
+
+ /**
+ * A subclass of {@link SQLException} that allows you to chain multiple
+ * causes together.
+ *
+ * @author jtaylor
+ * @since 0.1
+ * @see SQLCloseables
+ */
+ static private class MultipleCausesSQLException extends SQLException {
+ private static final long serialVersionUID = 1L;
+
+ static SQLException fromSQLExceptions(Collection<? extends SQLException> exceptions) {
+ if (exceptions == null || exceptions.isEmpty()) return null;
+ if (exceptions.size() == 1) return Iterables.getOnlyElement(exceptions);
+
+ return new MultipleCausesSQLException(exceptions);
+ }
+
+ private final Collection<? extends SQLException> exceptions;
+ private boolean hasSetStackTrace;
+
+ /**
+ * Use the {@link #fromIOExceptions(Collection) factory}.
+ */
+ private MultipleCausesSQLException(Collection<? extends SQLException> exceptions) {
+ this.exceptions = exceptions;
+ }
+
+ @Override
+ public String getMessage() {
+ StringBuilder sb = new StringBuilder(this.exceptions.size() * 50);
+ int exceptionNum = 0;
+ for (SQLException ex : this.exceptions) {
+ sb.append("Cause Number " + exceptionNum + ": " + ex.getMessage() + "\n");
+ exceptionNum++;
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public StackTraceElement[] getStackTrace() {
+ if (!this.hasSetStackTrace) {
+ ArrayList<StackTraceElement> frames = new ArrayList<StackTraceElement>(this.exceptions.size() * 20);
+
+ int exceptionNum = 0;
+ for (SQLException exception : this.exceptions) {
+ StackTraceElement header = new StackTraceElement(MultipleCausesSQLException.class.getName(),
+ "Exception Number " + exceptionNum,
+ "<no file>",
+ 0);
+
+ frames.add(header);
+ for (StackTraceElement ste : exception.getStackTrace()) {
+ frames.add(ste);
+ }
+ exceptionNum++;
+ }
+
+ setStackTrace(frames.toArray(new StackTraceElement[frames.size()]));
+ this.hasSetStackTrace = true;
+ }
+
+ return super.getStackTrace();
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/util/ScanUtil.java b/src/main/java/org/apache/phoenix/util/ScanUtil.java
new file mode 100644
index 0000000..05337b7
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -0,0 +1,425 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.collect.Lists;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.KeyRange.Bound;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.RowKeySchema;
+
+
+/**
+ *
+ * Various utilities for scans
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ScanUtil {
+
+ private ScanUtil() {
+ }
+
+ public static void setTenantId(Scan scan, byte[] tenantId) {
+ scan.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+ }
+
+ // Use getTenantId and pass in column name to match against
+ // in as PSchema attribute. If column name matches in
+ // KeyExpressions, set on scan as attribute
+ public static ImmutableBytesWritable getTenantId(Scan scan) {
+ // Create Scan with special aggregation column over which to aggregate
+ byte[] tenantId = scan.getAttribute(PhoenixRuntime.TENANT_ID_ATTRIB);
+ if (tenantId == null) {
+ return null;
+ }
+ return new ImmutableBytesWritable(tenantId);
+ }
+
+ public static Scan newScan(Scan scan) {
+ try {
+ Scan newScan = new Scan(scan);
+ // Clone the underlying family map instead of sharing it between
+ // the existing and cloned Scan (which is the retarded default
+ // behavior).
+ TreeMap<byte [], NavigableSet<byte []>> existingMap = (TreeMap<byte[], NavigableSet<byte[]>>)scan.getFamilyMap();
+ Map<byte [], NavigableSet<byte []>> clonedMap = new TreeMap<byte [], NavigableSet<byte []>>(existingMap);
+ newScan.setFamilyMap(clonedMap);
+ return newScan;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ /**
+ * Intersects the scan start/stop row with the startKey and stopKey
+ * @param scan
+ * @param startKey
+ * @param stopKey
+ * @return false if the Scan cannot possibly return rows and true otherwise
+ */
+ public static boolean intersectScanRange(Scan scan, byte[] startKey, byte[] stopKey) {
+ return intersectScanRange(scan, startKey, stopKey, false);
+ }
+
+ public static boolean intersectScanRange(Scan scan, byte[] startKey, byte[] stopKey, boolean useSkipScan) {
+ boolean mayHaveRows = false;
+ byte[] existingStartKey = scan.getStartRow();
+ byte[] existingStopKey = scan.getStopRow();
+ if (existingStartKey.length > 0) {
+ if (startKey.length == 0 || Bytes.compareTo(existingStartKey, startKey) > 0) {
+ startKey = existingStartKey;
+ }
+ } else {
+ mayHaveRows = true;
+ }
+ if (existingStopKey.length > 0) {
+ if (stopKey.length == 0 || Bytes.compareTo(existingStopKey, stopKey) < 0) {
+ stopKey = existingStopKey;
+ }
+ } else {
+ mayHaveRows = true;
+ }
+ scan.setStartRow(startKey);
+ scan.setStopRow(stopKey);
+
+ mayHaveRows = mayHaveRows || Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) < 0;
+
+ // If the scan is using skip scan filter, intersect and replace the filter.
+ if (mayHaveRows && useSkipScan) {
+ Filter filter = scan.getFilter();
+ if (filter instanceof SkipScanFilter) {
+ SkipScanFilter oldFilter = (SkipScanFilter)filter;
+ SkipScanFilter newFilter = oldFilter.intersect(startKey, stopKey);
+ if (newFilter == null) {
+ return false;
+ }
+ // Intersect found: replace skip scan with intersected one
+ scan.setFilter(newFilter);
+ } else if (filter instanceof FilterList) {
+ FilterList filterList = (FilterList)filter;
+ Filter firstFilter = filterList.getFilters().get(0);
+ if (firstFilter instanceof SkipScanFilter) {
+ SkipScanFilter oldFilter = (SkipScanFilter)firstFilter;
+ SkipScanFilter newFilter = oldFilter.intersect(startKey, stopKey);
+ if (newFilter == null) {
+ return false;
+ }
+ // Intersect found: replace skip scan with intersected one
+ List<Filter> allFilters = new ArrayList<Filter>(filterList.getFilters().size());
+ allFilters.addAll(filterList.getFilters());
+ allFilters.set(0, newFilter);
+ scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,allFilters));
+ }
+ }
+ }
+ return mayHaveRows;
+ }
+
+ public static void andFilterAtBeginning(Scan scan, Filter andWithFilter) {
+ if (andWithFilter == null) {
+ return;
+ }
+ Filter filter = scan.getFilter();
+ if (filter == null) {
+ scan.setFilter(andWithFilter);
+ } else if (filter instanceof FilterList && ((FilterList)filter).getOperator() == FilterList.Operator.MUST_PASS_ALL) {
+ FilterList filterList = (FilterList)filter;
+ List<Filter> allFilters = new ArrayList<Filter>(filterList.getFilters().size() + 1);
+ allFilters.add(andWithFilter);
+ allFilters.addAll(filterList.getFilters());
+ scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,allFilters));
+ } else {
+ scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,Arrays.asList(andWithFilter, filter)));
+ }
+ }
+
+ public static void andFilterAtEnd(Scan scan, Filter andWithFilter) {
+ if (andWithFilter == null) {
+ return;
+ }
+ Filter filter = scan.getFilter();
+ if (filter == null) {
+ scan.setFilter(andWithFilter);
+ } else if (filter instanceof FilterList && ((FilterList)filter).getOperator() == FilterList.Operator.MUST_PASS_ALL) {
+ FilterList filterList = (FilterList)filter;
+ List<Filter> allFilters = new ArrayList<Filter>(filterList.getFilters().size() + 1);
+ allFilters.addAll(filterList.getFilters());
+ allFilters.add(andWithFilter);
+ scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,allFilters));
+ } else {
+ scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,Arrays.asList(filter, andWithFilter)));
+ }
+ }
+
+ public static void setTimeRange(Scan scan, long ts) {
+ try {
+ scan.setTimeRange(MetaDataProtocol.MIN_TABLE_TIMESTAMP, ts);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static byte[] getMinKey(RowKeySchema schema, List<List<KeyRange>> slots) {
+ return getKey(schema, slots, Bound.LOWER);
+ }
+
+ public static byte[] getMaxKey(RowKeySchema schema, List<List<KeyRange>> slots) {
+ return getKey(schema, slots, Bound.UPPER);
+ }
+
+ private static byte[] getKey(RowKeySchema schema, List<List<KeyRange>> slots, Bound bound) {
+ if (slots.isEmpty()) {
+ return null;
+ }
+ int[] position = new int[slots.size()];
+ int maxLength = 0;
+ for (int i = 0; i < position.length; i++) {
+ position[i] = bound == Bound.LOWER ? 0 : slots.get(i).size()-1;
+ KeyRange range = slots.get(i).get(position[i]);
+ maxLength += range.getRange(bound).length + (schema.getField(i).getDataType().isFixedWidth() ? 0 : 1);
+ }
+ byte[] key = new byte[maxLength];
+ int length = setKey(schema, slots, position, bound, key, 0, 0, position.length);
+ if (length == 0) {
+ return null;
+ }
+ if (length == maxLength) {
+ return key;
+ }
+ byte[] keyCopy = new byte[length];
+ System.arraycopy(key, 0, keyCopy, 0, length);
+ return keyCopy;
+ }
+
+ public static int estimateMaximumKeyLength(RowKeySchema schema, int schemaStartIndex, List<List<KeyRange>> slots) {
+ int maxLowerKeyLength = 0, maxUpperKeyLength = 0;
+ for (int i = 0; i < slots.size(); i++) {
+ int maxLowerRangeLength = 0, maxUpperRangeLength = 0;
+ for (KeyRange range: slots.get(i)) {
+ maxLowerRangeLength = Math.max(maxLowerRangeLength, range.getLowerRange().length);
+ maxUpperRangeLength = Math.max(maxUpperRangeLength, range.getUpperRange().length);
+ }
+ int trailingByte = (schema.getField(schemaStartIndex).getDataType().isFixedWidth() ||
+ schemaStartIndex == schema.getFieldCount() - 1 ? 0 : 1);
+ maxLowerKeyLength += maxLowerRangeLength + trailingByte;
+ maxUpperKeyLength += maxUpperKeyLength + trailingByte;
+ schemaStartIndex++;
+ }
+ return Math.max(maxLowerKeyLength, maxUpperKeyLength);
+ }
+
+ /*
+ * Set the key by appending the keyRanges inside slots at positions as specified by the position array.
+ *
+ * We need to increment part of the key range, or increment the whole key at the end, depending on the
+ * bound we are setting and whether the key range is inclusive or exclusive. The logic for determining
+ * whether to increment or not is:
+ * range/single boundary bound increment
+ * range inclusive lower no
+ * range inclusive upper yes, at the end if occurs at any slots.
+ * range exclusive lower yes
+ * range exclusive upper no
+ * single inclusive lower no
+ * single inclusive upper yes, at the end if it is the last slots.
+ */
+ public static int setKey(RowKeySchema schema, List<List<KeyRange>> slots, int[] position, Bound bound,
+ byte[] key, int byteOffset, int slotStartIndex, int slotEndIndex) {
+ return setKey(schema, slots, position, bound, key, byteOffset, slotStartIndex, slotEndIndex, slotStartIndex);
+ }
+
+ public static int setKey(RowKeySchema schema, List<List<KeyRange>> slots, int[] position, Bound bound,
+ byte[] key, int byteOffset, int slotStartIndex, int slotEndIndex, int schemaStartIndex) {
+ int offset = byteOffset;
+ boolean lastInclusiveUpperSingleKey = false;
+ boolean anyInclusiveUpperRangeKey = false;
+ for (int i = slotStartIndex; i < slotEndIndex; i++) {
+ // Build up the key by appending the bound of each key range
+ // from the current position of each slot.
+ KeyRange range = slots.get(i).get(position[i]);
+ boolean isFixedWidth = schema.getField(schemaStartIndex++).getDataType().isFixedWidth();
+ /*
+ * If the current slot is unbound then stop if:
+ * 1) setting the upper bound. There's no value in
+ * continuing because nothing will be filtered.
+ * 2) setting the lower bound when the type is fixed length
+ * for the same reason. However, if the type is variable width
+ * continue building the key because null values will be filtered
+ * since our separator byte will be appended and incremented.
+ */
+ if ( range.isUnbound(bound) &&
+ ( bound == Bound.UPPER || isFixedWidth) ){
+ break;
+ }
+ byte[] bytes = range.getRange(bound);
+ System.arraycopy(bytes, 0, key, offset, bytes.length);
+ offset += bytes.length;
+ /*
+ * We must add a terminator to a variable length key even for the last PK column if
+ * the lower key is non inclusive or the upper key is inclusive. Otherwise, we'd be
+ * incrementing the key value itself, and thus bumping it up too much.
+ */
+ boolean inclusiveUpper = range.isInclusive(bound) && bound == Bound.UPPER;
+ boolean exclusiveLower = !range.isInclusive(bound) && bound == Bound.LOWER;
+ if (!isFixedWidth && ( i < schema.getMaxFields()-1 || inclusiveUpper || exclusiveLower)) {
+ key[offset++] = QueryConstants.SEPARATOR_BYTE;
+ }
+ // If we are setting the upper bound of using inclusive single key, we remember
+ // to increment the key if we exit the loop after this iteration.
+ //
+ // We remember to increment the last slot if we are setting the upper bound with an
+ // inclusive range key.
+ //
+ // We cannot combine the two flags together in case for single-inclusive key followed
+ // by the range-exclusive key. In that case, we do not need to increment the end at the
+ // end. But if we combine the two flag, the single inclusive key in the middle of the
+ // key slots would cause the flag to become true.
+ lastInclusiveUpperSingleKey = range.isSingleKey() && inclusiveUpper;
+ anyInclusiveUpperRangeKey |= !range.isSingleKey() && inclusiveUpper;
+ // If we are setting the lower bound with an exclusive range key, we need to bump the
+ // slot up for each key part. For an upper bound, we bump up an inclusive key, but
+ // only after the last key part.
+ if (!range.isSingleKey() && exclusiveLower) {
+ if (!ByteUtil.nextKey(key, offset)) {
+ // Special case for not being able to increment.
+ // In this case we return a negative byteOffset to
+ // remove this part from the key being formed. Since the
+ // key has overflowed, this means that we should not
+ // have an end key specified.
+ return -byteOffset;
+ }
+ }
+ }
+ if (lastInclusiveUpperSingleKey || anyInclusiveUpperRangeKey) {
+ if (!ByteUtil.nextKey(key, offset)) {
+ // Special case for not being able to increment.
+ // In this case we return a negative byteOffset to
+ // remove this part from the key being formed. Since the
+ // key has overflowed, this means that we should not
+ // have an end key specified.
+ return -byteOffset;
+ }
+ }
+ // Remove trailing separator bytes, since the columns may have been added
+ // after the table has data, in which case there won't be a separator
+ // byte.
+ if (bound == Bound.LOWER) {
+ while (schemaStartIndex > 0 && offset > byteOffset &&
+ !schema.getField(--schemaStartIndex).getDataType().isFixedWidth() &&
+ key[offset-1] == QueryConstants.SEPARATOR_BYTE) {
+ offset--;
+ }
+ }
+ return offset - byteOffset;
+ }
+
+ public static boolean isAllSingleRowScan(List<List<KeyRange>> ranges, RowKeySchema schema) {
+ if (ranges.size() < schema.getMaxFields()) {
+ return false;
+ }
+ for (int i = 0; i < ranges.size(); i++) {
+ List<KeyRange> orRanges = ranges.get(i);
+ for (KeyRange range: orRanges) {
+ if (!range.isSingleKey()) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Perform a binary lookup on the list of KeyRange for the tightest slot such that the slotBound
+ * of the current slot is higher or equal than the slotBound of our range.
+ * @return the index of the slot whose slot bound equals or are the tightest one that is
+ * smaller than rangeBound of range, or slots.length if no bound can be found.
+ */
+ public static int searchClosestKeyRangeWithUpperHigherThanPtr(List<KeyRange> slots, ImmutableBytesWritable ptr, int lower) {
+ int upper = slots.size() - 1;
+ int mid;
+ while (lower <= upper) {
+ mid = (lower + upper) / 2;
+ int cmp = slots.get(mid).compareUpperToLowerBound(ptr, true);
+ if (cmp < 0) {
+ lower = mid + 1;
+ } else if (cmp > 0) {
+ upper = mid - 1;
+ } else {
+ return mid;
+ }
+ }
+ mid = (lower + upper) / 2;
+ if (mid == 0 && slots.get(mid).compareUpperToLowerBound(ptr, true) > 0) {
+ return mid;
+ } else {
+ return ++mid;
+ }
+ }
+
+ public static ScanRanges newScanRanges(List<Mutation> mutations) throws SQLException {
+ List<KeyRange> keys = Lists.newArrayListWithExpectedSize(mutations.size());
+ for (Mutation m : mutations) {
+ keys.add(PDataType.VARBINARY.getKeyRange(m.getRow()));
+ }
+ ScanRanges keyRanges = ScanRanges.create(Collections.singletonList(keys), SchemaUtil.VAR_BINARY_SCHEMA);
+ return keyRanges;
+ }
+
+ public static byte[] nextKey(byte[] key, PTable table, ImmutableBytesWritable ptr) {
+ int pos = 0;
+ RowKeySchema schema = table.getRowKeySchema();
+ int maxOffset = schema.iterator(key, ptr);
+ while (schema.next(ptr, pos, maxOffset) != null) {
+ pos++;
+ }
+ if (!schema.getField(pos-1).getDataType().isFixedWidth()) {
+ byte[] newLowerRange = new byte[key.length + 1];
+ System.arraycopy(key, 0, newLowerRange, 0, key.length);
+ newLowerRange[key.length] = QueryConstants.SEPARATOR_BYTE;
+ key = newLowerRange;
+ } else {
+ key = Arrays.copyOf(key, key.length);
+ }
+ ByteUtil.nextKey(key, key.length);
+ return key;
+ }
+}