You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 23:15:36 UTC

[20/51] [partial] Initial commit of master branch from github

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/join/MaxServerCacheSizeExceededException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/MaxServerCacheSizeExceededException.java b/phoenix-core/src/main/java/org/apache/phoenix/join/MaxServerCacheSizeExceededException.java
new file mode 100644
index 0000000..e72d628
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/MaxServerCacheSizeExceededException.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.join;
+
+/**
+ * 
+ * Exception thrown when the size of the hash cache exceeds the
+ * maximum size as specified by the phoenix.query.maxHashCacheBytes
+ * parameter in the {@link org.apache.hadoop.conf.Configuration}
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class MaxServerCacheSizeExceededException extends RuntimeException {
+	private static final long serialVersionUID = 1L;
+
+	public MaxServerCacheSizeExceededException() {
+    }
+
+    public MaxServerCacheSizeExceededException(String message) {
+        super(message);
+    }
+
+    public MaxServerCacheSizeExceededException(Throwable cause) {
+        super(cause);
+    }
+
+    public MaxServerCacheSizeExceededException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/join/ScanProjector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/ScanProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/join/ScanProjector.java
new file mode 100644
index 0000000..9f8777d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/ScanProjector.java
@@ -0,0 +1,242 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.join;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.phoenix.compile.JoinCompiler.ProjectedPTableWrapper;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.schema.KeyValueSchema;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.ValueBitSet;
+import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+public class ScanProjector {    
+    public static final byte[] VALUE_COLUMN_FAMILY = Bytes.toBytes("_v");
+    public static final byte[] VALUE_COLUMN_QUALIFIER = new byte[0];
+    
+    private static final String SCAN_PROJECTOR = "scanProjector";
+    
+    private final KeyValueSchema schema;
+    private final Expression[] expressions;
+    private ValueBitSet valueSet;
+    private final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+    
+    public ScanProjector(ProjectedPTableWrapper projected) {
+    	List<PColumn> columns = projected.getTable().getColumns();
+    	expressions = new Expression[columns.size() - projected.getTable().getPKColumns().size()];
+    	// we do not count minNullableIndex for we might do later merge.
+    	KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
+    	int i = 0;
+        for (PColumn column : projected.getTable().getColumns()) {
+        	if (!SchemaUtil.isPKColumn(column)) {
+        		builder.addField(column);
+        		expressions[i++] = projected.getSourceExpression(column);
+        	}
+        }
+        schema = builder.build();
+        valueSet = ValueBitSet.newInstance(schema);
+    }
+    
+    private ScanProjector(KeyValueSchema schema, Expression[] expressions) {
+    	this.schema = schema;
+    	this.expressions = expressions;
+    	this.valueSet = ValueBitSet.newInstance(schema);
+    }
+    
+    public void setValueBitSet(ValueBitSet bitSet) {
+        this.valueSet = bitSet;
+    }
+    
+    public static void serializeProjectorIntoScan(Scan scan, ScanProjector projector) {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        try {
+            DataOutputStream output = new DataOutputStream(stream);
+            projector.schema.write(output);
+            int count = projector.expressions.length;
+            WritableUtils.writeVInt(output, count);
+            for (int i = 0; i < count; i++) {
+            	WritableUtils.writeVInt(output, ExpressionType.valueOf(projector.expressions[i]).ordinal());
+            	projector.expressions[i].write(output);
+            }
+            scan.setAttribute(SCAN_PROJECTOR, stream.toByteArray());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        
+    }
+    
+    public static ScanProjector deserializeProjectorFromScan(Scan scan) {
+        byte[] proj = scan.getAttribute(SCAN_PROJECTOR);
+        if (proj == null) {
+            return null;
+        }
+        ByteArrayInputStream stream = new ByteArrayInputStream(proj);
+        try {
+            DataInputStream input = new DataInputStream(stream);
+            KeyValueSchema schema = new KeyValueSchema();
+            schema.readFields(input);
+            int count = WritableUtils.readVInt(input);
+            Expression[] expressions = new Expression[count];
+            for (int i = 0; i < count; i++) {
+            	int ordinal = WritableUtils.readVInt(input);
+            	expressions[i] = ExpressionType.values()[ordinal].newInstance();
+            	expressions[i].readFields(input);
+            }
+            return new ScanProjector(schema, expressions);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+    
+    public static class ProjectedValueTuple implements Tuple {
+        private ImmutableBytesWritable keyPtr = new ImmutableBytesWritable();
+        private long timestamp;
+        private byte[] projectedValue;
+        private int bitSetLen;
+        private KeyValue keyValue;
+
+        private ProjectedValueTuple(byte[] keyBuffer, int keyOffset, int keyLength, long timestamp, byte[] projectedValue, int bitSetLen) {
+            this.keyPtr.set(keyBuffer, keyOffset, keyLength);
+            this.timestamp = timestamp;
+            this.projectedValue = projectedValue;
+            this.bitSetLen = bitSetLen;
+        }
+        
+        public ImmutableBytesWritable getKeyPtr() {
+            return keyPtr;
+        }
+        
+        public long getTimestamp() {
+            return timestamp;
+        }
+        
+        public byte[] getProjectedValue() {
+            return projectedValue;
+        }
+        
+        public int getBitSetLength() {
+            return bitSetLen;
+        }
+        
+        @Override
+        public void getKey(ImmutableBytesWritable ptr) {
+            ptr.set(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength());
+        }
+
+        @Override
+        public KeyValue getValue(int index) {
+            if (index != 0) {
+                throw new IndexOutOfBoundsException(Integer.toString(index));
+            }
+            return getValue(VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER);
+        }
+
+        @Override
+        public KeyValue getValue(byte[] family, byte[] qualifier) {
+            if (keyValue == null) {
+                keyValue = KeyValueUtil.newKeyValue(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(), 
+                        VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, timestamp, projectedValue, 0, projectedValue.length);
+            }
+            return keyValue;
+        }
+
+        @Override
+        public boolean getValue(byte[] family, byte[] qualifier,
+                ImmutableBytesWritable ptr) {
+            ptr.set(projectedValue);
+            return true;
+        }
+
+        @Override
+        public boolean isImmutable() {
+            return true;
+        }
+
+        @Override
+        public int size() {
+            return 1;
+        }
+    }
+    
+    public ProjectedValueTuple projectResults(Tuple tuple) {
+    	byte[] bytesValue = schema.toBytes(tuple, expressions, valueSet, ptr);
+    	KeyValue base = tuple.getValue(0);
+        return new ProjectedValueTuple(base.getBuffer(), base.getRowOffset(), base.getRowLength(), base.getTimestamp(), bytesValue, valueSet.getEstimatedLength());
+    }
+    
+    public static void decodeProjectedValue(Tuple tuple, ImmutableBytesWritable ptr) throws IOException {
+    	boolean b = tuple.getValue(VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, ptr);
+        if (!b)
+            throw new IOException("Trying to decode a non-projected value.");
+    }
+    
+    public static ProjectedValueTuple mergeProjectedValue(ProjectedValueTuple dest, KeyValueSchema destSchema, ValueBitSet destBitSet,
+    		Tuple src, KeyValueSchema srcSchema, ValueBitSet srcBitSet, int offset) throws IOException {
+    	ImmutableBytesWritable destValue = new ImmutableBytesWritable(dest.getProjectedValue());
+    	destBitSet.clear();
+    	destBitSet.or(destValue);
+    	int origDestBitSetLen = dest.getBitSetLength();
+    	ImmutableBytesWritable srcValue = new ImmutableBytesWritable();
+    	decodeProjectedValue(src, srcValue);
+    	srcBitSet.clear();
+    	srcBitSet.or(srcValue);
+    	int origSrcBitSetLen = srcBitSet.getEstimatedLength();
+    	for (int i = 0; i < srcBitSet.getMaxSetBit(); i++) {
+    		if (srcBitSet.get(i)) {
+    			destBitSet.set(offset + i);
+    		}
+    	}
+    	int destBitSetLen = destBitSet.getEstimatedLength();
+    	byte[] merged = new byte[destValue.getLength() - origDestBitSetLen + srcValue.getLength() - origSrcBitSetLen + destBitSetLen];
+    	int o = Bytes.putBytes(merged, 0, destValue.get(), destValue.getOffset(), destValue.getLength() - origDestBitSetLen);
+    	o = Bytes.putBytes(merged, o, srcValue.get(), srcValue.getOffset(), srcValue.getLength() - origSrcBitSetLen);
+    	destBitSet.toBytes(merged, o);
+    	ImmutableBytesWritable keyPtr = dest.getKeyPtr();
+        return new ProjectedValueTuple(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(), dest.getTimestamp(), merged, destBitSetLen);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/map/reduce/CSVBulkLoader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/map/reduce/CSVBulkLoader.java b/phoenix-core/src/main/java/org/apache/phoenix/map/reduce/CSVBulkLoader.java
new file mode 100644
index 0000000..e7c6159
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/map/reduce/CSVBulkLoader.java
@@ -0,0 +1,401 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.map.reduce;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import com.google.common.io.Closeables;
+import org.apache.phoenix.map.reduce.util.ConfigReader;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.StringUtil;
+
+public class CSVBulkLoader {
+	private static final String UNDERSCORE = "_";
+	
+	static FileWriter wr = null;
+	static BufferedWriter bw = null;
+	static boolean isDebug = false; //Set to true, if you need to log the bulk-import time.
+	static ConfigReader systemConfig = null;
+
+	static String schemaName = "";
+	static String tableName = "";
+	static String idxTable = "";
+	static String createPSQL[] = null;
+	static String skipErrors = null;
+	static String zookeeperIP = null;
+	static String mapredIP = null;
+	static String hdfsNameNode = null;
+
+	static{
+		/** load the log-file writer, if debug is true **/
+		if(isDebug){
+			try {
+			    wr = new FileWriter("phoenix-bulk-import.log", false);
+			    bw = new BufferedWriter(wr);
+			} catch (IOException e) {
+			    System.err.println("Error preparing writer for log file :: " + e.getMessage());
+			}
+		}
+
+		/** load the Map-Reduce configs **/
+		try {
+			systemConfig = new ConfigReader("csv-bulk-load-config.properties");
+		} catch (Exception e) {
+			System.err.println("Exception occurred while reading config properties");
+			System.err.println("The bulk loader will run slower than estimated");
+		}
+	}
+	
+	/**
+	 * -i		CSV data file path in hdfs
+	 * -s		Phoenix schema name
+	 * -t		Phoenix table name
+	 * -sql  	Phoenix create table sql path (1 SQL statement per line)
+	 * -zk		Zookeeper IP:<port>
+	 * -mr		MapReduce Job Tracker IP:<port>
+	 * -hd		HDFS NameNode IP:<port>
+	 * -o		Output directory path in hdfs (Optional)
+	 * -idx  	Phoenix index table name (Optional)
+	 * -error    	Ignore error while reading rows from CSV ? (1 - YES/0 - NO, defaults to 1) (OPtional)
+	 * -help	Print all options (Optional)
+	 */
+
+	@SuppressWarnings("deprecation")
+    	public static void main(String[] args) throws Exception{
+		
+		String inputFile = null;
+		String outFile = null;
+
+		Options options = new Options();
+		options.addOption("i", true, "CSV data file path");
+		options.addOption("o", true, "Output directory path");
+		options.addOption("s", true, "Phoenix schema name");
+		options.addOption("t", true, "Phoenix table name");
+		options.addOption("idx", true, "Phoenix index table name");
+		options.addOption("zk", true, "Zookeeper IP:<port>");
+		options.addOption("mr", true, "MapReduce Job Tracker IP:<port>");
+		options.addOption("hd", true, "HDFS NameNode IP:<port>");
+		options.addOption("sql", true, "Phoenix create table sql path");
+		options.addOption("error", true, "Ignore error while reading rows from CSV ? (1 - YES/0 - NO, defaults to 1)");
+		options.addOption("help", false, "All options");
+		
+		CommandLineParser parser = new PosixParser();
+		CommandLine cmd = parser.parse( options, args);
+		
+		if(cmd.hasOption("help")){
+			HelpFormatter formatter = new HelpFormatter();
+			formatter.printHelp( "help", options );
+			System.exit(0);
+		}
+		
+		String parser_error = "ERROR while parsing arguments. ";
+		//CSV input, table name, sql and zookeeper IP  are mandatory fields
+		if(cmd.hasOption("i")){
+			inputFile = cmd.getOptionValue("i");
+		}else{
+			System.err.println(parser_error + "Please provide CSV file input path");
+			System.exit(0);
+		}
+		if(cmd.hasOption("t")){
+			tableName = cmd.getOptionValue("t");
+		}else{
+			System.err.println(parser_error + "Please provide Phoenix table name");
+			System.exit(0);
+		}
+		if(cmd.hasOption("sql")){
+			String sqlPath = cmd.getOptionValue("sql");
+			createPSQL = getCreatePSQLstmts(sqlPath);
+		}
+		if(cmd.hasOption("zk")){
+			zookeeperIP = cmd.getOptionValue("zk");
+		}else{
+			System.err.println(parser_error + "Please provide Zookeeper address");
+			System.exit(0);
+		}
+		if(cmd.hasOption("mr")){
+			mapredIP = cmd.getOptionValue("mr");
+		}else{
+			System.err.println(parser_error + "Please provide MapReduce address");
+			System.exit(0);
+		}
+		if(cmd.hasOption("hd")){
+			hdfsNameNode = cmd.getOptionValue("hd");
+		}else{
+			System.err.println(parser_error + "Please provide HDFS NameNode address");
+			System.exit(0);
+		}
+		
+		if(cmd.hasOption("o")){
+			outFile = cmd.getOptionValue("o");
+		}else{
+			outFile = "phoenix-output-dir";
+		}
+		if(cmd.hasOption("s")){
+			schemaName = cmd.getOptionValue("s");
+		}
+		if(cmd.hasOption("idx")){
+			idxTable = cmd.getOptionValue("idx");
+		}
+		if(cmd.hasOption("error")){
+			skipErrors = cmd.getOptionValue("error");
+		}else{
+			skipErrors = "1";
+		}
+		
+		log("[TS - START] :: " + new Date() + "\n");
+
+		Path inputPath = new Path(inputFile);
+		Path outPath = new Path(outFile);
+		
+		//Create the Phoenix table in HBase
+		if (createPSQL != null) {
+    		for(String s : createPSQL){
+    			if(s == null || s.trim().length() == 0) {
+    				continue;
+    			}
+				createTable(s);
+    		}
+    		
+    		log("[TS - Table created] :: " + new Date() + "\n");
+		}
+
+        String dataTable = ""; 
+        if(schemaName != null && schemaName.trim().length() > 0)
+            dataTable = SchemaUtil.normalizeIdentifier(schemaName) + "." + SchemaUtil.normalizeIdentifier(tableName);
+        else
+            dataTable = SchemaUtil.normalizeIdentifier(tableName);
+        
+        try {
+            validateTable();
+        } catch (SQLException e) {
+            System.err.println(e.getMessage());
+            System.exit(0);
+        }
+
+        Configuration conf = new Configuration();
+		loadMapRedConfigs(conf);
+		
+		Job job = new Job(conf, "MapReduce - Phoenix bulk import");
+		job.setJarByClass(MapReduceJob.class);
+		job.setInputFormatClass(TextInputFormat.class);
+		FileInputFormat.addInputPath(job, inputPath);
+		
+		FileSystem fs = FileSystem.get(conf);
+		fs.delete(outPath);
+		FileOutputFormat.setOutputPath(job, outPath);
+		
+		job.setMapperClass(MapReduceJob.PhoenixMapper.class);
+		job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+		job.setMapOutputValueClass(KeyValue.class);
+		
+		SchemaMetrics.configureGlobally(conf);
+
+		HTable hDataTable = new HTable(conf, dataTable);
+		
+		// Auto configure partitioner and reducer according to the Main Data table
+    	HFileOutputFormat.configureIncrementalLoad(job, hDataTable);
+
+		job.waitForCompletion(true);
+	    
+		log("[TS - M-R HFile generated..Now dumping to HBase] :: " + new Date() + "\n");
+		
+    		LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
+    		loader.doBulkLoad(new Path(outFile), hDataTable);
+	    
+		log("[TS - FINISH] :: " + new Date() + "\n");
+		if(isDebug) bw.close();
+		
+	}
+	
+	private static void createTable(String stmt) {
+		
+		Connection conn = null;
+		PreparedStatement statement = null;
+
+		try {
+			conn = DriverManager.getConnection(getUrl(), "", "");
+			try {
+    			statement = conn.prepareStatement(stmt);
+    			statement.execute();
+    			conn.commit();
+			} finally {
+			    if(statement != null) {
+			        statement.close();
+			    }
+			}
+		} catch (Exception e) {
+			System.err.println("Error creating the table :: " + e.getMessage());
+		} finally{
+			try {
+			    if(conn != null) {
+			        conn.close();
+			    }
+			} catch (Exception e) {
+				System.err.println("Failed to close connection :: " + e.getMessage());
+			}
+		}
+	}
+
+	/**
+	 * Perform any required validation on the table being bulk loaded into:
+	 * - ensure no column family names start with '_', as they'd be ignored leading to problems.
+	 * @throws SQLException
+	 */
+    private static void validateTable() throws SQLException {
+        
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            ResultSet rs = conn.getMetaData().getColumns(null, StringUtil.escapeLike(schemaName), StringUtil.escapeLike(tableName), null);
+            while (rs.next()) {
+                String familyName = rs.getString(1);
+                if (familyName != null && familyName.startsWith(UNDERSCORE)) {
+                    String msg;
+                    if (QueryConstants.DEFAULT_COLUMN_FAMILY.equals(familyName)) {
+                        msg = "CSV Bulk Loader error: All column names that are not part of the primary key constraint must be prefixed with a column family name (i.e. f.my_column VARCHAR)";
+                    } else {
+                        msg = "CSV Bulk Loader error: Column family name must not start with '_': " + familyName;
+                    }
+                    throw new SQLException(msg);
+                }
+            }
+        } finally{
+             conn.close();
+        }
+    }
+    
+	private static String getUrl() {
+        	return PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zookeeperIP;
+    	}
+	
+	private static void loadMapRedConfigs(Configuration conf){
+
+		conf.set("IGNORE.INVALID.ROW", skipErrors);
+		conf.set("schemaName", schemaName);
+		conf.set("tableName", tableName);
+		conf.set("zk", zookeeperIP);
+		conf.set("hbase.zookeeper.quorum", zookeeperIP);
+		conf.set("fs.default.name", hdfsNameNode);
+		conf.set("mapred.job.tracker", mapredIP);
+		
+		//Load the other System-Configs
+		try {
+			
+			Map<String, String> configs = systemConfig.getAllConfigMap();
+			
+			if(configs.containsKey("mapreduce.map.output.compress")){
+				String s = configs.get("mapreduce.map.output.compress");
+				if(s != null && s.trim().length() > 0)
+					conf.set("mapreduce.map.output.compress", s);
+			}
+			
+			if(configs.containsKey("mapreduce.map.output.compress.codec")){
+				String s = configs.get("mapreduce.map.output.compress.codec");
+				if(s != null && s.trim().length() > 0)
+					conf.set("mapreduce.map.output.compress.codec", s);
+			}
+			
+			if(configs.containsKey("io.sort.record.percent")){
+				String s = configs.get("io.sort.record.percent");
+				if(s != null && s.trim().length() > 0)
+					conf.set("io.sort.record.percent", s);	
+			}
+				
+			if(configs.containsKey("io.sort.factor")){
+				String s = configs.get("io.sort.factor");
+				if(s != null && s.trim().length() > 0)
+					conf.set("io.sort.factor", s);
+			}
+			
+			if(configs.containsKey("mapred.tasktracker.map.tasks.maximum")){
+				String s = configs.get("mapred.tasktracker.map.tasks.maximum");
+				if(s != null && s.trim().length() > 0)
+					conf.set("mapred.tasktracker.map.tasks.maximum", s);
+			}
+				
+		} catch (Exception e) {
+			System.err.println("Error loading the configs :: " + e.getMessage());
+			System.err.println("The bulk loader will run slower than estimated");
+		}
+	}
+	
+	private static String[] getCreatePSQLstmts(String path){
+		
+	    BufferedReader br = null;
+		try {
+			FileReader file = new FileReader(path);
+			br = new BufferedReader(file);
+			//Currently, we can have at-most 2 SQL statements - 1 for create table and 1 for index
+			String[] sb = new String[2];
+			String line;
+			for(int i = 0; i < 2 && (line = br.readLine()) != null ; i++){
+				sb[i] = line;
+			}
+			return sb;
+			
+		} catch (IOException e) {
+			System.err.println("Error reading the file :: " + path + ", " + e.getMessage());
+		} finally {
+		    if (br != null) Closeables.closeQuietly(br);
+		}
+		return null;
+	}
+	
+	private static void log(String msg){
+		if(isDebug){
+			try {
+				bw.write(msg);
+			} catch (IOException e) {
+				System.err.println("Error logging the statement :: " + msg);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/map/reduce/MapReduceJob.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/map/reduce/MapReduceJob.java b/phoenix-core/src/main/java/org/apache/phoenix/map/reduce/MapReduceJob.java
new file mode 100644
index 0000000..6dd9910
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/map/reduce/MapReduceJob.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.map.reduce;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import au.com.bytecode.opencsv.CSVReader;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+
+public class MapReduceJob {
+
+	public static class PhoenixMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>{
+		
+		private Connection conn_zk 	= null;
+		private PreparedStatement[] stmtCache;
+		private String tableName;
+		private String schemaName;
+		Map<Integer, Integer> colDetails = new LinkedHashMap<Integer, Integer>();
+		boolean ignoreUpsertError = true;
+		private String zookeeperIP;
+		
+		/**
+		 * Get the phoenix jdbc connection.
+		 */
+		
+		private static String getUrl(String url) {
+	        	return PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + url;
+	  	}
+		
+		/***
+		 * Get the column information from the table metaData.
+		 * Cretae a map of col-index and col-data-type.
+		 * Create the upsert Prepared Statement based on the map-size.
+		 */
+		
+		@Override
+		public void setup(Context context) throws InterruptedException{
+			Properties props = new Properties();
+			
+			try {
+				zookeeperIP 		= context.getConfiguration().get("zk");
+				
+				//ZK connection used to get the table meta-data
+				conn_zk				= DriverManager.getConnection(getUrl(zookeeperIP), props);
+				
+				schemaName			= context.getConfiguration().get("schemaName");
+				tableName 			= context.getConfiguration().get("tableName");
+				ignoreUpsertError 	= context.getConfiguration().get("IGNORE.INVALID.ROW").equalsIgnoreCase("0") ? false : true;
+				
+				//Get the resultset from the actual zookeeper connection. Connectionless mode throws "UnSupportedOperation" exception for this
+				ResultSet rs 		= conn_zk.getMetaData().getColumns(null, schemaName, tableName, null);
+				//This map holds the key-value pair of col-position and its data type
+				int i = 1;
+				while(rs.next()){
+					colDetails.put(i, rs.getInt(QueryUtil.DATA_TYPE_POSITION));
+					i++;
+				}
+				
+				stmtCache = new PreparedStatement[colDetails.size()];
+				ArrayList<String> cols = new ArrayList<String>();
+				for(i = 0 ; i < colDetails.size() ; i++){
+					cols.add("?");
+					String prepValues = StringUtils.join(cols, ",");
+					String upsertStmt = ""; 
+					if(schemaName != null && schemaName.trim().length() > 0)
+						upsertStmt = "upsert into " + schemaName + "." + tableName + " values (" + prepValues + ")";
+					else
+						upsertStmt = "upsert into " + tableName + " values (" + prepValues + ")";
+					try {
+						stmtCache[i] = conn_zk.prepareStatement(upsertStmt);
+					} catch (SQLException e) {
+						System.err.println("Error preparing the upsert statement" + e.getMessage());
+						if(!ignoreUpsertError){
+							throw (new InterruptedException(e.getMessage()));
+						}
+					}
+				}
+			} catch (SQLException e) {
+					System.err.println("Error occurred in connecting to Phoenix HBase" + e.getMessage());
+			}
+			
+	  	}
+		
+		/* Tokenize the text input line based on the "," delimeter.
+		*  TypeCast the token based on the col-data-type using the convertTypeSpecificValue API below.
+		*  Upsert the data. DO NOT COMMIT.
+		*  Use Phoenix's getUncommittedDataIterator API to parse the uncommited data to KeyValue pairs.
+		*  Emit the row-key and KeyValue pairs from Mapper to allow sorting based on row-key.
+		*  Finally, do connection.rollback( to preserve table state).
+		*/
+		
+		@Override
+		public void map(LongWritable key, Text line, Context context) throws IOException, InterruptedException{
+			
+			CSVReader reader = new CSVReader(new InputStreamReader(new ByteArrayInputStream(line.toString().getBytes())), ',');			
+			try {
+				String[] tokens = reader.readNext();
+				
+				PreparedStatement upsertStatement;
+				if(tokens.length >= stmtCache.length){
+					//If CVS values are more than the number of cols in the table, apply the col count cap
+					upsertStatement = stmtCache[stmtCache.length - 1];
+				}else{
+					//Else, take the corresponding upsertStmt from cached array 
+					upsertStatement = stmtCache[tokens.length - 1];
+				}
+
+				for(int i = 0 ; i < tokens.length && i < colDetails.size() ;i++){
+					upsertStatement.setObject(i+1, convertTypeSpecificValue(tokens[i], colDetails.get(new Integer(i+1))));
+				}
+				
+				upsertStatement.execute();
+			} catch (SQLException e) {
+				System.err.println("Failed to upsert data in the Phoenix :: " + e.getMessage());
+				if(!ignoreUpsertError){
+					throw (new InterruptedException(e.getMessage()));
+				}
+			} catch (Exception e) {
+				System.err.println("Failed to upsert data in the Phoenix :: " + e.getMessage());
+			}finally {
+				reader.close();
+       			} 
+			
+			Iterator<Pair<byte[],List<KeyValue>>> dataIterator = null;
+			try {
+				dataIterator = PhoenixRuntime.getUncommittedDataIterator(conn_zk);
+			} catch (SQLException e) {
+				System.err.println("Failed to retrieve the data iterator for Phoenix table :: " + e.getMessage());
+			}
+			
+			while(dataIterator != null && dataIterator.hasNext()){
+				Pair<byte[],List<KeyValue>> row = dataIterator.next();
+				for(KeyValue kv : row.getSecond()){
+					context.write(new ImmutableBytesWritable(kv.getRow()), kv);
+				}
+			}
+			
+			try {
+			    conn_zk.rollback();
+			} catch (SQLException e) {
+				System.err.println("Transaction rollback failed.");
+			}
+		}
+		
+		/*
+		* Do connection.close()
+		*/
+		
+		@Override
+		public void cleanup(Context context) {
+	  		try {
+	  			conn_zk.close();
+			} catch (SQLException e) {
+				System.err.println("Failed to close the JDBC connection");
+			}
+	  	}
+		
+		private Object convertTypeSpecificValue(String s, Integer sqlType) throws Exception {
+			return PDataType.fromTypeId(sqlType).toObject(s);
+		}
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/map/reduce/util/ConfigReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/map/reduce/util/ConfigReader.java b/phoenix-core/src/main/java/org/apache/phoenix/map/reduce/util/ConfigReader.java
new file mode 100644
index 0000000..8f739f5
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/map/reduce/util/ConfigReader.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.map.reduce.util;
+
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ResourceBundle;
+
+/**
+ * Class to read configs.
+ * 
+ */
+
+public class ConfigReader
+ {
+
+	private String propertyFile = null;
+	private boolean loaded = false;
+	private static final Object _synObj = new Object();
+	private Map<String, String> properties = new HashMap<String, String>();
+	private Exception loadException = null;
+
+	/**
+	 * Retrieves singleton config objects from a hashmap of stored objects,
+	 * creates these objects if they aren't in the hashmap.
+	 */
+
+	public ConfigReader(String propertyFile) {
+		this.propertyFile = propertyFile;
+	}
+
+	public void load() throws Exception {
+		if (loaded) {
+			if (loadException != null) {
+				throw new Exception(loadException);
+			}
+			return;
+		}
+		synchronized (_synObj) {
+			if (!loaded) {
+				try {
+					String tmpFile = propertyFile.trim();
+					if (tmpFile.endsWith(".properties")) {
+						tmpFile = tmpFile
+								.substring(0, tmpFile.lastIndexOf("."));
+					}
+					ResourceBundle resource = ResourceBundle.getBundle(tmpFile);
+					Enumeration<String> enm = resource.getKeys();
+
+					while (enm.hasMoreElements()) {
+						String key = enm.nextElement();
+						String value = resource.getString(key);
+						properties.put(key, value);
+					}
+				} catch (Exception e) {
+					System.err
+							.println("Exception while loading the config.properties file :: "
+									+ e.getMessage());
+					loadException = e;
+					loaded = true;
+					throw e;
+				}
+				loaded = true;
+			}
+		}
+	}
+
+	public void addConfig(String key, String value) {
+		try {
+			load();
+		} catch (Exception e) {
+			System.err.println("ERROR :: " + e.getMessage());
+		}
+		properties.put(key, value);
+	}
+
+	public boolean hasConfig(String key) {
+		try {
+			load();
+		} catch (Exception e) {
+			System.err.println("ERROR :: " + e.getMessage());
+		}
+		return properties.containsKey(key);
+	}
+
+	public String getConfig(String key) throws Exception {
+		load();
+		return properties.get(key);
+	}
+
+	public Map<String, String> getAllConfigMap() throws Exception {
+		load();
+		return properties;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/memory/ChildMemoryManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/memory/ChildMemoryManager.java b/phoenix-core/src/main/java/org/apache/phoenix/memory/ChildMemoryManager.java
new file mode 100644
index 0000000..925cdfd
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/memory/ChildMemoryManager.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.memory;
+
+import org.apache.http.annotation.GuardedBy;
+import org.apache.http.annotation.ThreadSafe;
+
+/**
+ * 
+ * Child memory manager that delegates through to global memory manager,
+ * but enforces that at most a threshold percentage is used by this
+ * memory manager.  No blocking is done if the threshold is exceeded,
+ * but the standard blocking will be done by the global memory manager.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+@ThreadSafe
+public class ChildMemoryManager extends DelegatingMemoryManager {
+    private final Object sync = new Object();
+    private final int maxPercOfTotal;
+    @GuardedBy("sync")
+    private long allocatedBytes;
+    
+    public ChildMemoryManager(MemoryManager mm, int maxPercOfTotal) {
+        super(mm);
+        if (mm instanceof ChildMemoryManager) {
+            throw new IllegalStateException("ChildMemoryManager cannot delegate to another ChildMemoryManager");
+        }
+        this.maxPercOfTotal = maxPercOfTotal;
+        if (maxPercOfTotal <= 0 || maxPercOfTotal > 100) {
+            throw new IllegalArgumentException("Max percentage of total memory (" + maxPercOfTotal + "%) must be greater than zero and less than or equal to 100");
+        }
+    }
+
+
+    private long adjustAllocation(long minBytes, long reqBytes) {
+        assert(reqBytes >= minBytes);
+        long availBytes = getAvailableMemory();
+        // Check if this memory managers percentage of allocated bytes exceeds its allowed maximum
+        if (minBytes > availBytes) {
+            throw new InsufficientMemoryException("Attempt to allocate more memory than the max allowed of " + maxPercOfTotal + "%");
+        }
+        // Revise reqBytes down to available memory if necessary
+        return Math.min(reqBytes,availBytes);
+    }
+    
+    @Override
+    public MemoryChunk allocate(long minBytes, long nBytes) {
+        synchronized (sync) {
+            nBytes = adjustAllocation(minBytes, nBytes);
+            final MemoryChunk chunk = super.allocate(minBytes, nBytes);
+            allocatedBytes += chunk.getSize();
+            // Instantiate delegate chunk to track allocatedBytes correctly
+            return new MemoryChunk() {
+                @Override
+                public void close() {
+                    synchronized (sync) {
+                        allocatedBytes -= chunk.getSize();
+                        chunk.close();
+                    }
+                }
+    
+                @Override
+                public long getSize() {
+                    return chunk.getSize();
+                }
+    
+                @Override
+                public void resize(long nBytes) {
+                    synchronized (sync) {
+                        long size = getSize();
+                        long deltaBytes = nBytes - size;
+                        if (deltaBytes > 0) {
+                            adjustAllocation(deltaBytes,deltaBytes); // Throw if too much memory
+                        }
+                        chunk.resize(nBytes);
+                        allocatedBytes += deltaBytes;
+                    }
+                }
+            };
+        }
+    }
+
+    @Override
+    public long getAvailableMemory() {
+        synchronized (sync) {
+            long availBytes = getMaxMemory() - allocatedBytes;
+            // Sanity check (should never happen)
+            if (availBytes < 0) {
+                throw new IllegalStateException("Available memory has become negative: " + availBytes + " bytes.  Allocated memory: " + allocatedBytes + " bytes.");
+            }
+            return availBytes;
+        }
+    }
+    
+    @Override
+    public long getMaxMemory() {
+        return maxPercOfTotal  * super.getMaxMemory() / 100;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/memory/DelegatingMemoryManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/memory/DelegatingMemoryManager.java b/phoenix-core/src/main/java/org/apache/phoenix/memory/DelegatingMemoryManager.java
new file mode 100644
index 0000000..f50c43d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/memory/DelegatingMemoryManager.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.memory;
+
+/**
+ * 
+ * Memory manager that delegates through to another memory manager.
+ * 
+ * @author jtaylor
+ * @since 0.1
+ */
+public class DelegatingMemoryManager implements MemoryManager {
+    private final MemoryManager parent;
+    
+    public DelegatingMemoryManager(MemoryManager globalMemoryManager){
+        this.parent = globalMemoryManager;
+    }
+    
+    @Override
+    public long getAvailableMemory() {
+        return parent.getAvailableMemory();
+    }
+
+    @Override
+    public long getMaxMemory() {
+        return parent.getMaxMemory();
+    }
+
+    @Override
+    public MemoryChunk allocate(long minBytes, long reqBytes) {
+        return parent.allocate(minBytes, reqBytes);
+    }
+
+
+    @Override
+    public MemoryChunk allocate(long nBytes) {
+        return allocate(nBytes, nBytes);
+    }
+
+    public MemoryManager getParent() {
+        return parent;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java b/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
new file mode 100644
index 0000000..38a3cd9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.memory;
+
+import org.apache.http.annotation.GuardedBy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * 
+ * Global memory manager to track course grained memory usage across all requests.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class GlobalMemoryManager implements MemoryManager {
+    private static final Logger logger = LoggerFactory.getLogger(GlobalMemoryManager.class);
+    
+    private final Object sync = new Object();
+    private final long maxMemoryBytes;
+    private final int maxWaitMs;
+    @GuardedBy("sync")
+    private volatile long usedMemoryBytes;
+    
+    public GlobalMemoryManager(long maxBytes, int maxWaitMs) {
+        if (maxBytes <= 0) {
+            throw new IllegalStateException("Total number of available bytes (" + maxBytes + ") must be greater than zero");
+        }
+        if (maxWaitMs < 0) {
+            throw new IllegalStateException("Maximum wait time (" + maxWaitMs + ") must be greater than or equal to zero");
+        }
+        this.maxMemoryBytes = maxBytes;
+        this.maxWaitMs = maxWaitMs;
+        this.usedMemoryBytes = 0;
+    }
+    
+    @Override
+    public long getAvailableMemory() {
+        synchronized(sync) {
+            return maxMemoryBytes - usedMemoryBytes;
+        }
+    }
+
+    @Override
+    public long getMaxMemory() {
+        return maxMemoryBytes;
+    }
+
+
+    // TODO: Work on fairness: One big memory request can cause all others to block here.
+    private long allocateBytes(long minBytes, long reqBytes) {
+        if (minBytes < 0 || reqBytes < 0) {
+            throw new IllegalStateException("Minimum requested bytes (" + minBytes + ") and requested bytes (" + reqBytes + ") must be greater than zero");
+        }
+        if (minBytes > maxMemoryBytes) { // No need to wait, since we'll never have this much available
+            throw new InsufficientMemoryException("Requested memory of " + minBytes + " bytes is larger than global pool of " + maxMemoryBytes + " bytes.");
+        }
+        long startTimeMs = System.currentTimeMillis(); // Get time outside of sync block to account for waiting for lock
+        long nBytes;
+        synchronized(sync) {
+            while (usedMemoryBytes + minBytes > maxMemoryBytes) { // Only wait if minBytes not available
+                try {
+                    long remainingWaitTimeMs = maxWaitMs - (System.currentTimeMillis() - startTimeMs);
+                    if (remainingWaitTimeMs <= 0) { // Ran out of time waiting for some memory to get freed up
+                        throw new InsufficientMemoryException("Requested memory of " + minBytes + " bytes could not be allocated from remaining memory of " + usedMemoryBytes + " bytes from global pool of " + maxMemoryBytes + " bytes after waiting for " + maxWaitMs + "ms.");
+                    }
+                    sync.wait(remainingWaitTimeMs);
+                } catch (InterruptedException ie) {
+                    throw new RuntimeException("Interrupted allocation of " + minBytes + " bytes", ie);
+                }
+            }
+            // Allocate at most reqBytes, but at least minBytes
+            nBytes = Math.min(reqBytes, maxMemoryBytes - usedMemoryBytes);
+            if (nBytes < minBytes) {
+                throw new IllegalStateException("Allocated bytes (" + nBytes + ") should be at least the minimum requested bytes (" + minBytes + ")");
+            }
+            usedMemoryBytes += nBytes;
+        }
+        return nBytes;
+    }
+
+    @Override
+    public MemoryChunk allocate(long minBytes, long reqBytes) {
+        long nBytes = allocateBytes(minBytes, reqBytes);
+        return newMemoryChunk(nBytes);
+    }
+
+    @Override
+    public MemoryChunk allocate(long nBytes) {
+        return allocate(nBytes,nBytes);
+    }
+
+    protected MemoryChunk newMemoryChunk(long sizeBytes) {
+        return new GlobalMemoryChunk(sizeBytes);
+    }
+    
+    private class GlobalMemoryChunk implements MemoryChunk {
+        private volatile long size;
+
+        private GlobalMemoryChunk(long size) {
+            if (size < 0) {
+                throw new IllegalStateException("Size of memory chunk must be greater than zero, but instead is " + size);
+            }
+            this.size = size;
+        }
+
+        @Override
+        public long getSize() {
+            synchronized(sync) {
+                return size; // TODO: does this need to be synchronized?
+            }
+        }
+        
+        @Override
+        public void resize(long nBytes) {
+            if (nBytes < 0) {
+                throw new IllegalStateException("Number of bytes to resize to must be greater than zero, but instead is " + nBytes);
+            }
+            synchronized(sync) {
+                long nAdditionalBytes = (nBytes - size);
+                if (nAdditionalBytes < 0) {
+                    usedMemoryBytes += nAdditionalBytes;
+                    size = nBytes;
+                    sync.notifyAll();
+                } else {
+                    allocateBytes(nAdditionalBytes, nAdditionalBytes);
+                    size = nBytes;
+                }
+            }
+        }
+        
+        /**
+         * Check that MemoryChunk has previously been closed.
+         */
+        @Override
+        protected void finalize() throws Throwable {
+            try {
+                close();
+                if (size > 0) {
+                    logger.warn("Orphaned chunk of " + size + " bytes found during finalize");
+                }
+                // TODO: log error here, but we can't use SFDC logging
+                // because this runs in an hbase coprocessor.
+                // Create a gack-like API (talk with GridForce or HBase folks)
+            } finally {
+                super.finalize();
+            }
+        }
+        
+        @Override
+        public void close() {
+            synchronized(sync) {
+                usedMemoryBytes -= size;
+                size = 0;
+                sync.notifyAll();
+            }
+        }
+    }
+}
+ 

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/memory/InsufficientMemoryException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/memory/InsufficientMemoryException.java b/phoenix-core/src/main/java/org/apache/phoenix/memory/InsufficientMemoryException.java
new file mode 100644
index 0000000..8e7eaff
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/memory/InsufficientMemoryException.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.memory;
+
+/**
+ * 
+ * Exception thrown by MemoryManager when insufficient memory is available
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class InsufficientMemoryException extends RuntimeException {
+	private static final long serialVersionUID = 1L;
+
+    public InsufficientMemoryException() {
+    }
+
+    public InsufficientMemoryException(String message) {
+        super(message);
+    }
+
+    public InsufficientMemoryException(Throwable cause) {
+        super(cause);
+    }
+
+    public InsufficientMemoryException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/memory/MemoryManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/memory/MemoryManager.java b/phoenix-core/src/main/java/org/apache/phoenix/memory/MemoryManager.java
new file mode 100644
index 0000000..e9b9355
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/memory/MemoryManager.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.memory;
+
+import java.io.Closeable;
+
+/**
+ * 
+ * Memory manager used to track memory usage.  Either throttles
+ * memory usage by blocking when the max memory is reached or
+ * allocates up to a maximum without blocking.
+ * 
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface MemoryManager {
+    /**
+     * Get the total amount of memory (in bytes) that may be allocated.
+     */
+    long getMaxMemory();
+    
+    /**
+     * Get the amount of available memory (in bytes) not yet allocated.
+     */
+    long getAvailableMemory();
+    
+    /**
+     * Allocate up to reqBytes of memory, dialing the amount down to 
+     * minBytes if full amount is not available.  If minBytes is not
+     * available, then this call will block for a configurable amount
+     * of time and throw if minBytes does not become available.
+     * @param minBytes minimum number of bytes required
+     * @param reqBytes requested number of bytes.  Must be greater
+     * than or equal to minBytes
+     * @return MemoryChunk that was allocated
+     * @throws InsufficientMemoryException if unable to allocate minBytes
+     *  during configured amount of time
+     */
+    MemoryChunk allocate(long minBytes, long reqBytes);
+
+    /**
+     * Equivalent to calling {@link #allocate(long, long)} where
+     * minBytes and reqBytes being the same.
+     */
+    MemoryChunk allocate(long nBytes);
+    
+    /**
+     * 
+     * Chunk of allocated memory.  To reclaim the memory, call {@link #close()}
+     *
+     * @author jtaylor
+     * @since 0.1
+     */
+    public static interface MemoryChunk extends Closeable {
+        /**
+         * Get the size in bytes of the allocated chunk.
+         */
+        long getSize();
+        
+        /**
+         * Free up the memory associated with this chunk
+         */
+        @Override
+        void close();
+        
+        /**
+         * Resize an already allocated memory chunk up or down to a
+         * new amount.  If decreasing allocation, this call will not block.
+         * If increasing allocation, and nBytes is not available,  then
+         * this call will block for a configurable amount of time and
+         * throw if nBytes does not become available.  Most commonly
+         * used to adjust the allocation of a memory buffer that was
+         * originally sized for the worst case scenario.
+         * @param nBytes new number of bytes required for this chunk
+         * @throws InsufficientMemoryException if unable to allocate minBytes
+         *  during configured amount of time
+         */
+        void resize(long nBytes); 
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
new file mode 100644
index 0000000..b60f742
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -0,0 +1,239 @@
+package org.apache.phoenix.optimize;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.phoenix.compile.ColumnProjector;
+import org.apache.phoenix.compile.IndexStatementRewriter;
+import org.apache.phoenix.compile.QueryCompiler;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.parse.HintNode.Hint;
+import org.apache.phoenix.parse.ParseNodeFactory;
+import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.parse.TableNode;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.ColumnNotFoundException;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PDatum;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+
+public class QueryOptimizer {
+    private static final ParseNodeFactory FACTORY = new ParseNodeFactory();
+
+    private final QueryServices services;
+    private final boolean useIndexes;
+
+    public QueryOptimizer(QueryServices services) {
+        this.services = services;
+        this.useIndexes = this.services.getProps().getBoolean(QueryServices.USE_INDEXES_ATTRIB, QueryServicesOptions.DEFAULT_USE_INDEXES);
+    }
+
+    public QueryPlan optimize(SelectStatement select, PhoenixStatement statement) throws SQLException {
+        return optimize(select, statement, Collections.<PColumn>emptyList(), null);
+    }
+
+    public QueryPlan optimize(SelectStatement select, PhoenixStatement statement, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory) throws SQLException {
+        QueryCompiler compiler = new QueryCompiler(statement, targetColumns, parallelIteratorFactory);
+        QueryPlan dataPlan = compiler.compile(select);
+        if (!useIndexes || select.getFrom().size() > 1) {
+            return dataPlan;
+        }
+        // Get the statement as it's been normalized now
+        // TODO: the recompile for the index tables could skip the normalize step
+        select = (SelectStatement)dataPlan.getStatement();
+        PTable dataTable = dataPlan.getTableRef().getTable();
+        List<PTable>indexes = Lists.newArrayList(dataTable.getIndexes());
+        if (indexes.isEmpty() || dataPlan.getTableRef().hasDynamicCols() || select.getHint().hasHint(Hint.NO_INDEX)) {
+            return dataPlan;
+        }
+        
+        // The targetColumns is set for UPSERT SELECT to ensure that the proper type conversion takes place.
+        // For a SELECT, it is empty. In this case, we want to set the targetColumns to match the projection
+        // from the dataPlan to ensure that the metadata for when an index is used matches the metadata for
+        // when the data table is used.
+        if (targetColumns.isEmpty()) {
+            List<? extends ColumnProjector> projectors = dataPlan.getProjector().getColumnProjectors();
+            List<PDatum> targetDatums = Lists.newArrayListWithExpectedSize(projectors.size());
+            for (ColumnProjector projector : projectors) {
+                targetDatums.add(projector.getExpression());
+            }
+            targetColumns = targetDatums;
+        }
+        
+        SelectStatement translatedIndexSelect = IndexStatementRewriter.translate(select, dataPlan.getContext().getResolver());
+        List<QueryPlan> plans = Lists.newArrayListWithExpectedSize(1 + indexes.size());
+        plans.add(dataPlan);
+        QueryPlan hintedPlan = getHintedQueryPlan(statement, translatedIndexSelect, indexes, targetColumns, parallelIteratorFactory, plans);
+        if (hintedPlan != null) {
+            return hintedPlan;
+        }
+        for (PTable index : indexes) {
+            addPlan(statement, translatedIndexSelect, index, targetColumns, parallelIteratorFactory, plans);
+        }
+        
+        return chooseBestPlan(select, plans);
+    }
+    
+    private static QueryPlan getHintedQueryPlan(PhoenixStatement statement, SelectStatement select, List<PTable> indexes, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, List<QueryPlan> plans) throws SQLException {
+        QueryPlan dataPlan = plans.get(0);
+        String indexHint = select.getHint().getHint(Hint.INDEX);
+        if (indexHint == null) {
+            return null;
+        }
+        int startIndex = 0;
+        String alias = dataPlan.getTableRef().getTableAlias();
+        String prefix = HintNode.PREFIX + (alias == null ? dataPlan.getTableRef().getTable().getName().getString() : alias) + HintNode.SEPARATOR;
+        while (startIndex < indexHint.length()) {
+            startIndex = indexHint.indexOf(prefix, startIndex);
+            if (startIndex < 0) {
+                return null;
+            }
+            startIndex += prefix.length();
+            boolean done = false; // true when SUFFIX found
+            while (startIndex < indexHint.length() && !done) {
+                int endIndex;
+                int endIndex1 = indexHint.indexOf(HintNode.SEPARATOR, startIndex);
+                int endIndex2 = indexHint.indexOf(HintNode.SUFFIX, startIndex);
+                if (endIndex1 < 0 && endIndex2 < 0) { // Missing SUFFIX shouldn't happen
+                    endIndex = indexHint.length();
+                } else if (endIndex1 < 0) {
+                    done = true;
+                    endIndex = endIndex2;
+                } else if (endIndex2 < 0) {
+                    endIndex = endIndex1;
+                } else {
+                    endIndex = Math.min(endIndex1, endIndex2);
+                    done = endIndex2 == endIndex;
+                }
+                String indexName = indexHint.substring(startIndex, endIndex);
+                int indexPos = getIndexPosition(indexes, indexName);
+                if (indexPos >= 0) {
+                    // Hinted index is applicable, so return it. It'll be the plan at position 1, after the data plan
+                    if (addPlan(statement, select, indexes.get(indexPos), targetColumns, parallelIteratorFactory, plans)) {
+                        return plans.get(1);
+                    }
+                    indexes.remove(indexPos);
+                }
+                startIndex = endIndex + 1;
+            }
+        }
+        return null;
+    }
+    
+    private static int getIndexPosition(List<PTable> indexes, String indexName) {
+        for (int i = 0; i < indexes.size(); i++) {
+            if (indexName.equals(indexes.get(i).getTableName().getString())) {
+                return i;
+            }
+        }
+        return -1;
+    }
+    
+    private static boolean addPlan(PhoenixStatement statement, SelectStatement select, PTable index, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, List<QueryPlan> plans) throws SQLException {
+        QueryPlan dataPlan = plans.get(0);
+        int nColumns = dataPlan.getProjector().getColumnCount();
+        String alias = '"' + dataPlan.getTableRef().getTableAlias() + '"'; // double quote in case it's case sensitive
+        String schemaName = dataPlan.getTableRef().getTable().getSchemaName().getString();
+        schemaName = schemaName.length() == 0 ? null :  '"' + schemaName + '"';
+
+        String tableName = '"' + index.getTableName().getString() + '"';
+        List<? extends TableNode> tables = Collections.singletonList(FACTORY.namedTable(alias, FACTORY.table(schemaName, tableName)));
+        try {
+            SelectStatement indexSelect = FACTORY.select(select, tables);            
+            QueryCompiler compiler = new QueryCompiler(statement, targetColumns, parallelIteratorFactory);
+            QueryPlan plan = compiler.compile(indexSelect);
+            // Checking the index status and number of columns handles the wildcard cases correctly
+            // We can't check the status earlier, because the index table may be out-of-date.
+            if (plan.getTableRef().getTable().getIndexState() == PIndexState.ACTIVE && plan.getProjector().getColumnCount() == nColumns) {
+                plans.add(plan);
+                return true;
+            }
+        } catch (ColumnNotFoundException e) {
+            /* Means that a column is being used that's not in our index.
+             * Since we currently don't keep stats, we don't know the selectivity of the index.
+             * For now, we just don't use this index (as opposed to trying to join back from
+             * the index table to the data table.
+             */
+        }
+        return false;
+    }
+    
+    /**
+     * Choose the best plan among all the possible ones.
+     * Since we don't keep stats yet, we use the following simple algorithm:
+     * 1) If the query has an ORDER BY and a LIMIT, choose the plan that has all the ORDER BY expression
+     * in the same order as the row key columns.
+     * 2) If there are more than one plan that meets (1), choose the plan with:
+     *    a) the most row key columns that may be used to form the start/stop scan key.
+     *    b) the plan that preserves ordering for a group by.
+     *    c) the data table plan
+     * @param plans the list of candidate plans
+     * @return
+     */
+    private QueryPlan chooseBestPlan(SelectStatement select, List<QueryPlan> plans) {
+        QueryPlan firstPlan = plans.get(0);
+        if (plans.size() == 1) {
+            return firstPlan;
+        }
+        
+        List<QueryPlan> candidates = Lists.newArrayListWithExpectedSize(plans.size());
+        if (firstPlan.getLimit() == null) {
+            candidates.addAll(plans);
+        } else {
+            for (QueryPlan plan : plans) {
+                // If ORDER BY optimized out (or not present at all)
+                if (plan.getOrderBy().getOrderByExpressions().isEmpty()) {
+                    candidates.add(plan);
+                }
+            }
+            if (candidates.isEmpty()) {
+                candidates.addAll(plans);
+            }
+        }
+        final int comparisonOfDataVersusIndexTable = select.getHint().hasHint(Hint.USE_DATA_OVER_INDEX_TABLE) ? -1 : 1;
+        Collections.sort(candidates, new Comparator<QueryPlan>() {
+
+            @Override
+            public int compare(QueryPlan plan1, QueryPlan plan2) {
+                int c = plan2.getContext().getScanRanges().getRanges().size() - plan1.getContext().getScanRanges().getRanges().size();
+                if (c != 0) return c;
+                if (plan1.getGroupBy()!=null && plan2.getGroupBy()!=null) {
+                    if (plan1.getGroupBy().isOrderPreserving() != plan2.getGroupBy().isOrderPreserving()) {
+                        return plan1.getGroupBy().isOrderPreserving() ? -1 : 1;
+                    }
+                }
+                // Use smaller table (table with fewest kv columns)
+                PTable table1 = plan1.getTableRef().getTable();
+                PTable table2 = plan2.getTableRef().getTable();
+                c = (table1.getColumns().size() - table1.getPKColumns().size()) - (table2.getColumns().size() - table2.getPKColumns().size());
+                if (c != 0) return c;
+                
+                // All things being equal, just use the index table
+                // TODO: have hint that drives this
+                if (plan1.getTableRef().getTable().getType() == PTableType.INDEX) {
+                    return comparisonOfDataVersusIndexTable;
+                }
+                if (plan2.getTableRef().getTable().getType() == PTableType.INDEX) {
+                    return -comparisonOfDataVersusIndexTable;
+                }
+                
+                return 0;
+            }
+            
+        });
+        
+        return candidates.get(0);
+        
+    }
+
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/parse/AddColumnStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/AddColumnStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/AddColumnStatement.java
new file mode 100644
index 0000000..1059fce
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/AddColumnStatement.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.phoenix.schema.PTableType;
+
+public class AddColumnStatement extends AlterTableStatement {
+    private final List<ColumnDef> columnDefs;
+    private final boolean ifNotExists;
+    private final Map<String,Object> props;
+    
+    protected AddColumnStatement(NamedTableNode table, PTableType tableType, List<ColumnDef> columnDefs, boolean ifNotExists, Map<String, Object> props) {
+        super(table, tableType);
+        this.columnDefs = columnDefs;
+        this.props = props == null ? Collections.<String,Object>emptyMap() : props;
+        this.ifNotExists = ifNotExists;
+    }
+
+    public List<ColumnDef> getColumnDefs() {
+        return columnDefs;
+    }
+    
+    public boolean ifNotExists() {
+        return ifNotExists;
+    }
+
+    public Map<String,Object> getProps() {
+        return props;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/parse/AddParseNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/AddParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/AddParseNode.java
new file mode 100644
index 0000000..f4ae491
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/AddParseNode.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+
+
+/**
+ * 
+ * Node representing addition in a SQL expression
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class AddParseNode extends ArithmeticParseNode {
+
+    AddParseNode(List<ParseNode> children) {
+        super(children);
+    }
+
+    @Override
+    public <T> T accept(ParseNodeVisitor<T> visitor) throws SQLException {
+        List<T> l = Collections.emptyList();
+        if (visitor.visitEnter(this)) {
+            l = acceptChildren(visitor);
+        }
+        return visitor.visitLeave(this, l);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/parse/AggregateFunctionParseNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/AggregateFunctionParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/AggregateFunctionParseNode.java
new file mode 100644
index 0000000..a715dd7
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/AggregateFunctionParseNode.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.util.List;
+
+
+public class AggregateFunctionParseNode extends FunctionParseNode {
+
+    public AggregateFunctionParseNode(String name, List<ParseNode> children, BuiltInFunctionInfo info) {
+        super(name, children, info);
+    }
+    
+    /**
+     * Aggregate function are not stateless, even though all the args may be stateless,
+     * for example, COUNT(1)
+     */
+    @Override
+    public boolean isStateless() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/parse/AliasedNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/AliasedNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/AliasedNode.java
new file mode 100644
index 0000000..d2302fd
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/AliasedNode.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.apache.phoenix.util.SchemaUtil;
+
+/**
+ *
+ * Node representing an aliased parse node in a SQL select clause
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class AliasedNode {
+    private final String alias;
+    private final ParseNode node;
+    private final boolean isCaseSensitve;
+
+    public AliasedNode(String alias, ParseNode node) {
+        this.isCaseSensitve = alias != null && SchemaUtil.isCaseSensitive(alias);
+        this.alias = alias == null ? null : SchemaUtil.normalizeIdentifier(alias);
+        this.node = node;
+    }
+
+    public String getAlias() {
+        return alias;
+    }
+
+    public ParseNode getNode() {
+        return node;
+    }
+
+    public boolean isCaseSensitve() {
+        return isCaseSensitve;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java
new file mode 100644
index 0000000..bee7498
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.apache.phoenix.schema.PIndexState;
+
+public class AlterIndexStatement extends SingleTableSQLStatement {
+    private final String dataTableName;
+    private final boolean ifExists;
+    private final PIndexState indexState;
+
+    public AlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState indexState) {
+        super(indexTableNode,0);
+        this.dataTableName = dataTableName;
+        this.ifExists = ifExists;
+        this.indexState = indexState;
+    }
+
+    public String getTableName() {
+        return dataTableName;
+    }
+
+    @Override
+    public int getBindCount() {
+        return 0;
+    }
+
+    public boolean ifExists() {
+        return ifExists;
+    }
+
+    public PIndexState getIndexState() {
+        return indexState;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterTableStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterTableStatement.java
new file mode 100644
index 0000000..e6d4c80
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterTableStatement.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.apache.phoenix.schema.PTableType;
+
+public abstract class AlterTableStatement extends SingleTableSQLStatement {
+    private final PTableType tableType;
+
+    AlterTableStatement(NamedTableNode table, PTableType tableType) {
+        super(table, 0);
+        this.tableType = tableType;
+    }
+
+    public PTableType getTableType() {
+        return tableType;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/parse/AndParseNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/AndParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/AndParseNode.java
new file mode 100644
index 0000000..452d893
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/AndParseNode.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+
+
+
+/**
+ * 
+ * Node representing AND in a SQL expression
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class AndParseNode extends CompoundParseNode {
+
+    AndParseNode(List<ParseNode> children) {
+        super(children);
+    }
+
+    @Override
+    public <T> T accept(ParseNodeVisitor<T> visitor) throws SQLException {
+        List<T> l = Collections.emptyList();
+        if (visitor.visitEnter(this)) {
+            l = acceptChildren(visitor);
+        }
+        return visitor.visitLeave(this, l);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/parse/ArithmeticParseNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ArithmeticParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ArithmeticParseNode.java
new file mode 100644
index 0000000..4e90960
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ArithmeticParseNode.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.util.List;
+
+public abstract class ArithmeticParseNode extends CompoundParseNode {
+
+    public ArithmeticParseNode(List<ParseNode> children) {
+        super(children);
+    }
+
+}