Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 23:15:36 UTC
[20/51] [partial] Initial commit of master branch from github
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/join/MaxServerCacheSizeExceededException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/MaxServerCacheSizeExceededException.java b/phoenix-core/src/main/java/org/apache/phoenix/join/MaxServerCacheSizeExceededException.java
new file mode 100644
index 0000000..e72d628
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/MaxServerCacheSizeExceededException.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.join;
+
+/**
+ *
+ * Exception thrown when the size of the hash cache exceeds the
+ * maximum size as specified by the phoenix.query.maxHashCacheBytes
+ * parameter in the {@link org.apache.hadoop.conf.Configuration}
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class MaxServerCacheSizeExceededException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public MaxServerCacheSizeExceededException() {
+ }
+
+ public MaxServerCacheSizeExceededException(String message) {
+ super(message);
+ }
+
+ public MaxServerCacheSizeExceededException(Throwable cause) {
+ super(cause);
+ }
+
+ public MaxServerCacheSizeExceededException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
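
The Javadoc above names phoenix.query.maxHashCacheBytes as the limit that trips this exception. Below is a minimal sketch of guarding a join query against it; the JDBC URL, table names and query are hypothetical, and the sketch assumes the exception propagates to the JDBC caller since it extends RuntimeException:

    import java.sql.Connection;
    import java.sql.DriverManager;

    import org.apache.phoenix.join.MaxServerCacheSizeExceededException;

    public class HashCacheGuard {
        public static void main(String[] args) throws Exception {
            // Hypothetical ZooKeeper quorum; raise phoenix.query.maxHashCacheBytes
            // in the Configuration if legitimate joins keep hitting this limit.
            Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
            try {
                conn.createStatement().executeQuery(
                    "SELECT * FROM A JOIN B ON A.ID = B.ID");
            } catch (MaxServerCacheSizeExceededException e) {
                System.err.println("Hash cache too large: " + e.getMessage());
            } finally {
                conn.close();
            }
        }
    }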
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/join/ScanProjector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/ScanProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/join/ScanProjector.java
new file mode 100644
index 0000000..9f8777d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/ScanProjector.java
@@ -0,0 +1,242 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.join;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.phoenix.compile.JoinCompiler.ProjectedPTableWrapper;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.schema.KeyValueSchema;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.ValueBitSet;
+import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+public class ScanProjector {
+ public static final byte[] VALUE_COLUMN_FAMILY = Bytes.toBytes("_v");
+ public static final byte[] VALUE_COLUMN_QUALIFIER = new byte[0];
+
+ private static final String SCAN_PROJECTOR = "scanProjector";
+
+ private final KeyValueSchema schema;
+ private final Expression[] expressions;
+ private ValueBitSet valueSet;
+ private final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+
+ public ScanProjector(ProjectedPTableWrapper projected) {
+ List<PColumn> columns = projected.getTable().getColumns();
+ expressions = new Expression[columns.size() - projected.getTable().getPKColumns().size()];
+        // We do not count minNullableIndex here, since a merge may be performed later.
+ KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
+ int i = 0;
+ for (PColumn column : projected.getTable().getColumns()) {
+ if (!SchemaUtil.isPKColumn(column)) {
+ builder.addField(column);
+ expressions[i++] = projected.getSourceExpression(column);
+ }
+ }
+ schema = builder.build();
+ valueSet = ValueBitSet.newInstance(schema);
+ }
+
+ private ScanProjector(KeyValueSchema schema, Expression[] expressions) {
+ this.schema = schema;
+ this.expressions = expressions;
+ this.valueSet = ValueBitSet.newInstance(schema);
+ }
+
+ public void setValueBitSet(ValueBitSet bitSet) {
+ this.valueSet = bitSet;
+ }
+
+ public static void serializeProjectorIntoScan(Scan scan, ScanProjector projector) {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ try {
+ DataOutputStream output = new DataOutputStream(stream);
+ projector.schema.write(output);
+ int count = projector.expressions.length;
+ WritableUtils.writeVInt(output, count);
+ for (int i = 0; i < count; i++) {
+ WritableUtils.writeVInt(output, ExpressionType.valueOf(projector.expressions[i]).ordinal());
+ projector.expressions[i].write(output);
+ }
+ scan.setAttribute(SCAN_PROJECTOR, stream.toByteArray());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+ public static ScanProjector deserializeProjectorFromScan(Scan scan) {
+ byte[] proj = scan.getAttribute(SCAN_PROJECTOR);
+ if (proj == null) {
+ return null;
+ }
+ ByteArrayInputStream stream = new ByteArrayInputStream(proj);
+ try {
+ DataInputStream input = new DataInputStream(stream);
+ KeyValueSchema schema = new KeyValueSchema();
+ schema.readFields(input);
+ int count = WritableUtils.readVInt(input);
+ Expression[] expressions = new Expression[count];
+ for (int i = 0; i < count; i++) {
+ int ordinal = WritableUtils.readVInt(input);
+ expressions[i] = ExpressionType.values()[ordinal].newInstance();
+ expressions[i].readFields(input);
+ }
+ return new ScanProjector(schema, expressions);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public static class ProjectedValueTuple implements Tuple {
+ private ImmutableBytesWritable keyPtr = new ImmutableBytesWritable();
+ private long timestamp;
+ private byte[] projectedValue;
+ private int bitSetLen;
+ private KeyValue keyValue;
+
+ private ProjectedValueTuple(byte[] keyBuffer, int keyOffset, int keyLength, long timestamp, byte[] projectedValue, int bitSetLen) {
+ this.keyPtr.set(keyBuffer, keyOffset, keyLength);
+ this.timestamp = timestamp;
+ this.projectedValue = projectedValue;
+ this.bitSetLen = bitSetLen;
+ }
+
+ public ImmutableBytesWritable getKeyPtr() {
+ return keyPtr;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public byte[] getProjectedValue() {
+ return projectedValue;
+ }
+
+ public int getBitSetLength() {
+ return bitSetLen;
+ }
+
+ @Override
+ public void getKey(ImmutableBytesWritable ptr) {
+ ptr.set(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength());
+ }
+
+ @Override
+ public KeyValue getValue(int index) {
+ if (index != 0) {
+ throw new IndexOutOfBoundsException(Integer.toString(index));
+ }
+ return getValue(VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER);
+ }
+
+ @Override
+ public KeyValue getValue(byte[] family, byte[] qualifier) {
+ if (keyValue == null) {
+ keyValue = KeyValueUtil.newKeyValue(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(),
+ VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, timestamp, projectedValue, 0, projectedValue.length);
+ }
+ return keyValue;
+ }
+
+ @Override
+ public boolean getValue(byte[] family, byte[] qualifier,
+ ImmutableBytesWritable ptr) {
+ ptr.set(projectedValue);
+ return true;
+ }
+
+ @Override
+ public boolean isImmutable() {
+ return true;
+ }
+
+ @Override
+ public int size() {
+ return 1;
+ }
+ }
+
+ public ProjectedValueTuple projectResults(Tuple tuple) {
+ byte[] bytesValue = schema.toBytes(tuple, expressions, valueSet, ptr);
+ KeyValue base = tuple.getValue(0);
+ return new ProjectedValueTuple(base.getBuffer(), base.getRowOffset(), base.getRowLength(), base.getTimestamp(), bytesValue, valueSet.getEstimatedLength());
+ }
+
+ public static void decodeProjectedValue(Tuple tuple, ImmutableBytesWritable ptr) throws IOException {
+ boolean b = tuple.getValue(VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, ptr);
+ if (!b)
+ throw new IOException("Trying to decode a non-projected value.");
+ }
+
+ public static ProjectedValueTuple mergeProjectedValue(ProjectedValueTuple dest, KeyValueSchema destSchema, ValueBitSet destBitSet,
+ Tuple src, KeyValueSchema srcSchema, ValueBitSet srcBitSet, int offset) throws IOException {
+ ImmutableBytesWritable destValue = new ImmutableBytesWritable(dest.getProjectedValue());
+ destBitSet.clear();
+ destBitSet.or(destValue);
+ int origDestBitSetLen = dest.getBitSetLength();
+ ImmutableBytesWritable srcValue = new ImmutableBytesWritable();
+ decodeProjectedValue(src, srcValue);
+ srcBitSet.clear();
+ srcBitSet.or(srcValue);
+ int origSrcBitSetLen = srcBitSet.getEstimatedLength();
+        for (int i = 0; i <= srcBitSet.getMaxSetBit(); i++) { // include the max set bit itself
+ if (srcBitSet.get(i)) {
+ destBitSet.set(offset + i);
+ }
+ }
+ int destBitSetLen = destBitSet.getEstimatedLength();
+ byte[] merged = new byte[destValue.getLength() - origDestBitSetLen + srcValue.getLength() - origSrcBitSetLen + destBitSetLen];
+ int o = Bytes.putBytes(merged, 0, destValue.get(), destValue.getOffset(), destValue.getLength() - origDestBitSetLen);
+ o = Bytes.putBytes(merged, o, srcValue.get(), srcValue.getOffset(), srcValue.getLength() - origSrcBitSetLen);
+ destBitSet.toBytes(merged, o);
+ ImmutableBytesWritable keyPtr = dest.getKeyPtr();
+ return new ProjectedValueTuple(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(), dest.getTimestamp(), merged, destBitSetLen);
+ }
+}
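
The serialize/deserialize pair above carries the projector through the Scan attribute map between client and region server. A minimal round-trip sketch follows; obtaining the ScanProjector is left to a hypothetical helper, since its public constructor takes a ProjectedPTableWrapper produced internally by the join compiler:

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.phoenix.join.ScanProjector;

    public class ScanProjectorRoundTrip {
        // Hypothetical stand-in for the JoinCompiler output.
        static ScanProjector compileProjector() {
            throw new UnsupportedOperationException("produced by JoinCompiler");
        }

        public static void main(String[] args) {
            Scan scan = new Scan();
            ScanProjector projector = compileProjector();
            // Client side: attach the projector before the scan is shipped.
            ScanProjector.serializeProjectorIntoScan(scan, projector);
            // Server side (coprocessor): null means no projection was attached.
            ScanProjector recovered = ScanProjector.deserializeProjectorFromScan(scan);
            if (recovered != null) {
                // Each scanned Tuple would now go through recovered.projectResults(tuple).
            }
        }
    }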
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/map/reduce/CSVBulkLoader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/map/reduce/CSVBulkLoader.java b/phoenix-core/src/main/java/org/apache/phoenix/map/reduce/CSVBulkLoader.java
new file mode 100644
index 0000000..e7c6159
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/map/reduce/CSVBulkLoader.java
@@ -0,0 +1,401 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.map.reduce;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import com.google.common.io.Closeables;
+import org.apache.phoenix.map.reduce.util.ConfigReader;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.StringUtil;
+
+public class CSVBulkLoader {
+ private static final String UNDERSCORE = "_";
+
+ static FileWriter wr = null;
+ static BufferedWriter bw = null;
+	static boolean isDebug = false; //Set to true if you need to log the bulk-import time.
+ static ConfigReader systemConfig = null;
+
+ static String schemaName = "";
+ static String tableName = "";
+ static String idxTable = "";
+ static String createPSQL[] = null;
+ static String skipErrors = null;
+ static String zookeeperIP = null;
+ static String mapredIP = null;
+ static String hdfsNameNode = null;
+
+ static{
+ /** load the log-file writer, if debug is true **/
+ if(isDebug){
+ try {
+ wr = new FileWriter("phoenix-bulk-import.log", false);
+ bw = new BufferedWriter(wr);
+ } catch (IOException e) {
+ System.err.println("Error preparing writer for log file :: " + e.getMessage());
+ }
+ }
+
+ /** load the Map-Reduce configs **/
+ try {
+ systemConfig = new ConfigReader("csv-bulk-load-config.properties");
+ } catch (Exception e) {
+ System.err.println("Exception occurred while reading config properties");
+ System.err.println("The bulk loader will run slower than estimated");
+ }
+ }
+
+ /**
+ * -i CSV data file path in hdfs
+ * -s Phoenix schema name
+ * -t Phoenix table name
+ * -sql Phoenix create table sql path (1 SQL statement per line)
+ * -zk Zookeeper IP:<port>
+ * -mr MapReduce Job Tracker IP:<port>
+ * -hd HDFS NameNode IP:<port>
+ * -o Output directory path in hdfs (Optional)
+ * -idx Phoenix index table name (Optional)
+	 * -error	Ignore errors while reading rows from CSV? (1 - YES/0 - NO, defaults to 1) (Optional)
+ * -help Print all options (Optional)
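+	 *
+	 * Example invocation (jar name, paths and hosts are hypothetical):
+	 *   hadoop jar phoenix-bulk-loader.jar -i /data/input.csv -t MY_TABLE
+	 *     -sql /local/create_table.sql -zk zkhost:2181 -mr jthost:8021 -hd hdfs://nnhost:8020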
+ */
+
+ @SuppressWarnings("deprecation")
+ public static void main(String[] args) throws Exception{
+
+ String inputFile = null;
+ String outFile = null;
+
+ Options options = new Options();
+ options.addOption("i", true, "CSV data file path");
+ options.addOption("o", true, "Output directory path");
+ options.addOption("s", true, "Phoenix schema name");
+ options.addOption("t", true, "Phoenix table name");
+ options.addOption("idx", true, "Phoenix index table name");
+ options.addOption("zk", true, "Zookeeper IP:<port>");
+ options.addOption("mr", true, "MapReduce Job Tracker IP:<port>");
+ options.addOption("hd", true, "HDFS NameNode IP:<port>");
+ options.addOption("sql", true, "Phoenix create table sql path");
+ options.addOption("error", true, "Ignore error while reading rows from CSV ? (1 - YES/0 - NO, defaults to 1)");
+ options.addOption("help", false, "All options");
+
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd = parser.parse( options, args);
+
+ if(cmd.hasOption("help")){
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp( "help", options );
+ System.exit(0);
+ }
+
+ String parser_error = "ERROR while parsing arguments. ";
+		//CSV input, table name, Zookeeper, MapReduce and HDFS NameNode addresses are mandatory fields
+ if(cmd.hasOption("i")){
+ inputFile = cmd.getOptionValue("i");
+ }else{
+ System.err.println(parser_error + "Please provide CSV file input path");
+ System.exit(0);
+ }
+ if(cmd.hasOption("t")){
+ tableName = cmd.getOptionValue("t");
+ }else{
+ System.err.println(parser_error + "Please provide Phoenix table name");
+ System.exit(0);
+ }
+ if(cmd.hasOption("sql")){
+ String sqlPath = cmd.getOptionValue("sql");
+ createPSQL = getCreatePSQLstmts(sqlPath);
+ }
+ if(cmd.hasOption("zk")){
+ zookeeperIP = cmd.getOptionValue("zk");
+ }else{
+ System.err.println(parser_error + "Please provide Zookeeper address");
+ System.exit(0);
+ }
+ if(cmd.hasOption("mr")){
+ mapredIP = cmd.getOptionValue("mr");
+ }else{
+ System.err.println(parser_error + "Please provide MapReduce address");
+ System.exit(0);
+ }
+ if(cmd.hasOption("hd")){
+ hdfsNameNode = cmd.getOptionValue("hd");
+ }else{
+ System.err.println(parser_error + "Please provide HDFS NameNode address");
+ System.exit(0);
+ }
+
+ if(cmd.hasOption("o")){
+ outFile = cmd.getOptionValue("o");
+ }else{
+ outFile = "phoenix-output-dir";
+ }
+ if(cmd.hasOption("s")){
+ schemaName = cmd.getOptionValue("s");
+ }
+ if(cmd.hasOption("idx")){
+ idxTable = cmd.getOptionValue("idx");
+ }
+ if(cmd.hasOption("error")){
+ skipErrors = cmd.getOptionValue("error");
+ }else{
+ skipErrors = "1";
+ }
+
+ log("[TS - START] :: " + new Date() + "\n");
+
+ Path inputPath = new Path(inputFile);
+ Path outPath = new Path(outFile);
+
+ //Create the Phoenix table in HBase
+ if (createPSQL != null) {
+ for(String s : createPSQL){
+ if(s == null || s.trim().length() == 0) {
+ continue;
+ }
+ createTable(s);
+ }
+
+ log("[TS - Table created] :: " + new Date() + "\n");
+ }
+
+ String dataTable = "";
+ if(schemaName != null && schemaName.trim().length() > 0)
+ dataTable = SchemaUtil.normalizeIdentifier(schemaName) + "." + SchemaUtil.normalizeIdentifier(tableName);
+ else
+ dataTable = SchemaUtil.normalizeIdentifier(tableName);
+
+ try {
+ validateTable();
+ } catch (SQLException e) {
+ System.err.println(e.getMessage());
+ System.exit(0);
+ }
+
+ Configuration conf = new Configuration();
+ loadMapRedConfigs(conf);
+
+ Job job = new Job(conf, "MapReduce - Phoenix bulk import");
+ job.setJarByClass(MapReduceJob.class);
+ job.setInputFormatClass(TextInputFormat.class);
+ FileInputFormat.addInputPath(job, inputPath);
+
+ FileSystem fs = FileSystem.get(conf);
+		fs.delete(outPath, true); // Recursively delete any pre-existing output directory
+ FileOutputFormat.setOutputPath(job, outPath);
+
+ job.setMapperClass(MapReduceJob.PhoenixMapper.class);
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ job.setMapOutputValueClass(KeyValue.class);
+
+ SchemaMetrics.configureGlobally(conf);
+
+ HTable hDataTable = new HTable(conf, dataTable);
+
+ // Auto configure partitioner and reducer according to the Main Data table
+ HFileOutputFormat.configureIncrementalLoad(job, hDataTable);
+
+ job.waitForCompletion(true);
+
+ log("[TS - M-R HFile generated..Now dumping to HBase] :: " + new Date() + "\n");
+
+ LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
+ loader.doBulkLoad(new Path(outFile), hDataTable);
+
+ log("[TS - FINISH] :: " + new Date() + "\n");
+ if(isDebug) bw.close();
+
+ }
+
+ private static void createTable(String stmt) {
+
+ Connection conn = null;
+ PreparedStatement statement = null;
+
+ try {
+ conn = DriverManager.getConnection(getUrl(), "", "");
+ try {
+ statement = conn.prepareStatement(stmt);
+ statement.execute();
+ conn.commit();
+ } finally {
+ if(statement != null) {
+ statement.close();
+ }
+ }
+ } catch (Exception e) {
+ System.err.println("Error creating the table :: " + e.getMessage());
+ } finally{
+ try {
+ if(conn != null) {
+ conn.close();
+ }
+ } catch (Exception e) {
+ System.err.println("Failed to close connection :: " + e.getMessage());
+ }
+ }
+ }
+
+ /**
+ * Perform any required validation on the table being bulk loaded into:
+	 * - ensure no column family names start with '_', as they would be ignored, leading to problems.
+ * @throws SQLException
+ */
+ private static void validateTable() throws SQLException {
+
+ Connection conn = DriverManager.getConnection(getUrl());
+ try {
+ ResultSet rs = conn.getMetaData().getColumns(null, StringUtil.escapeLike(schemaName), StringUtil.escapeLike(tableName), null);
+ while (rs.next()) {
+ String familyName = rs.getString(1);
+ if (familyName != null && familyName.startsWith(UNDERSCORE)) {
+ String msg;
+ if (QueryConstants.DEFAULT_COLUMN_FAMILY.equals(familyName)) {
+ msg = "CSV Bulk Loader error: All column names that are not part of the primary key constraint must be prefixed with a column family name (i.e. f.my_column VARCHAR)";
+ } else {
+ msg = "CSV Bulk Loader error: Column family name must not start with '_': " + familyName;
+ }
+ throw new SQLException(msg);
+ }
+ }
+ } finally{
+ conn.close();
+ }
+ }
+
+ private static String getUrl() {
+ return PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zookeeperIP;
+ }
+
+ private static void loadMapRedConfigs(Configuration conf){
+
+ conf.set("IGNORE.INVALID.ROW", skipErrors);
+ conf.set("schemaName", schemaName);
+ conf.set("tableName", tableName);
+ conf.set("zk", zookeeperIP);
+ conf.set("hbase.zookeeper.quorum", zookeeperIP);
+ conf.set("fs.default.name", hdfsNameNode);
+ conf.set("mapred.job.tracker", mapredIP);
+
+ //Load the other System-Configs
+ try {
+
+ Map<String, String> configs = systemConfig.getAllConfigMap();
+
+ if(configs.containsKey("mapreduce.map.output.compress")){
+ String s = configs.get("mapreduce.map.output.compress");
+ if(s != null && s.trim().length() > 0)
+ conf.set("mapreduce.map.output.compress", s);
+ }
+
+ if(configs.containsKey("mapreduce.map.output.compress.codec")){
+ String s = configs.get("mapreduce.map.output.compress.codec");
+ if(s != null && s.trim().length() > 0)
+ conf.set("mapreduce.map.output.compress.codec", s);
+ }
+
+ if(configs.containsKey("io.sort.record.percent")){
+ String s = configs.get("io.sort.record.percent");
+ if(s != null && s.trim().length() > 0)
+ conf.set("io.sort.record.percent", s);
+ }
+
+ if(configs.containsKey("io.sort.factor")){
+ String s = configs.get("io.sort.factor");
+ if(s != null && s.trim().length() > 0)
+ conf.set("io.sort.factor", s);
+ }
+
+ if(configs.containsKey("mapred.tasktracker.map.tasks.maximum")){
+ String s = configs.get("mapred.tasktracker.map.tasks.maximum");
+ if(s != null && s.trim().length() > 0)
+ conf.set("mapred.tasktracker.map.tasks.maximum", s);
+ }
+
+ } catch (Exception e) {
+ System.err.println("Error loading the configs :: " + e.getMessage());
+ System.err.println("The bulk loader will run slower than estimated");
+ }
+ }
+
+ private static String[] getCreatePSQLstmts(String path){
+
+ BufferedReader br = null;
+ try {
+ FileReader file = new FileReader(path);
+ br = new BufferedReader(file);
+			//Currently, we can have at most 2 SQL statements: 1 for create table and 1 for index
+ String[] sb = new String[2];
+ String line;
+ for(int i = 0; i < 2 && (line = br.readLine()) != null ; i++){
+ sb[i] = line;
+ }
+ return sb;
+
+ } catch (IOException e) {
+ System.err.println("Error reading the file :: " + path + ", " + e.getMessage());
+ } finally {
+ if (br != null) Closeables.closeQuietly(br);
+ }
+ return null;
+ }
+
+ private static void log(String msg){
+ if(isDebug){
+ try {
+ bw.write(msg);
+ } catch (IOException e) {
+ System.err.println("Error logging the statement :: " + msg);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/map/reduce/MapReduceJob.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/map/reduce/MapReduceJob.java b/phoenix-core/src/main/java/org/apache/phoenix/map/reduce/MapReduceJob.java
new file mode 100644
index 0000000..6dd9910
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/map/reduce/MapReduceJob.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.map.reduce;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import au.com.bytecode.opencsv.CSVReader;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+
+public class MapReduceJob {
+
+ public static class PhoenixMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>{
+
+ private Connection conn_zk = null;
+ private PreparedStatement[] stmtCache;
+ private String tableName;
+ private String schemaName;
+ Map<Integer, Integer> colDetails = new LinkedHashMap<Integer, Integer>();
+ boolean ignoreUpsertError = true;
+ private String zookeeperIP;
+
+ /**
+		 * Build the Phoenix JDBC connection URL.
+ */
+
+ private static String getUrl(String url) {
+ return PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + url;
+ }
+
+		/***
+		 * Get the column information from the table metadata.
+		 * Create a map of col-index and col-data-type.
+		 * Create the upsert PreparedStatement based on the map size.
+		 */
+
+ @Override
+ public void setup(Context context) throws InterruptedException{
+ Properties props = new Properties();
+
+ try {
+ zookeeperIP = context.getConfiguration().get("zk");
+
+ //ZK connection used to get the table meta-data
+ conn_zk = DriverManager.getConnection(getUrl(zookeeperIP), props);
+
+ schemaName = context.getConfiguration().get("schemaName");
+ tableName = context.getConfiguration().get("tableName");
+				ignoreUpsertError = !context.getConfiguration().get("IGNORE.INVALID.ROW").equalsIgnoreCase("0");
+
+				//Get the ResultSet from the actual ZooKeeper connection; connectionless mode throws an UnsupportedOperationException for this
+ ResultSet rs = conn_zk.getMetaData().getColumns(null, schemaName, tableName, null);
+ //This map holds the key-value pair of col-position and its data type
+ int i = 1;
+ while(rs.next()){
+ colDetails.put(i, rs.getInt(QueryUtil.DATA_TYPE_POSITION));
+ i++;
+ }
+
+ stmtCache = new PreparedStatement[colDetails.size()];
+ ArrayList<String> cols = new ArrayList<String>();
+ for(i = 0 ; i < colDetails.size() ; i++){
+ cols.add("?");
+ String prepValues = StringUtils.join(cols, ",");
+ String upsertStmt = "";
+ if(schemaName != null && schemaName.trim().length() > 0)
+ upsertStmt = "upsert into " + schemaName + "." + tableName + " values (" + prepValues + ")";
+ else
+ upsertStmt = "upsert into " + tableName + " values (" + prepValues + ")";
+ try {
+ stmtCache[i] = conn_zk.prepareStatement(upsertStmt);
+ } catch (SQLException e) {
+ System.err.println("Error preparing the upsert statement" + e.getMessage());
+ if(!ignoreUpsertError){
+ throw (new InterruptedException(e.getMessage()));
+ }
+ }
+ }
+ } catch (SQLException e) {
+ System.err.println("Error occurred in connecting to Phoenix HBase" + e.getMessage());
+ }
+
+ }
+
+		/* Tokenize the text input line based on the "," delimiter.
+		 * Type-cast each token based on the col-data-type using the convertTypeSpecificValue API below.
+		 * Upsert the data. DO NOT COMMIT.
+		 * Use Phoenix's getUncommittedDataIterator API to parse the uncommitted data to KeyValue pairs.
+		 * Emit the row-key and KeyValue pairs from the Mapper to allow sorting based on row-key.
+		 * Finally, call connection.rollback() to preserve the table state.
+		 */
+
+ @Override
+ public void map(LongWritable key, Text line, Context context) throws IOException, InterruptedException{
+
+ CSVReader reader = new CSVReader(new InputStreamReader(new ByteArrayInputStream(line.toString().getBytes())), ',');
+ try {
+ String[] tokens = reader.readNext();
+
+ PreparedStatement upsertStatement;
+ if(tokens.length >= stmtCache.length){
+				//If the CSV row has more values than the table has columns, apply the column count cap
+ upsertStatement = stmtCache[stmtCache.length - 1];
+ }else{
+ //Else, take the corresponding upsertStmt from cached array
+ upsertStatement = stmtCache[tokens.length - 1];
+ }
+
+ for(int i = 0 ; i < tokens.length && i < colDetails.size() ;i++){
+					upsertStatement.setObject(i+1, convertTypeSpecificValue(tokens[i], colDetails.get(i + 1)));
+ }
+
+ upsertStatement.execute();
+			} catch (SQLException e) {
+				System.err.println("Failed to upsert data into Phoenix :: " + e.getMessage());
+				if(!ignoreUpsertError){
+					throw (new InterruptedException(e.getMessage()));
+				}
+			} catch (Exception e) {
+				System.err.println("Failed to upsert data into Phoenix :: " + e.getMessage());
+			} finally {
+ reader.close();
+ }
+
+ Iterator<Pair<byte[],List<KeyValue>>> dataIterator = null;
+ try {
+ dataIterator = PhoenixRuntime.getUncommittedDataIterator(conn_zk);
+ } catch (SQLException e) {
+ System.err.println("Failed to retrieve the data iterator for Phoenix table :: " + e.getMessage());
+ }
+
+ while(dataIterator != null && dataIterator.hasNext()){
+ Pair<byte[],List<KeyValue>> row = dataIterator.next();
+ for(KeyValue kv : row.getSecond()){
+ context.write(new ImmutableBytesWritable(kv.getRow()), kv);
+ }
+ }
+
+ try {
+ conn_zk.rollback();
+ } catch (SQLException e) {
+ System.err.println("Transaction rollback failed.");
+ }
+ }
+
+ /*
+ * Do connection.close()
+ */
+
+ @Override
+ public void cleanup(Context context) {
+ try {
+				if (conn_zk != null) conn_zk.close(); // setup() may have failed before connecting
+ } catch (SQLException e) {
+ System.err.println("Failed to close the JDBC connection");
+ }
+ }
+
+ private Object convertTypeSpecificValue(String s, Integer sqlType) throws Exception {
+ return PDataType.fromTypeId(sqlType).toObject(s);
+ }
+ }
+
+}
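
The map() flow above (upsert without commit, drain the uncommitted KeyValues, then roll back) can be exercised outside MapReduce as well. A minimal sketch, with a hypothetical connection URL and table:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Pair;
    import org.apache.phoenix.util.PhoenixRuntime;

    public class UncommittedDataExample {
        public static void main(String[] args) throws Exception {
            Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
            try {
                conn.createStatement().execute("UPSERT INTO MY_TABLE VALUES (1, 'a')"); // not committed
                Iterator<Pair<byte[], List<KeyValue>>> it = PhoenixRuntime.getUncommittedDataIterator(conn);
                while (it.hasNext()) {
                    for (KeyValue kv : it.next().getSecond()) {
                        System.out.println(kv); // the mapper emits these as (row key, KeyValue)
                    }
                }
                conn.rollback(); // preserve the table state, as the mapper does
            } finally {
                conn.close();
            }
        }
    }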
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/map/reduce/util/ConfigReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/map/reduce/util/ConfigReader.java b/phoenix-core/src/main/java/org/apache/phoenix/map/reduce/util/ConfigReader.java
new file mode 100644
index 0000000..8f739f5
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/map/reduce/util/ConfigReader.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.map.reduce.util;
+
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ResourceBundle;
+
+/**
+ * Class to read configs.
+ *
+ */
+
+public class ConfigReader {
+
+ private String propertyFile = null;
+ private boolean loaded = false;
+ private static final Object _synObj = new Object();
+ private Map<String, String> properties = new HashMap<String, String>();
+ private Exception loadException = null;
+
+	/**
+	 * Creates a reader for the given properties file. The file is loaded
+	 * lazily on first access and cached for subsequent lookups.
+	 */
+
+ public ConfigReader(String propertyFile) {
+ this.propertyFile = propertyFile;
+ }
+
+ public void load() throws Exception {
+ if (loaded) {
+ if (loadException != null) {
+ throw new Exception(loadException);
+ }
+ return;
+ }
+ synchronized (_synObj) {
+ if (!loaded) {
+ try {
+ String tmpFile = propertyFile.trim();
+ if (tmpFile.endsWith(".properties")) {
+ tmpFile = tmpFile
+ .substring(0, tmpFile.lastIndexOf("."));
+ }
+ ResourceBundle resource = ResourceBundle.getBundle(tmpFile);
+ Enumeration<String> enm = resource.getKeys();
+
+ while (enm.hasMoreElements()) {
+ String key = enm.nextElement();
+ String value = resource.getString(key);
+ properties.put(key, value);
+ }
+ } catch (Exception e) {
+ System.err
+ .println("Exception while loading the config.properties file :: "
+ + e.getMessage());
+ loadException = e;
+ loaded = true;
+ throw e;
+ }
+ loaded = true;
+ }
+ }
+ }
+
+ public void addConfig(String key, String value) {
+ try {
+ load();
+ } catch (Exception e) {
+ System.err.println("ERROR :: " + e.getMessage());
+ }
+ properties.put(key, value);
+ }
+
+ public boolean hasConfig(String key) {
+ try {
+ load();
+ } catch (Exception e) {
+ System.err.println("ERROR :: " + e.getMessage());
+ }
+ return properties.containsKey(key);
+ }
+
+ public String getConfig(String key) throws Exception {
+ load();
+ return properties.get(key);
+ }
+
+ public Map<String, String> getAllConfigMap() throws Exception {
+ load();
+ return properties;
+ }
+
+}
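
A minimal usage sketch for the reader above, assuming csv-bulk-load-config.properties (the file name used by CSVBulkLoader earlier in this commit) is on the classpath; the property key shown is one the bulk loader itself looks up:

    import java.util.Map;

    import org.apache.phoenix.map.reduce.util.ConfigReader;

    public class ConfigReaderExample {
        public static void main(String[] args) throws Exception {
            ConfigReader reader = new ConfigReader("csv-bulk-load-config.properties");
            // load() is lazy and idempotent; getConfig()/getAllConfigMap() invoke it internally.
            if (reader.hasConfig("io.sort.factor")) {
                System.out.println("io.sort.factor = " + reader.getConfig("io.sort.factor"));
            }
            Map<String, String> all = reader.getAllConfigMap();
            System.out.println(all.size() + " properties loaded");
        }
    }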
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/memory/ChildMemoryManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/memory/ChildMemoryManager.java b/phoenix-core/src/main/java/org/apache/phoenix/memory/ChildMemoryManager.java
new file mode 100644
index 0000000..925cdfd
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/memory/ChildMemoryManager.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.memory;
+
+import org.apache.http.annotation.GuardedBy;
+import org.apache.http.annotation.ThreadSafe;
+
+/**
+ *
+ * Child memory manager that delegates through to global memory manager,
+ * but enforces that at most a threshold percentage is used by this
+ * memory manager. No blocking is done if the threshold is exceeded,
+ * but the standard blocking will be done by the global memory manager.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+@ThreadSafe
+public class ChildMemoryManager extends DelegatingMemoryManager {
+ private final Object sync = new Object();
+ private final int maxPercOfTotal;
+ @GuardedBy("sync")
+ private long allocatedBytes;
+
+ public ChildMemoryManager(MemoryManager mm, int maxPercOfTotal) {
+ super(mm);
+ if (mm instanceof ChildMemoryManager) {
+ throw new IllegalStateException("ChildMemoryManager cannot delegate to another ChildMemoryManager");
+ }
+        if (maxPercOfTotal <= 0 || maxPercOfTotal > 100) {
+            throw new IllegalArgumentException("Max percentage of total memory (" + maxPercOfTotal + "%) must be greater than zero and less than or equal to 100");
+        }
+        this.maxPercOfTotal = maxPercOfTotal;
+ }
+
+
+ private long adjustAllocation(long minBytes, long reqBytes) {
+ assert(reqBytes >= minBytes);
+ long availBytes = getAvailableMemory();
+        // Check if this memory manager's percentage of allocated bytes exceeds its allowed maximum
+ if (minBytes > availBytes) {
+ throw new InsufficientMemoryException("Attempt to allocate more memory than the max allowed of " + maxPercOfTotal + "%");
+ }
+ // Revise reqBytes down to available memory if necessary
+ return Math.min(reqBytes,availBytes);
+ }
+
+ @Override
+ public MemoryChunk allocate(long minBytes, long nBytes) {
+ synchronized (sync) {
+ nBytes = adjustAllocation(minBytes, nBytes);
+ final MemoryChunk chunk = super.allocate(minBytes, nBytes);
+ allocatedBytes += chunk.getSize();
+ // Instantiate delegate chunk to track allocatedBytes correctly
+ return new MemoryChunk() {
+ @Override
+ public void close() {
+ synchronized (sync) {
+ allocatedBytes -= chunk.getSize();
+ chunk.close();
+ }
+ }
+
+ @Override
+ public long getSize() {
+ return chunk.getSize();
+ }
+
+ @Override
+ public void resize(long nBytes) {
+ synchronized (sync) {
+ long size = getSize();
+ long deltaBytes = nBytes - size;
+ if (deltaBytes > 0) {
+ adjustAllocation(deltaBytes,deltaBytes); // Throw if too much memory
+ }
+ chunk.resize(nBytes);
+ allocatedBytes += deltaBytes;
+ }
+ }
+ };
+ }
+ }
+
+ @Override
+ public long getAvailableMemory() {
+ synchronized (sync) {
+ long availBytes = getMaxMemory() - allocatedBytes;
+ // Sanity check (should never happen)
+ if (availBytes < 0) {
+ throw new IllegalStateException("Available memory has become negative: " + availBytes + " bytes. Allocated memory: " + allocatedBytes + " bytes.");
+ }
+ return availBytes;
+ }
+ }
+
+ @Override
+ public long getMaxMemory() {
+ return maxPercOfTotal * super.getMaxMemory() / 100;
+ }
+}
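
A minimal sketch of the delegation described in the Javadoc above, pairing a ChildMemoryManager with the GlobalMemoryManager that appears later in this commit; the pool size, wait time and percentage are arbitrary:

    import org.apache.phoenix.memory.ChildMemoryManager;
    import org.apache.phoenix.memory.GlobalMemoryManager;
    import org.apache.phoenix.memory.MemoryManager.MemoryChunk;

    public class ChildMemoryManagerExample {
        public static void main(String[] args) {
            // Global pool of 1 MB with a 10 s wait; this child may use at most 25% of it.
            GlobalMemoryManager global = new GlobalMemoryManager(1024 * 1024, 10000);
            ChildMemoryManager child = new ChildMemoryManager(global, 25);

            // Throws InsufficientMemoryException (without blocking) if the child's 25% is exceeded.
            MemoryChunk chunk = child.allocate(64 * 1024);
            try {
                // ... work within the allocated budget ...
            } finally {
                chunk.close(); // returns the bytes to both the child and the global pool
            }
        }
    }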
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/memory/DelegatingMemoryManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/memory/DelegatingMemoryManager.java b/phoenix-core/src/main/java/org/apache/phoenix/memory/DelegatingMemoryManager.java
new file mode 100644
index 0000000..f50c43d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/memory/DelegatingMemoryManager.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.memory;
+
+/**
+ *
+ * Memory manager that delegates through to another memory manager.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class DelegatingMemoryManager implements MemoryManager {
+ private final MemoryManager parent;
+
+ public DelegatingMemoryManager(MemoryManager globalMemoryManager){
+ this.parent = globalMemoryManager;
+ }
+
+ @Override
+ public long getAvailableMemory() {
+ return parent.getAvailableMemory();
+ }
+
+ @Override
+ public long getMaxMemory() {
+ return parent.getMaxMemory();
+ }
+
+ @Override
+ public MemoryChunk allocate(long minBytes, long reqBytes) {
+ return parent.allocate(minBytes, reqBytes);
+ }
+
+
+ @Override
+ public MemoryChunk allocate(long nBytes) {
+ return allocate(nBytes, nBytes);
+ }
+
+ public MemoryManager getParent() {
+ return parent;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java b/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
new file mode 100644
index 0000000..38a3cd9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.memory;
+
+import org.apache.http.annotation.GuardedBy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ *
+ * Global memory manager to track coarse-grained memory usage across all requests.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class GlobalMemoryManager implements MemoryManager {
+ private static final Logger logger = LoggerFactory.getLogger(GlobalMemoryManager.class);
+
+ private final Object sync = new Object();
+ private final long maxMemoryBytes;
+ private final int maxWaitMs;
+ @GuardedBy("sync")
+ private volatile long usedMemoryBytes;
+
+ public GlobalMemoryManager(long maxBytes, int maxWaitMs) {
+ if (maxBytes <= 0) {
+ throw new IllegalStateException("Total number of available bytes (" + maxBytes + ") must be greater than zero");
+ }
+ if (maxWaitMs < 0) {
+ throw new IllegalStateException("Maximum wait time (" + maxWaitMs + ") must be greater than or equal to zero");
+ }
+ this.maxMemoryBytes = maxBytes;
+ this.maxWaitMs = maxWaitMs;
+ this.usedMemoryBytes = 0;
+ }
+
+ @Override
+ public long getAvailableMemory() {
+ synchronized(sync) {
+ return maxMemoryBytes - usedMemoryBytes;
+ }
+ }
+
+ @Override
+ public long getMaxMemory() {
+ return maxMemoryBytes;
+ }
+
+
+ // TODO: Work on fairness: One big memory request can cause all others to block here.
+ private long allocateBytes(long minBytes, long reqBytes) {
+ if (minBytes < 0 || reqBytes < 0) {
+ throw new IllegalStateException("Minimum requested bytes (" + minBytes + ") and requested bytes (" + reqBytes + ") must be greater than zero");
+ }
+ if (minBytes > maxMemoryBytes) { // No need to wait, since we'll never have this much available
+ throw new InsufficientMemoryException("Requested memory of " + minBytes + " bytes is larger than global pool of " + maxMemoryBytes + " bytes.");
+ }
+ long startTimeMs = System.currentTimeMillis(); // Get time outside of sync block to account for waiting for lock
+ long nBytes;
+ synchronized(sync) {
+ while (usedMemoryBytes + minBytes > maxMemoryBytes) { // Only wait if minBytes not available
+ try {
+ long remainingWaitTimeMs = maxWaitMs - (System.currentTimeMillis() - startTimeMs);
+ if (remainingWaitTimeMs <= 0) { // Ran out of time waiting for some memory to get freed up
+ throw new InsufficientMemoryException("Requested memory of " + minBytes + " bytes could not be allocated from remaining memory of " + usedMemoryBytes + " bytes from global pool of " + maxMemoryBytes + " bytes after waiting for " + maxWaitMs + "ms.");
+ }
+ sync.wait(remainingWaitTimeMs);
+ } catch (InterruptedException ie) {
+ throw new RuntimeException("Interrupted allocation of " + minBytes + " bytes", ie);
+ }
+ }
+ // Allocate at most reqBytes, but at least minBytes
+ nBytes = Math.min(reqBytes, maxMemoryBytes - usedMemoryBytes);
+ if (nBytes < minBytes) {
+ throw new IllegalStateException("Allocated bytes (" + nBytes + ") should be at least the minimum requested bytes (" + minBytes + ")");
+ }
+ usedMemoryBytes += nBytes;
+ }
+ return nBytes;
+ }
+
+ @Override
+ public MemoryChunk allocate(long minBytes, long reqBytes) {
+ long nBytes = allocateBytes(minBytes, reqBytes);
+ return newMemoryChunk(nBytes);
+ }
+
+ @Override
+ public MemoryChunk allocate(long nBytes) {
+ return allocate(nBytes,nBytes);
+ }
+
+ protected MemoryChunk newMemoryChunk(long sizeBytes) {
+ return new GlobalMemoryChunk(sizeBytes);
+ }
+
+ private class GlobalMemoryChunk implements MemoryChunk {
+ private volatile long size;
+
+ private GlobalMemoryChunk(long size) {
+ if (size < 0) {
+ throw new IllegalStateException("Size of memory chunk must be greater than zero, but instead is " + size);
+ }
+ this.size = size;
+ }
+
+ @Override
+ public long getSize() {
+ synchronized(sync) {
+ return size; // TODO: does this need to be synchronized?
+ }
+ }
+
+ @Override
+ public void resize(long nBytes) {
+ if (nBytes < 0) {
+ throw new IllegalStateException("Number of bytes to resize to must be greater than zero, but instead is " + nBytes);
+ }
+ synchronized(sync) {
+ long nAdditionalBytes = (nBytes - size);
+ if (nAdditionalBytes < 0) {
+ usedMemoryBytes += nAdditionalBytes;
+ size = nBytes;
+ sync.notifyAll();
+ } else {
+ allocateBytes(nAdditionalBytes, nAdditionalBytes);
+ size = nBytes;
+ }
+ }
+ }
+
+ /**
+ * Check that MemoryChunk has previously been closed.
+ */
+ @Override
+ protected void finalize() throws Throwable {
+ try {
+                if (size > 0) {
+                    logger.warn("Orphaned chunk of " + size + " bytes found during finalize");
+                }
+                close(); // close() zeroes size, so the orphan check must precede it
+ // TODO: log error here, but we can't use SFDC logging
+ // because this runs in an hbase coprocessor.
+ // Create a gack-like API (talk with GridForce or HBase folks)
+ } finally {
+ super.finalize();
+ }
+ }
+
+ @Override
+ public void close() {
+ synchronized(sync) {
+ usedMemoryBytes -= size;
+ size = 0;
+ sync.notifyAll();
+ }
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/memory/InsufficientMemoryException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/memory/InsufficientMemoryException.java b/phoenix-core/src/main/java/org/apache/phoenix/memory/InsufficientMemoryException.java
new file mode 100644
index 0000000..8e7eaff
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/memory/InsufficientMemoryException.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.memory;
+
+/**
+ *
+ * Exception thrown by MemoryManager when insufficient memory is available
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class InsufficientMemoryException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public InsufficientMemoryException() {
+ }
+
+ public InsufficientMemoryException(String message) {
+ super(message);
+ }
+
+ public InsufficientMemoryException(Throwable cause) {
+ super(cause);
+ }
+
+ public InsufficientMemoryException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/memory/MemoryManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/memory/MemoryManager.java b/phoenix-core/src/main/java/org/apache/phoenix/memory/MemoryManager.java
new file mode 100644
index 0000000..e9b9355
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/memory/MemoryManager.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.memory;
+
+import java.io.Closeable;
+
+/**
+ *
+ * Memory manager used to track memory usage. Either throttles
+ * memory usage by blocking when the max memory is reached or
+ * allocates up to a maximum without blocking.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface MemoryManager {
+ /**
+ * Get the total amount of memory (in bytes) that may be allocated.
+ */
+ long getMaxMemory();
+
+ /**
+ * Get the amount of available memory (in bytes) not yet allocated.
+ */
+ long getAvailableMemory();
+
+ /**
+     * Allocate up to reqBytes of memory, dialing the amount down (as
+     * low as minBytes) if the full amount is not available. If minBytes is
+     * not available, this call will block for a configurable amount
+     * of time and throw if minBytes does not become available.
+ * @param minBytes minimum number of bytes required
+ * @param reqBytes requested number of bytes. Must be greater
+ * than or equal to minBytes
+ * @return MemoryChunk that was allocated
+ * @throws InsufficientMemoryException if unable to allocate minBytes
+ * during configured amount of time
+ */
+ MemoryChunk allocate(long minBytes, long reqBytes);
+
+ /**
+     * Equivalent to calling {@link #allocate(long, long)} with
+     * minBytes and reqBytes equal.
+ */
+ MemoryChunk allocate(long nBytes);
+
+ /**
+ *
+ * Chunk of allocated memory. To reclaim the memory, call {@link #close()}
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+ public static interface MemoryChunk extends Closeable {
+ /**
+ * Get the size in bytes of the allocated chunk.
+ */
+ long getSize();
+
+ /**
+ * Free up the memory associated with this chunk
+ */
+ @Override
+ void close();
+
+ /**
+ * Resize an already allocated memory chunk up or down to a
+ * new amount. If decreasing allocation, this call will not block.
+ * If increasing allocation, and nBytes is not available, then
+ * this call will block for a configurable amount of time and
+ * throw if nBytes does not become available. Most commonly
+ * used to adjust the allocation of a memory buffer that was
+ * originally sized for the worst case scenario.
+ * @param nBytes new number of bytes required for this chunk
+ * @throws InsufficientMemoryException if unable to allocate minBytes
+ * during configured amount of time
+ */
+ void resize(long nBytes);
+ }
+}
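
A short sketch of the allocate/resize contract documented above, using the GlobalMemoryManager from this commit as the concrete implementation; the sizes are arbitrary:

    import org.apache.phoenix.memory.GlobalMemoryManager;
    import org.apache.phoenix.memory.MemoryManager;
    import org.apache.phoenix.memory.MemoryManager.MemoryChunk;

    public class MemoryManagerExample {
        public static void main(String[] args) {
            MemoryManager mm = new GlobalMemoryManager(1024 * 1024, 5000); // 1 MB pool, 5 s max wait

            // Ask for 512 KB but accept as little as 128 KB if the pool is under pressure.
            MemoryChunk chunk = mm.allocate(128 * 1024, 512 * 1024);
            try {
                chunk.resize(256 * 1024); // shrinking never blocks; growing may block or throw
            } finally {
                chunk.close(); // reclaim the memory
            }
        }
    }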
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
new file mode 100644
index 0000000..b60f742
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -0,0 +1,239 @@
+package org.apache.phoenix.optimize;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.phoenix.compile.ColumnProjector;
+import org.apache.phoenix.compile.IndexStatementRewriter;
+import org.apache.phoenix.compile.QueryCompiler;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.parse.HintNode.Hint;
+import org.apache.phoenix.parse.ParseNodeFactory;
+import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.parse.TableNode;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.ColumnNotFoundException;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PDatum;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+
+public class QueryOptimizer {
+ private static final ParseNodeFactory FACTORY = new ParseNodeFactory();
+
+ private final QueryServices services;
+ private final boolean useIndexes;
+
+ public QueryOptimizer(QueryServices services) {
+ this.services = services;
+ this.useIndexes = this.services.getProps().getBoolean(QueryServices.USE_INDEXES_ATTRIB, QueryServicesOptions.DEFAULT_USE_INDEXES);
+ }
+
+ public QueryPlan optimize(SelectStatement select, PhoenixStatement statement) throws SQLException {
+ return optimize(select, statement, Collections.<PColumn>emptyList(), null);
+ }
+
+ public QueryPlan optimize(SelectStatement select, PhoenixStatement statement, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory) throws SQLException {
+ QueryCompiler compiler = new QueryCompiler(statement, targetColumns, parallelIteratorFactory);
+ QueryPlan dataPlan = compiler.compile(select);
+ if (!useIndexes || select.getFrom().size() > 1) {
+ return dataPlan;
+ }
+ // Get the statement as it's been normalized now
+ // TODO: the recompile for the index tables could skip the normalize step
+ select = (SelectStatement)dataPlan.getStatement();
+ PTable dataTable = dataPlan.getTableRef().getTable();
+ List<PTable> indexes = Lists.newArrayList(dataTable.getIndexes());
+ if (indexes.isEmpty() || dataPlan.getTableRef().hasDynamicCols() || select.getHint().hasHint(Hint.NO_INDEX)) {
+ return dataPlan;
+ }
+
+ // targetColumns is set for UPSERT SELECT to ensure that the proper type conversion takes place.
+ // For a SELECT, it is empty. In this case, we want to set the targetColumns to match the projection
+ // from the dataPlan to ensure that the metadata for when an index is used matches the metadata for
+ // when the data table is used.
+ if (targetColumns.isEmpty()) {
+ List<? extends ColumnProjector> projectors = dataPlan.getProjector().getColumnProjectors();
+ List<PDatum> targetDatums = Lists.newArrayListWithExpectedSize(projectors.size());
+ for (ColumnProjector projector : projectors) {
+ targetDatums.add(projector.getExpression());
+ }
+ targetColumns = targetDatums;
+ }
+
+ SelectStatement translatedIndexSelect = IndexStatementRewriter.translate(select, dataPlan.getContext().getResolver());
+ List<QueryPlan> plans = Lists.newArrayListWithExpectedSize(1 + indexes.size());
+ plans.add(dataPlan);
+ QueryPlan hintedPlan = getHintedQueryPlan(statement, translatedIndexSelect, indexes, targetColumns, parallelIteratorFactory, plans);
+ if (hintedPlan != null) {
+ return hintedPlan;
+ }
+ for (PTable index : indexes) {
+ addPlan(statement, translatedIndexSelect, index, targetColumns, parallelIteratorFactory, plans);
+ }
+
+ return chooseBestPlan(select, plans);
+ }
+
+ private static QueryPlan getHintedQueryPlan(PhoenixStatement statement, SelectStatement select, List<PTable> indexes, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, List<QueryPlan> plans) throws SQLException {
+ QueryPlan dataPlan = plans.get(0);
+ String indexHint = select.getHint().getHint(Hint.INDEX);
+ if (indexHint == null) {
+ return null;
+ }
+ int startIndex = 0;
+ String alias = dataPlan.getTableRef().getTableAlias();
+ String prefix = HintNode.PREFIX + (alias == null ? dataPlan.getTableRef().getTable().getName().getString() : alias) + HintNode.SEPARATOR;
+ while (startIndex < indexHint.length()) {
+ startIndex = indexHint.indexOf(prefix, startIndex);
+ if (startIndex < 0) {
+ return null;
+ }
+ startIndex += prefix.length();
+ boolean done = false; // true when SUFFIX found
+ while (startIndex < indexHint.length() && !done) {
+ int endIndex;
+ int endIndex1 = indexHint.indexOf(HintNode.SEPARATOR, startIndex);
+ int endIndex2 = indexHint.indexOf(HintNode.SUFFIX, startIndex);
+ if (endIndex1 < 0 && endIndex2 < 0) { // Missing SUFFIX shouldn't happen
+ endIndex = indexHint.length();
+ } else if (endIndex1 < 0) {
+ done = true;
+ endIndex = endIndex2;
+ } else if (endIndex2 < 0) {
+ endIndex = endIndex1;
+ } else {
+ endIndex = Math.min(endIndex1, endIndex2);
+ done = endIndex2 == endIndex;
+ }
+ String indexName = indexHint.substring(startIndex, endIndex);
+ int indexPos = getIndexPosition(indexes, indexName);
+ if (indexPos >= 0) {
+ // Hinted index is applicable, so return it. It'll be the plan at position 1, after the data plan
+ if (addPlan(statement, select, indexes.get(indexPos), targetColumns, parallelIteratorFactory, plans)) {
+ return plans.get(1);
+ }
+ indexes.remove(indexPos);
+ }
+ startIndex = endIndex + 1;
+ }
+ }
+ return null;
+ }
+
+ private static int getIndexPosition(List<PTable> indexes, String indexName) {
+ for (int i = 0; i < indexes.size(); i++) {
+ if (indexName.equals(indexes.get(i).getTableName().getString())) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ private static boolean addPlan(PhoenixStatement statement, SelectStatement select, PTable index, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, List<QueryPlan> plans) throws SQLException {
+ QueryPlan dataPlan = plans.get(0);
+ int nColumns = dataPlan.getProjector().getColumnCount();
+ String alias = '"' + dataPlan.getTableRef().getTableAlias() + '"'; // double quote in case it's case sensitive
+ String schemaName = dataPlan.getTableRef().getTable().getSchemaName().getString();
+ schemaName = schemaName.length() == 0 ? null : '"' + schemaName + '"';
+
+ String tableName = '"' + index.getTableName().getString() + '"';
+ List<? extends TableNode> tables = Collections.singletonList(FACTORY.namedTable(alias, FACTORY.table(schemaName, tableName)));
+ try {
+ SelectStatement indexSelect = FACTORY.select(select, tables);
+ QueryCompiler compiler = new QueryCompiler(statement, targetColumns, parallelIteratorFactory);
+ QueryPlan plan = compiler.compile(indexSelect);
+ // Checking the index status and number of columns handles the wildcard cases correctly
+ // We can't check the status earlier, because the index table may be out-of-date.
+ if (plan.getTableRef().getTable().getIndexState() == PIndexState.ACTIVE && plan.getProjector().getColumnCount() == nColumns) {
+ plans.add(plan);
+ return true;
+ }
+ } catch (ColumnNotFoundException e) {
+ /* Means that a column is being used that's not in our index.
+ * Since we currently don't keep stats, we don't know the selectivity of the index.
+ * For now, we just don't use this index (as opposed to trying to join back from
+ * the index table to the data table).
+ */
+ }
+ return false;
+ }
+
+ /**
+ * Choose the best plan among all the possible ones.
+ * Since we don't keep stats yet, we use the following simple algorithm:
+ * 1) If the query has an ORDER BY and a LIMIT, choose the plan that has all the ORDER BY expressions
+ * in the same order as the row key columns.
+ * 2) If more than one plan meets (1), prefer the plan with:
+ * a) the most row key columns that may be used to form the start/stop scan key.
+ * b) a GROUP BY whose ordering is preserved.
+ * c) the data table or an index table, as driven by the Hint.USE_DATA_OVER_INDEX_TABLE hint.
+ * @param select the select statement being compiled
+ * @param plans the list of candidate plans
+ * @return the best plan among the candidates
+ */
+ private QueryPlan chooseBestPlan(SelectStatement select, List<QueryPlan> plans) {
+ QueryPlan firstPlan = plans.get(0);
+ if (plans.size() == 1) {
+ return firstPlan;
+ }
+
+ List<QueryPlan> candidates = Lists.newArrayListWithExpectedSize(plans.size());
+ if (firstPlan.getLimit() == null) {
+ candidates.addAll(plans);
+ } else {
+ for (QueryPlan plan : plans) {
+ // If ORDER BY optimized out (or not present at all)
+ if (plan.getOrderBy().getOrderByExpressions().isEmpty()) {
+ candidates.add(plan);
+ }
+ }
+ if (candidates.isEmpty()) {
+ candidates.addAll(plans);
+ }
+ }
+ final int comparisonOfDataVersusIndexTable = select.getHint().hasHint(Hint.USE_DATA_OVER_INDEX_TABLE) ? -1 : 1;
+ Collections.sort(candidates, new Comparator<QueryPlan>() {
+
+ @Override
+ public int compare(QueryPlan plan1, QueryPlan plan2) {
+ int c = plan2.getContext().getScanRanges().getRanges().size() - plan1.getContext().getScanRanges().getRanges().size();
+ if (c != 0) return c;
+ if (plan1.getGroupBy() != null && plan2.getGroupBy() != null) {
+ if (plan1.getGroupBy().isOrderPreserving() != plan2.getGroupBy().isOrderPreserving()) {
+ return plan1.getGroupBy().isOrderPreserving() ? -1 : 1;
+ }
+ }
+ // Use smaller table (table with fewest kv columns)
+ PTable table1 = plan1.getTableRef().getTable();
+ PTable table2 = plan2.getTableRef().getTable();
+ c = (table1.getColumns().size() - table1.getPKColumns().size()) - (table2.getColumns().size() - table2.getPKColumns().size());
+ if (c != 0) return c;
+
+ // All things being equal, just use the index table
+ // TODO: have hint that drives this
+ if (plan1.getTableRef().getTable().getType() == PTableType.INDEX) {
+ return comparisonOfDataVersusIndexTable;
+ }
+ if (plan2.getTableRef().getTable().getType() == PTableType.INDEX) {
+ return -comparisonOfDataVersusIndexTable;
+ }
+
+ return 0;
+ }
+
+ });
+
+ return candidates.get(0);
+ }
+}
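
The hints consumed above are embedded in SQL comments. A hedged JDBC sketch
(the connection URL, table, and index names are illustrative assumptions):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    public class IndexHintExample {
        public static void main(String[] args) throws Exception {
            Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
            try {
                // Hint.INDEX: getHintedQueryPlan() scans the hint text for
                // table/index name pairs (delimited by HintNode.PREFIX, SEPARATOR
                // and SUFFIX) and returns the hinted plan if addPlan() accepts it.
                ResultSet rs = conn.createStatement().executeQuery(
                    "SELECT /*+ INDEX(my_table my_idx) */ v1 FROM my_table WHERE v2 = 'a'");
                while (rs.next()) {
                    System.out.println(rs.getString(1));
                }
                // Hint.NO_INDEX makes optimize() return the data plan immediately,
                // and Hint.USE_DATA_OVER_INDEX_TABLE flips the index-versus-data
                // tie-breaker in chooseBestPlan().
            } finally {
                conn.close();
            }
        }
    }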
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/parse/AddColumnStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/AddColumnStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/AddColumnStatement.java
new file mode 100644
index 0000000..1059fce
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/AddColumnStatement.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.phoenix.schema.PTableType;
+
+public class AddColumnStatement extends AlterTableStatement {
+ private final List<ColumnDef> columnDefs;
+ private final boolean ifNotExists;
+ private final Map<String,Object> props;
+
+ protected AddColumnStatement(NamedTableNode table, PTableType tableType, List<ColumnDef> columnDefs, boolean ifNotExists, Map<String, Object> props) {
+ super(table, tableType);
+ this.columnDefs = columnDefs;
+ this.props = props == null ? Collections.<String,Object>emptyMap() : props;
+ this.ifNotExists = ifNotExists;
+ }
+
+ public List<ColumnDef> getColumnDefs() {
+ return columnDefs;
+ }
+
+ public boolean ifNotExists() {
+ return ifNotExists;
+ }
+
+ public Map<String,Object> getProps() {
+ return props;
+ }
+}
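
For reference, a hedged sketch of the ALTER TABLE ... ADD forms this class
models, following the same JDBC pattern as the earlier sketch (table, column,
and property names are assumptions):

    import java.sql.Connection;
    import java.sql.DriverManager;

    public class AddColumnExample {
        public static void main(String[] args) throws Exception {
            Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
            try {
                // ifNotExists=true, one ColumnDef, empty props map
                conn.createStatement().execute(
                    "ALTER TABLE my_table ADD IF NOT EXISTS new_col VARCHAR");
                // Trailing properties are carried in the props map, e.g. VERSIONS -> 10
                conn.createStatement().execute(
                    "ALTER TABLE my_table ADD audit_col VARCHAR VERSIONS=10");
            } finally {
                conn.close();
            }
        }
    }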
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/parse/AddParseNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/AddParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/AddParseNode.java
new file mode 100644
index 0000000..f4ae491
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/AddParseNode.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ *
+ * Node representing addition in a SQL expression
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class AddParseNode extends ArithmeticParseNode {
+
+ AddParseNode(List<ParseNode> children) {
+ super(children);
+ }
+
+ @Override
+ public <T> T accept(ParseNodeVisitor<T> visitor) throws SQLException {
+ List<T> l = Collections.emptyList();
+ if (visitor.visitEnter(this)) {
+ l = acceptChildren(visitor);
+ }
+ return visitor.visitLeave(this, l);
+ }
+}
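
The accept() method above follows the enter/leave visitor protocol used across
the parse node hierarchy: visitEnter() decides whether the children are
traversed, and visitLeave() combines the child results. A self-contained sketch
of the pattern with simplified stand-in types (these are not the actual Phoenix
interfaces):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class VisitorSketch {
        interface Visitor<T> {
            boolean visitEnter(Node node);                 // false = skip the children
            T visitLeave(Node node, List<T> childResults); // fold up child results
        }

        static class Node {
            final String op;
            final List<Node> children;
            Node(String op, Node... children) {
                this.op = op;
                this.children = Arrays.asList(children);
            }
            // Mirrors AddParseNode.accept(): enter, recurse over children, leave
            <T> T accept(Visitor<T> visitor) {
                List<T> results = new ArrayList<T>();
                if (visitor.visitEnter(this)) {
                    for (Node child : children) {
                        results.add(child.accept(visitor));
                    }
                }
                return visitor.visitLeave(this, results);
            }
        }

        // A visitor that renders nodes as "(a + b)" style strings
        static class Printer implements Visitor<String> {
            public boolean visitEnter(Node node) { return true; }
            public String visitLeave(Node node, List<String> kids) {
                if (kids.isEmpty()) return node.op;
                StringBuilder sb = new StringBuilder("(");
                for (int i = 0; i < kids.size(); i++) {
                    if (i > 0) sb.append(' ').append(node.op).append(' ');
                    sb.append(kids.get(i));
                }
                return sb.append(')').toString();
            }
        }

        public static void main(String[] args) {
            Node expr = new Node("+", new Node("a"), new Node("b"));
            System.out.println(expr.accept(new Printer())); // prints (a + b)
        }
    }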
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/parse/AggregateFunctionParseNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/AggregateFunctionParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/AggregateFunctionParseNode.java
new file mode 100644
index 0000000..a715dd7
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/AggregateFunctionParseNode.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.util.List;
+
+
+public class AggregateFunctionParseNode extends FunctionParseNode {
+
+ public AggregateFunctionParseNode(String name, List<ParseNode> children, BuiltInFunctionInfo info) {
+ super(name, children, info);
+ }
+
+ /**
+ * Aggregate functions are not stateless, even though all of their args
+ * may be stateless; for example, COUNT(1)
+ */
+ @Override
+ public boolean isStateless() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/parse/AliasedNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/AliasedNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/AliasedNode.java
new file mode 100644
index 0000000..d2302fd
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/AliasedNode.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.apache.phoenix.util.SchemaUtil;
+
+/**
+ *
+ * Node representing an aliased parse node in a SQL select clause
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class AliasedNode {
+ private final String alias;
+ private final ParseNode node;
+ private final boolean isCaseSensitve;
+
+ public AliasedNode(String alias, ParseNode node) {
+ this.isCaseSensitve = alias != null && SchemaUtil.isCaseSensitive(alias);
+ this.alias = alias == null ? null : SchemaUtil.normalizeIdentifier(alias);
+ this.node = node;
+ }
+
+ public String getAlias() {
+ return alias;
+ }
+
+ public ParseNode getNode() {
+ return node;
+ }
+
+ public boolean isCaseSensitve() {
+ return isCaseSensitve;
+ }
+}
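
A note on the normalization in the constructor above, assuming Phoenix's usual
identifier rules (unquoted identifiers fold to upper case, double-quoted ones
keep their case and are flagged case sensitive):

    import org.apache.phoenix.util.SchemaUtil;

    public class AliasNormalization {
        public static void main(String[] args) {
            // SELECT v AS myAlias: folded to upper case, not case sensitive
            System.out.println(SchemaUtil.normalizeIdentifier("myAlias"));     // MYALIAS
            System.out.println(SchemaUtil.isCaseSensitive("myAlias"));         // false
            // SELECT v AS "myAlias": quotes stripped, case preserved
            System.out.println(SchemaUtil.normalizeIdentifier("\"myAlias\"")); // myAlias
            System.out.println(SchemaUtil.isCaseSensitive("\"myAlias\""));     // true
        }
    }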
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java
new file mode 100644
index 0000000..bee7498
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.apache.phoenix.schema.PIndexState;
+
+public class AlterIndexStatement extends SingleTableSQLStatement {
+ private final String dataTableName;
+ private final boolean ifExists;
+ private final PIndexState indexState;
+
+ public AlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState indexState) {
+ super(indexTableNode, 0);
+ this.dataTableName = dataTableName;
+ this.ifExists = ifExists;
+ this.indexState = indexState;
+ }
+
+ public String getTableName() {
+ return dataTableName;
+ }
+
+ @Override
+ public int getBindCount() {
+ return 0;
+ }
+
+ public boolean ifExists() {
+ return ifExists;
+ }
+
+ public PIndexState getIndexState() {
+ return indexState;
+ }
+
+}
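
And a hedged example of the ALTER INDEX forms this class models (index and
table names are assumptions; the trailing keyword maps onto PIndexState):

    import java.sql.Connection;
    import java.sql.DriverManager;

    public class AlterIndexExample {
        public static void main(String[] args) throws Exception {
            Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
            try {
                // ifExists=true, dataTableName="my_table", indexState=DISABLE.
                // Once an index is no longer PIndexState.ACTIVE, the
                // QueryOptimizer.addPlan() check above stops using it.
                conn.createStatement().execute(
                    "ALTER INDEX IF EXISTS my_idx ON my_table DISABLE");
            } finally {
                conn.close();
            }
        }
    }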
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterTableStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterTableStatement.java
new file mode 100644
index 0000000..e6d4c80
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterTableStatement.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.apache.phoenix.schema.PTableType;
+
+public abstract class AlterTableStatement extends SingleTableSQLStatement {
+ private final PTableType tableType;
+
+ AlterTableStatement(NamedTableNode table, PTableType tableType) {
+ super(table, 0);
+ this.tableType = tableType;
+ }
+
+ public PTableType getTableType() {
+ return tableType;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/parse/AndParseNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/AndParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/AndParseNode.java
new file mode 100644
index 0000000..452d893
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/AndParseNode.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ *
+ * Node representing AND in a SQL expression
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class AndParseNode extends CompoundParseNode {
+
+ AndParseNode(List<ParseNode> children) {
+ super(children);
+ }
+
+ @Override
+ public <T> T accept(ParseNodeVisitor<T> visitor) throws SQLException {
+ List<T> l = Collections.emptyList();
+ if (visitor.visitEnter(this)) {
+ l = acceptChildren(visitor);
+ }
+ return visitor.visitLeave(this, l);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/parse/ArithmeticParseNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ArithmeticParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ArithmeticParseNode.java
new file mode 100644
index 0000000..4e90960
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ArithmeticParseNode.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.util.List;
+
+public abstract class ArithmeticParseNode extends CompoundParseNode {
+
+ public ArithmeticParseNode(List<ParseNode> children) {
+ super(children);
+ }
+
+}