You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 20:23:43 UTC

[42/51] [partial] Initial commit

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/cache/HashCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/cache/HashCache.java b/src/main/java/org/apache/phoenix/cache/HashCache.java
new file mode 100644
index 0000000..5f8348b
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/cache/HashCache.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache;
+
+import java.io.Closeable;
+import java.util.List;
+
+import org.apache.http.annotation.Immutable;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ * Encapsulate deserialized hash cache from bytes into Map.
+ * The Map uses the row key as the key and the row as the value.
+ * @author jtaylor
+ * @since 0.1
+ */
+@Immutable
+public interface HashCache extends Closeable {
+    public List<Tuple> get(ImmutableBytesPtr hashKey);
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java b/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
new file mode 100644
index 0000000..fac78de
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
@@ -0,0 +1,10 @@
+package org.apache.phoenix.cache;
+
+import java.io.Closeable;
+import java.util.List;
+
+import org.apache.phoenix.index.IndexMaintainer;
+
+public interface IndexMetaDataCache extends Closeable {
+    public List<IndexMaintainer> getIndexMaintainers();
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
new file mode 100644
index 0000000..c5b052d
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -0,0 +1,278 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.coprocessor.ServerCachingProtocol;
+import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.job.JobManager.JobCallable;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.Closeables;
+import org.apache.phoenix.util.SQLCloseable;
+import org.apache.phoenix.util.SQLCloseables;
+
+/**
+ * 
+ * Client for sending cache to each region server
+ * 
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ServerCacheClient {
+    public static final int UUID_LENGTH = Bytes.SIZEOF_LONG;
+    private static final Log LOG = LogFactory.getLog(ServerCacheClient.class);
+    private static final Random RANDOM = new Random();
+    private final PhoenixConnection connection;
+    private final TableRef cacheUsingTableRef;
+
+    /**
+     * Construct client used to create a serialized cached snapshot of a table and send it to each region server
+     * for caching during hash join processing.
+     * @param connection the client connection
+     * @param cacheUsingTableRef table name
+     * 
+     * TODO: instead of minMaxKeyRange, have an interface for iterating through ranges as we may be sending to
+     * servers when we don't have to if the min is in first region and max is in last region, especially for point queries.
+     */
+    public ServerCacheClient(PhoenixConnection connection, TableRef cacheUsingTableRef) {
+        this.connection = connection;
+        this.cacheUsingTableRef = cacheUsingTableRef;
+    }
+
+    public PhoenixConnection getConnection() {
+        return connection;
+    }
+    
+    public TableRef getTableRef() {
+        return cacheUsingTableRef;
+    }
+    
+    /**
+     * Client-side representation of a server cache.  Call {@link #close()} when usage
+     * is complete to free cache up on region server
+     *
+     * @author jtaylor
+     * @since 0.1
+     */
+    public class ServerCache implements SQLCloseable {
+        private final int size;
+        private final byte[] id;
+        private final ImmutableSet<HRegionLocation> servers;
+        
+        public ServerCache(byte[] id, Set<HRegionLocation> servers, int size) {
+            this.id = id;
+            this.servers = ImmutableSet.copyOf(servers);
+            this.size = size;
+        }
+
+        /**
+         * Gets the size in bytes of hash cache
+         */
+        public int getSize() {
+            return size;
+        }
+
+        /**
+         * Gets the unique identifier for this hash cache
+         */
+        public byte[] getId() {
+            return id;
+        }
+
+        /**
+         * Call to free up cache on region servers when no longer needed
+         */
+        @Override
+        public void close() throws SQLException {
+            removeServerCache(id, servers);
+        }
+
+    }
+    
+    public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final ServerCacheFactory cacheFactory) throws SQLException {
+        ConnectionQueryServices services = connection.getQueryServices();
+        MemoryChunk chunk = services.getMemoryManager().allocate(cachePtr.getLength());
+        List<Closeable> closeables = new ArrayList<Closeable>();
+        closeables.add(chunk);
+        ServerCache hashCacheSpec = null;
+        SQLException firstException = null;
+        final byte[] cacheId = generateId();
+        /**
+         * Execute EndPoint in parallel on each server to send compressed hash cache 
+         */
+        // TODO: generalize and package as a per region server EndPoint caller
+        // (ideally this would be functionality provided by the coprocessor framework)
+        boolean success = false;
+        ExecutorService executor = services.getExecutor();
+        List<Future<Boolean>> futures = Collections.emptyList();
+        try {
+            List<HRegionLocation> locations = services.getAllTableRegions(cacheUsingTableRef.getTable().getName().getBytes());
+            int nRegions = locations.size();
+            // Size these based on worst case
+            futures = new ArrayList<Future<Boolean>>(nRegions);
+            Set<HRegionLocation> servers = new HashSet<HRegionLocation>(nRegions);
+            for (HRegionLocation entry : locations) {
+                // Keep track of servers we've sent to and only send once
+                if ( ! servers.contains(entry) && 
+                        keyRanges.intersect(entry.getRegionInfo().getStartKey(), entry.getRegionInfo().getEndKey())) {  // Call RPC once per server
+                    servers.add(entry);
+                    if (LOG.isDebugEnabled()) {LOG.debug("Adding cache entry to be sent for " + entry);}
+                    final byte[] key = entry.getRegionInfo().getStartKey();
+                    final HTableInterface htable = services.getTable(cacheUsingTableRef.getTable().getName().getBytes());
+                    closeables.add(htable);
+                    futures.add(executor.submit(new JobCallable<Boolean>() {
+                        
+                        @Override
+                        public Boolean call() throws Exception {
+                            ServerCachingProtocol protocol = htable.coprocessorProxy(ServerCachingProtocol.class, key);
+                            return protocol.addServerCache(connection.getTenantId(), cacheId, cachePtr, cacheFactory);
+                        }
+
+                        /**
+                         * Defines the grouping for round robin behavior.  All threads spawned to process
+                         * this scan will be grouped together and time sliced with other simultaneously
+                         * executing parallel scans.
+                         */
+                        @Override
+                        public Object getJobId() {
+                            return ServerCacheClient.this;
+                        }
+                    }));
+                } else {
+                    if (LOG.isDebugEnabled()) {LOG.debug("NOT adding cache entry to be sent for " + entry + " since one already exists for that entry");}
+                }
+            }
+            
+            hashCacheSpec = new ServerCache(cacheId,servers,cachePtr.getLength());
+            // Execute in parallel
+            int timeoutMs = services.getProps().getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
+            for (Future<Boolean> future : futures) {
+                future.get(timeoutMs, TimeUnit.MILLISECONDS);
+            }
+            
+            success = true;
+        } catch (SQLException e) {
+            firstException = e;
+        } catch (Exception e) {
+            firstException = new SQLException(e);
+        } finally {
+            try {
+                if (!success) {
+                    SQLCloseables.closeAllQuietly(Collections.singletonList(hashCacheSpec));
+                    for (Future<Boolean> future : futures) {
+                        future.cancel(true);
+                    }
+                }
+            } finally {
+                try {
+                    Closeables.closeAll(closeables);
+                } catch (IOException e) {
+                    if (firstException == null) {
+                        firstException = new SQLException(e);
+                    }
+                } finally {
+                    if (firstException != null) {
+                        throw firstException;
+                    }
+                }
+            }
+        }
+        return hashCacheSpec;
+    }
+    
+    /**
+     * Remove the cached table from all region servers
+     * @param cacheId unique identifier for the hash join (returned from {@link #addHashCache(HTable, Scan, Set)})
+     * @param servers list of servers upon which table was cached (filled in by {@link #addHashCache(HTable, Scan, Set)})
+     * @throws SQLException
+     * @throws IllegalStateException if hashed table cannot be removed on any region server on which it was added
+     */
+    private void removeServerCache(byte[] cacheId, Set<HRegionLocation> servers) throws SQLException {
+        ConnectionQueryServices services = connection.getQueryServices();
+        Throwable lastThrowable = null;
+        byte[] tableName = cacheUsingTableRef.getTable().getName().getBytes();
+        HTableInterface iterateOverTable = services.getTable(tableName);
+        List<HRegionLocation> locations = services.getAllTableRegions(tableName);
+        Set<HRegionLocation> remainingOnServers = new HashSet<HRegionLocation>(servers);
+        /**
+         * Allow for the possibility that the region we based where to send our cache has split and been
+         * relocated to another region server *after* we sent it, but before we removed it. To accommodate
+         * this, we iterate through the current metadata boundaries and remove the cache once for each
+         * server that we originally sent to.
+         */
+        for (HRegionLocation entry : locations) {
+            if (remainingOnServers.contains(entry)) {  // Call once per server
+                try {
+                    byte[] key = entry.getRegionInfo().getStartKey();
+                    ServerCachingProtocol protocol = iterateOverTable.coprocessorProxy(ServerCachingProtocol.class, key);
+                    protocol.removeServerCache(connection.getTenantId(), cacheId);
+                    remainingOnServers.remove(entry);
+                } catch (Throwable t) {
+                    lastThrowable = t;
+                    LOG.error("Error trying to remove hash cache for " + entry, t);
+                }
+            }
+        }
+        if (!remainingOnServers.isEmpty()) {
+            LOG.warn("Unable to remove hash cache for " + remainingOnServers, lastThrowable);
+        }
+    }
+
+    /**
+     * Create an ID to keep the cached information across other operations independent.
+     * Using simple long random number, since the length of time we need this to be unique
+     * is very limited. 
+     */
+    public static byte[] generateId() {
+        long rand = RANDOM.nextLong();
+        return Bytes.toBytes(rand);
+    }
+    
+    public static String idToString(byte[] uuid) {
+        assert(uuid.length == Bytes.SIZEOF_LONG);
+        return Long.toString(Bytes.toLong(uuid));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/cache/TenantCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/cache/TenantCache.java b/src/main/java/org/apache/phoenix/cache/TenantCache.java
new file mode 100644
index 0000000..acddac9
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/cache/TenantCache.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache;
+
+import java.io.Closeable;
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
+import org.apache.phoenix.memory.MemoryManager;
+
+
+/**
+ * 
+ * Inteface to set and set cached values for a tenant
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface TenantCache {
+    MemoryManager getMemoryManager();
+    Closeable getServerCache(ImmutableBytesPtr cacheId);
+    Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException;
+    void removeServerCache(ImmutableBytesPtr cacheId) throws SQLException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java b/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
new file mode 100644
index 0000000..0cdb888
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache;
+
+import java.io.Closeable;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import com.google.common.cache.*;
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
+import org.apache.phoenix.memory.MemoryManager;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.util.Closeables;
+
+/**
+ * 
+ * Cache per tenant on server side.  Tracks memory usage for each
+ * tenat as well and rolling up usage to global memory manager.
+ * 
+ * @author jtaylor
+ * @since 0.1
+ */
+public class TenantCacheImpl implements TenantCache {
+    private final int maxTimeToLiveMs;
+    private final MemoryManager memoryManager;
+    private volatile Cache<ImmutableBytesPtr, Closeable> serverCaches;
+
+    public TenantCacheImpl(MemoryManager memoryManager, int maxTimeToLiveMs) {
+        this.memoryManager = memoryManager;
+        this.maxTimeToLiveMs = maxTimeToLiveMs;
+    }
+    
+    @Override
+    public MemoryManager getMemoryManager() {
+        return memoryManager;
+    }
+
+    private Cache<ImmutableBytesPtr,Closeable> getServerCaches() {
+        /* Delay creation of this map until it's needed */
+        if (serverCaches == null) {
+            synchronized(this) {
+                if (serverCaches == null) {
+                    serverCaches = CacheBuilder.newBuilder()
+                        .expireAfterAccess(maxTimeToLiveMs, TimeUnit.MILLISECONDS)
+                        .removalListener(new RemovalListener<ImmutableBytesPtr, Closeable>(){
+                            @Override
+                            public void onRemoval(RemovalNotification<ImmutableBytesPtr, Closeable> notification) {
+                                Closeables.closeAllQuietly(Collections.singletonList(notification.getValue()));
+                            }
+                        })
+                        .build();
+                }
+            }
+        }
+        return serverCaches;
+    }
+    
+    @Override
+    public Closeable getServerCache(ImmutableBytesPtr cacheId) {
+        return getServerCaches().getIfPresent(cacheId);
+    }
+    
+    @Override
+    public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException {
+        MemoryChunk chunk = this.getMemoryManager().allocate(cachePtr.getLength());
+        Closeable element = cacheFactory.newCache(cachePtr, chunk);
+        getServerCaches().put(cacheId, element);
+        return element;
+    }
+    
+    @Override
+    public void removeServerCache(ImmutableBytesPtr cacheId) throws SQLException {
+        getServerCaches().invalidate(cacheId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/compile/AggregationManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/compile/AggregationManager.java b/src/main/java/org/apache/phoenix/compile/AggregationManager.java
new file mode 100644
index 0000000..70ea3a2
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/compile/AggregationManager.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import org.apache.phoenix.expression.aggregator.ClientAggregators;
+
+/**
+ * 
+ * Class that manages aggregations during query compilation
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class AggregationManager {
+    private ClientAggregators aggregators;
+    private int position = 0;
+    
+    public AggregationManager() {
+    }
+
+    public ClientAggregators getAggregators() {
+        return aggregators;
+    }
+    
+    /**
+     * @return allocate the next available zero-based positional index
+     * for the client-side aggregate function.
+     */
+    protected int nextPosition() {
+        return position++;
+    }
+    
+    public void setAggregators(ClientAggregators clientAggregator) {
+        this.aggregators = clientAggregator;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/compile/BindManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/compile/BindManager.java b/src/main/java/org/apache/phoenix/compile/BindManager.java
new file mode 100644
index 0000000..4343d41
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/compile/BindManager.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
+import org.apache.phoenix.parse.BindParseNode;
+import org.apache.phoenix.schema.PDatum;
+
+
+/**
+ * 
+ * Class that manages binding parameters and checking type matching. There are
+ * two main usages:
+ * 
+ * 1) the standard query case where we have the values for the binds.
+ * 2) the retrieve param metadata case where we don't have the bind values.
+ * 
+ * In both cases, during query compilation we figure out what type the bind variable
+ * "should" be, based on how it's used in the query. For example foo < ? would expect
+ * that the bind variable type matches or can be coerced to the type of foo. For (1),
+ * we check that the bind value has the correct type and for (2) we set the param
+ * metadata type.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class BindManager {
+    public static final Object UNBOUND_PARAMETER = new Object();
+
+    private final List<Object> binds;
+    private final PhoenixParameterMetaData bindMetaData;
+
+    public BindManager(List<Object> binds, int bindCount) {
+        this.binds = binds;
+        this.bindMetaData = new PhoenixParameterMetaData(bindCount);
+    }
+
+    public ParameterMetaData getParameterMetaData() {
+        return bindMetaData;
+    }
+    
+    public Object getBindValue(BindParseNode node) throws SQLException {
+        int index = node.getIndex();
+        if (index < 0 || index >= binds.size()) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.PARAM_INDEX_OUT_OF_BOUND)
+                .setMessage("binds size: " + binds.size() + "; index: " + index).build().buildException();
+        }
+        Object value = binds.get(index);
+        if (value == UNBOUND_PARAMETER) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.PARAM_VALUE_UNBOUND)
+            .setMessage(node.toString()).build().buildException();
+        }
+        return value;
+    }
+
+    public void addParamMetaData(BindParseNode bind, PDatum column) throws SQLException {
+        bindMetaData.addParam(bind,column);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/compile/ColumnProjector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/compile/ColumnProjector.java b/src/main/java/org/apache/phoenix/compile/ColumnProjector.java
new file mode 100644
index 0000000..e8a18e8
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/compile/ColumnProjector.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+
+/**
+ * 
+ * Interface used to access the value of a projected column.
+ * 
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface ColumnProjector {
+    /**
+     * Get the column name as it was referenced in the query
+     * @return the database column name
+     */
+    String getName();
+    
+    /**
+     * Get the expression
+     * @return the expression for the column projector
+     */
+    public Expression getExpression();
+    
+    /**
+     * Get the name of the hbase table containing the column
+     * @return the hbase table name
+     */
+    String getTableName();
+    
+    /**
+     * Get the value of the column, coercing it if necessary to the specified type
+     * @param tuple the row containing the column
+     * @param type the type to which to coerce the binary value
+     * @param ptr used to retrieve the value
+     * @return the object representation of the column value.
+     * @throws SQLException
+     */
+    Object getValue(Tuple tuple, PDataType type, ImmutableBytesWritable ptr) throws SQLException;
+    
+    boolean isCaseSensitive();
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/compile/ColumnResolver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/compile/ColumnResolver.java b/src/main/java/org/apache/phoenix/compile/ColumnResolver.java
new file mode 100644
index 0000000..49a1947
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/compile/ColumnResolver.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.schema.ColumnRef;
+import org.apache.phoenix.schema.TableRef;
+
+
+
+/**
+ * 
+ * Interface used to resolve column references occurring
+ * in the select statement.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface ColumnResolver {
+    
+    /**
+     * Returns the collection of resolved tables in the FROM clause.
+     */
+    public List<TableRef> getTables();
+    
+    /**
+     * Resolves column using name and alias.
+     * @param schemaName TODO
+     * @param tableName TODO
+     * @param colName TODO
+     * @return the resolved ColumnRef
+     * @throws ColumnNotFoundException if the column could not be resolved
+     * @throws AmbiguousColumnException if the column name is ambiguous
+     */
+    public ColumnRef resolveColumn(String schemaName, String tableName, String colName) throws SQLException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java b/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
new file mode 100644
index 0000000..3006a9e
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.CreateIndexStatement;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.schema.MetaDataClient;
+
+public class CreateIndexCompiler {
+    private final PhoenixStatement statement;
+
+    public CreateIndexCompiler(PhoenixStatement statement) {
+        this.statement = statement;
+    }
+
+    public MutationPlan compile(final CreateIndexStatement create) throws SQLException {
+        final PhoenixConnection connection = statement.getConnection();
+        final ColumnResolver resolver = FromCompiler.getResolver(create, connection);
+        Scan scan = new Scan();
+        final StatementContext context = new StatementContext(create, connection, resolver, statement.getParameters(), scan);
+        ExpressionCompiler expressionCompiler = new ExpressionCompiler(context);
+        List<ParseNode> splitNodes = create.getSplitNodes();
+        final byte[][] splits = new byte[splitNodes.size()][];
+        for (int i = 0; i < splits.length; i++) {
+            ParseNode node = splitNodes.get(i);
+            if (!node.isConstant()) {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.SPLIT_POINT_NOT_CONSTANT)
+                    .setMessage("Node: " + node).build().buildException();
+            }
+            LiteralExpression expression = (LiteralExpression)node.accept(expressionCompiler);
+            splits[i] = expression.getBytes();
+        }
+        final MetaDataClient client = new MetaDataClient(connection);
+        
+        return new MutationPlan() {
+
+            @Override
+            public ParameterMetaData getParameterMetaData() {
+                return context.getBindManager().getParameterMetaData();
+            }
+
+            @Override
+            public PhoenixConnection getConnection() {
+                return connection;
+            }
+
+            @Override
+            public MutationState execute() throws SQLException {
+                return client.createIndex(create, splits);
+            }
+
+            @Override
+            public ExplainPlan getExplainPlan() throws SQLException {
+                return new ExplainPlan(Collections.singletonList("CREATE INDEX"));
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java b/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
new file mode 100644
index 0000000..ab98538
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.CreateTableStatement;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.schema.MetaDataClient;
+
+
+public class CreateTableCompiler {
+    private final PhoenixStatement statement;
+    
+    public CreateTableCompiler(PhoenixStatement statement) {
+        this.statement = statement;
+    }
+
+    public MutationPlan compile(final CreateTableStatement create) throws SQLException {
+        final PhoenixConnection connection = statement.getConnection();
+        final ColumnResolver resolver = FromCompiler.getResolver(create, connection);
+        Scan scan = new Scan();
+        final StatementContext context = new StatementContext(create, connection, resolver, statement.getParameters(), scan);
+        ExpressionCompiler expressionCompiler = new ExpressionCompiler(context);
+        List<ParseNode> splitNodes = create.getSplitNodes();
+        final byte[][] splits = new byte[splitNodes.size()][];
+        for (int i = 0; i < splits.length; i++) {
+            ParseNode node = splitNodes.get(i);
+            if (!node.isConstant()) {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.SPLIT_POINT_NOT_CONSTANT)
+                    .setMessage("Node: " + node).build().buildException();
+            }
+            LiteralExpression expression = (LiteralExpression)node.accept(expressionCompiler);
+            splits[i] = expression.getBytes();
+        }
+        final MetaDataClient client = new MetaDataClient(connection);
+        
+        return new MutationPlan() {
+
+            @Override
+            public ParameterMetaData getParameterMetaData() {
+                return context.getBindManager().getParameterMetaData();
+            }
+
+            @Override
+            public MutationState execute() throws SQLException {
+                return client.createTable(create, splits);
+            }
+
+            @Override
+            public ExplainPlan getExplainPlan() throws SQLException {
+                return new ExplainPlan(Collections.singletonList("CREATE TABLE"));
+            }
+
+            @Override
+            public PhoenixConnection getConnection() {
+                return connection;
+            }
+            
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
new file mode 100644
index 0000000..72b8ca1
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -0,0 +1,352 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.ParameterMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.index.IndexMetaDataCacheClient;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.optimize.QueryOptimizer;
+import org.apache.phoenix.parse.AliasedNode;
+import org.apache.phoenix.parse.DeleteStatement;
+import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.parse.HintNode.Hint;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.parse.ParseNodeFactory;
+import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.query.Scanner;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.ReadOnlyTableException;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.IndexUtil;
+
+public class DeleteCompiler {
+    private static ParseNodeFactory FACTORY = new ParseNodeFactory();
+    
+    private final PhoenixStatement statement;
+    
+    public DeleteCompiler(PhoenixStatement statement) {
+        this.statement = statement;
+    }
+    
+    private static MutationState deleteRows(PhoenixStatement statement, TableRef tableRef, ResultIterator iterator, RowProjector projector) throws SQLException {
+        PhoenixConnection connection = statement.getConnection();
+        final boolean isAutoCommit = connection.getAutoCommit();
+        ConnectionQueryServices services = connection.getQueryServices();
+        final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+        final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
+        Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations = Maps.newHashMapWithExpectedSize(batchSize);
+        try {
+            PTable table = tableRef.getTable();
+            List<PColumn> pkColumns = table.getPKColumns();
+            int offset = table.getBucketNum() == null ? 0 : 1; // Take into account salting
+            byte[][] values = new byte[pkColumns.size()][];
+            ResultSet rs = new PhoenixResultSet(iterator, projector, statement);
+            int rowCount = 0;
+            while (rs.next()) {
+                for (int i = offset; i < values.length; i++) {
+                    byte[] byteValue = rs.getBytes(i+1-offset);
+                    // The ResultSet.getBytes() call will have inverted it - we need to invert it back.
+                    // TODO: consider going under the hood and just getting the bytes
+                    if (pkColumns.get(i).getColumnModifier() == ColumnModifier.SORT_DESC) {
+                        byte[] tempByteValue = Arrays.copyOf(byteValue, byteValue.length);
+                        byteValue = ColumnModifier.SORT_DESC.apply(byteValue, 0, tempByteValue, 0, byteValue.length);
+                    }
+                    values[i] = byteValue;
+                }
+                ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+                table.newKey(ptr, values);
+                mutations.put(ptr, null);
+                if (mutations.size() > maxSize) {
+                    throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize);
+                }
+                rowCount++;
+                // Commit a batch if auto commit is true and we're at our batch size
+                if (isAutoCommit && rowCount % batchSize == 0) {
+                    MutationState state = new MutationState(tableRef, mutations, 0, maxSize, connection);
+                    connection.getMutationState().join(state);
+                    connection.commit();
+                    mutations.clear();
+                }
+            }
+
+            // If auto commit is true, this last batch will be committed upon return
+            return new MutationState(tableRef, mutations, rowCount / batchSize * batchSize, maxSize, connection);
+        } finally {
+            iterator.close();
+        }
+    }
+    
+    private static class DeletingParallelIteratorFactory extends MutatingParallelIteratorFactory {
+        private RowProjector projector;
+        
+        private DeletingParallelIteratorFactory(PhoenixConnection connection, TableRef tableRef) {
+            super(connection, tableRef);
+        }
+        
+        @Override
+        protected MutationState mutate(PhoenixConnection connection, ResultIterator iterator) throws SQLException {
+            PhoenixStatement statement = new PhoenixStatement(connection);
+            return deleteRows(statement, tableRef, iterator, projector);
+        }
+        
+        public void setRowProjector(RowProjector projector) {
+            this.projector = projector;
+        }
+        
+    }
+    
+    private boolean hasImmutableIndex(TableRef tableRef) {
+        return tableRef.getTable().isImmutableRows() && !tableRef.getTable().getIndexes().isEmpty();
+    }
+    
+    private boolean hasImmutableIndexWithKeyValueColumns(TableRef tableRef) {
+        if (!hasImmutableIndex(tableRef)) {
+            return false;
+        }
+        for (PTable index : tableRef.getTable().getIndexes()) {
+            for (PColumn column : index.getPKColumns()) {
+                if (!IndexUtil.isDataPKColumn(column)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+    
+    public MutationPlan compile(DeleteStatement delete) throws SQLException {
+        final PhoenixConnection connection = statement.getConnection();
+        final boolean isAutoCommit = connection.getAutoCommit();
+        final ConnectionQueryServices services = connection.getQueryServices();
+        final ColumnResolver resolver = FromCompiler.getResolver(delete, connection);
+        final TableRef tableRef = resolver.getTables().get(0);
+        if (tableRef.getTable().getType() == PTableType.VIEW) {
+            throw new ReadOnlyTableException("Mutations not allowed for a view (" + tableRef.getTable() + ")");
+        }
+        
+        final boolean hasLimit = delete.getLimit() != null;
+        boolean runOnServer = isAutoCommit && !hasLimit && !hasImmutableIndex(tableRef);
+        HintNode hint = delete.getHint();
+        if (runOnServer && !delete.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) {
+            hint = HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE);
+        }
+
+        PTable table = tableRef.getTable();
+        List<AliasedNode> aliasedNodes = Lists.newArrayListWithExpectedSize(table.getPKColumns().size());
+        for (int i = table.getBucketNum() == null ? 0 : 1; i < table.getPKColumns().size(); i++) {
+            PColumn column = table.getPKColumns().get(i);
+            String name = column.getName().getString();
+            aliasedNodes.add(FACTORY.aliasedNode(null, FACTORY.column(null, name, name)));
+        }
+        SelectStatement select = FACTORY.select(
+                Collections.singletonList(delete.getTable()), 
+                hint, false, aliasedNodes, delete.getWhere(), 
+                Collections.<ParseNode>emptyList(), null, 
+                delete.getOrderBy(), delete.getLimit(),
+                delete.getBindCount(), false);
+        DeletingParallelIteratorFactory parallelIteratorFactory = hasLimit ? null : new DeletingParallelIteratorFactory(connection, tableRef);
+        final QueryPlan plan = new QueryOptimizer(services).optimize(select, statement, Collections.<PColumn>emptyList(), parallelIteratorFactory);
+        runOnServer &= plan.getTableRef().equals(tableRef);
+        
+        final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+ 
+        if (hasImmutableIndexWithKeyValueColumns(tableRef)) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.NO_DELETE_IF_IMMUTABLE_INDEX).setSchemaName(tableRef.getTable().getSchemaName().getString())
+            .setTableName(tableRef.getTable().getTableName().getString()).build().buildException();
+        }
+        
+        final StatementContext context = plan.getContext();
+        // If we're doing a query for a single row with no where clause, then we don't need to contact the server at all.
+        // A simple check of the none existence of a where clause in the parse node is not sufficient, as the where clause
+        // may have been optimized out.
+        if (runOnServer && context.isSingleRowScan()) {
+            final ImmutableBytesPtr key = new ImmutableBytesPtr(context.getScan().getStartRow());
+            return new MutationPlan() {
+
+                @Override
+                public ParameterMetaData getParameterMetaData() {
+                    return context.getBindManager().getParameterMetaData();
+                }
+
+                @Override
+                public MutationState execute() {
+                    Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutation = Maps.newHashMapWithExpectedSize(1);
+                    mutation.put(key, null);
+                    return new MutationState(tableRef, mutation, 0, maxSize, connection);
+                }
+
+                @Override
+                public ExplainPlan getExplainPlan() throws SQLException {
+                    return new ExplainPlan(Collections.singletonList("DELETE SINGLE ROW"));
+                }
+
+                @Override
+                public PhoenixConnection getConnection() {
+                    return connection;
+                }
+            };
+        } else if (runOnServer) {
+            // TODO: better abstraction
+            Scan scan = context.getScan();
+            scan.setAttribute(UngroupedAggregateRegionObserver.DELETE_AGG, QueryConstants.TRUE);
+
+            // Build an ungrouped aggregate query: select COUNT(*) from <table> where <where>
+            // The coprocessor will delete each row returned from the scan
+            // Ignoring ORDER BY, since with auto commit on and no limit makes no difference
+            SelectStatement aggSelect = SelectStatement.create(SelectStatement.COUNT_ONE, delete.getHint());
+            final RowProjector projector = ProjectionCompiler.compile(context, aggSelect, GroupBy.EMPTY_GROUP_BY);
+            final QueryPlan aggPlan = new AggregatePlan(context, select, tableRef, projector, null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
+            return new MutationPlan() {
+
+                @Override
+                public PhoenixConnection getConnection() {
+                    return connection;
+                }
+
+                @Override
+                public ParameterMetaData getParameterMetaData() {
+                    return context.getBindManager().getParameterMetaData();
+                }
+
+                @Override
+                public MutationState execute() throws SQLException {
+                    // TODO: share this block of code with UPSERT SELECT
+                    ImmutableBytesWritable ptr = context.getTempPtr();
+                    tableRef.getTable().getIndexMaintainers(ptr);
+                    ServerCache cache = null;
+                    try {
+                        if (ptr.getLength() > 0) {
+                            IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
+                            cache = client.addIndexMetadataCache(context.getScanRanges(), ptr);
+                            byte[] uuidValue = cache.getId();
+                            context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+                        }
+                        Scanner scanner = aggPlan.getScanner();
+                        ResultIterator iterator = scanner.iterator();
+                        try {
+                            Tuple row = iterator.next();
+                            final long mutationCount = (Long)projector.getColumnProjector(0).getValue(row, PDataType.LONG, ptr);
+                            return new MutationState(maxSize, connection) {
+                                @Override
+                                public long getUpdateCount() {
+                                    return mutationCount;
+                                }
+                            };
+                        } finally {
+                            iterator.close();
+                        }
+                    } finally {
+                        if (cache != null) {
+                            cache.close();
+                        }
+                    }
+                }
+
+                @Override
+                public ExplainPlan getExplainPlan() throws SQLException {
+                    List<String> queryPlanSteps =  aggPlan.getExplainPlan().getPlanSteps();
+                    List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+                    planSteps.add("DELETE ROWS");
+                    planSteps.addAll(queryPlanSteps);
+                    return new ExplainPlan(planSteps);
+                }
+            };
+        } else {
+            if (parallelIteratorFactory != null) {
+                parallelIteratorFactory.setRowProjector(plan.getProjector());
+            }
+            return new MutationPlan() {
+
+                @Override
+                public PhoenixConnection getConnection() {
+                    return connection;
+                }
+
+                @Override
+                public ParameterMetaData getParameterMetaData() {
+                    return context.getBindManager().getParameterMetaData();
+                }
+
+                @Override
+                public MutationState execute() throws SQLException {
+                    Scanner scanner = plan.getScanner();
+                    ResultIterator iterator = scanner.iterator();
+                    if (!hasLimit) {
+                        Tuple tuple;
+                        long totalRowCount = 0;
+                        while ((tuple=iterator.next()) != null) {// Runs query
+                            KeyValue kv = tuple.getValue(0);
+                            totalRowCount += PDataType.LONG.getCodec().decodeLong(kv.getBuffer(), kv.getValueOffset(), null);
+                        }
+                        // Return total number of rows that have been delete. In the case of auto commit being off
+                        // the mutations will all be in the mutation state of the current connection.
+                        return new MutationState(maxSize, connection, totalRowCount);
+                    } else {
+                        return deleteRows(statement, tableRef, iterator, plan.getProjector());
+                    }
+                }
+
+                @Override
+                public ExplainPlan getExplainPlan() throws SQLException {
+                    List<String> queryPlanSteps =  plan.getExplainPlan().getPlanSteps();
+                    List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+                    planSteps.add("DELETE ROWS");
+                    planSteps.addAll(queryPlanSteps);
+                    return new ExplainPlan(planSteps);
+                }
+            };
+        }
+       
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/compile/ExplainPlan.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/compile/ExplainPlan.java b/src/main/java/org/apache/phoenix/compile/ExplainPlan.java
new file mode 100644
index 0000000..e1049a0
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/compile/ExplainPlan.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+public class ExplainPlan {
+    public static final ExplainPlan EMPTY_PLAN = new ExplainPlan(Collections.<String>emptyList());
+
+    private final List<String> planSteps;
+    
+    public ExplainPlan(List<String> planSteps) {
+        this.planSteps = ImmutableList.copyOf(planSteps);
+    }
+    
+    public List<String> getPlanSteps() {
+        return planSteps;
+    }
+}