Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 20:23:43 UTC
[42/51] [partial] Initial commit
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/cache/HashCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/cache/HashCache.java b/src/main/java/org/apache/phoenix/cache/HashCache.java
new file mode 100644
index 0000000..5f8348b
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/cache/HashCache.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache;
+
+import java.io.Closeable;
+import java.util.List;
+
+import org.apache.http.annotation.Immutable;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ * Encapsulates a hash cache deserialized from bytes into a Map.
+ * The Map uses the row key as the key and the list of matching rows as the value.
+ * @author jtaylor
+ * @since 0.1
+ */
+@Immutable
+public interface HashCache extends Closeable {
+ public List<Tuple> get(ImmutableBytesPtr hashKey);
+}
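
A minimal sketch (not part of this commit) of how a server-side consumer might probe this interface during hash-join processing. The HashJoinProbe class and its setup are illustrative assumptions; only HashCache.get() and its key/value contract come from the code above.

    import java.util.Collections;
    import java.util.List;

    import org.apache.hbase.index.util.ImmutableBytesPtr;
    import org.apache.phoenix.cache.HashCache;
    import org.apache.phoenix.schema.tuple.Tuple;

    // Illustrative consumer: look up all cached rows that share a join key.
    public final class HashJoinProbe {
        private HashJoinProbe() {}

        public static List<Tuple> probe(HashCache cache, byte[] rowKey) {
            ImmutableBytesPtr key = new ImmutableBytesPtr(rowKey);
            List<Tuple> rows = cache.get(key); // rows whose row key matches the probe key
            return rows == null ? Collections.<Tuple>emptyList() : rows;
        }
    }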
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java b/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
new file mode 100644
index 0000000..fac78de
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
@@ -0,0 +1,10 @@
+package org.apache.phoenix.cache;
+
+import java.io.Closeable;
+import java.util.List;
+
+import org.apache.phoenix.index.IndexMaintainer;
+
+public interface IndexMetaDataCache extends Closeable {
+ public List<IndexMaintainer> getIndexMaintainers();
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
new file mode 100644
index 0000000..c5b052d
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -0,0 +1,278 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.coprocessor.ServerCachingProtocol;
+import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.job.JobManager.JobCallable;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.Closeables;
+import org.apache.phoenix.util.SQLCloseable;
+import org.apache.phoenix.util.SQLCloseables;
+
+/**
+ *
+ * Client for sending cache to each region server
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ServerCacheClient {
+ public static final int UUID_LENGTH = Bytes.SIZEOF_LONG;
+ private static final Log LOG = LogFactory.getLog(ServerCacheClient.class);
+ private static final Random RANDOM = new Random();
+ private final PhoenixConnection connection;
+ private final TableRef cacheUsingTableRef;
+
+ /**
+ * Construct client used to create a serialized cached snapshot of a table and send it to each region server
+ * for caching during hash join processing.
+ * @param connection the client connection
+ * @param cacheUsingTableRef the table whose region boundaries determine which region servers receive the cache
+ *
+ * TODO: instead of minMaxKeyRange, have an interface for iterating through ranges, as we may otherwise send the cache
+ * to servers that don't need it when the min is in the first region and the max is in the last, especially for point queries.
+ */
+ public ServerCacheClient(PhoenixConnection connection, TableRef cacheUsingTableRef) {
+ this.connection = connection;
+ this.cacheUsingTableRef = cacheUsingTableRef;
+ }
+
+ public PhoenixConnection getConnection() {
+ return connection;
+ }
+
+ public TableRef getTableRef() {
+ return cacheUsingTableRef;
+ }
+
+ /**
+ * Client-side representation of a server cache. Call {@link #close()} when usage
+ * is complete to free cache up on region server
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+ public class ServerCache implements SQLCloseable {
+ private final int size;
+ private final byte[] id;
+ private final ImmutableSet<HRegionLocation> servers;
+
+ public ServerCache(byte[] id, Set<HRegionLocation> servers, int size) {
+ this.id = id;
+ this.servers = ImmutableSet.copyOf(servers);
+ this.size = size;
+ }
+
+ /**
+ * Gets the size in bytes of the hash cache
+ */
+ public int getSize() {
+ return size;
+ }
+
+ /**
+ * Gets the unique identifier for this hash cache
+ */
+ public byte[] getId() {
+ return id;
+ }
+
+ /**
+ * Call to free up cache on region servers when no longer needed
+ */
+ @Override
+ public void close() throws SQLException {
+ removeServerCache(id, servers);
+ }
+
+ }
+
+ public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final ServerCacheFactory cacheFactory) throws SQLException {
+ ConnectionQueryServices services = connection.getQueryServices();
+ MemoryChunk chunk = services.getMemoryManager().allocate(cachePtr.getLength());
+ List<Closeable> closeables = new ArrayList<Closeable>();
+ closeables.add(chunk);
+ ServerCache hashCacheSpec = null;
+ SQLException firstException = null;
+ final byte[] cacheId = generateId();
+ /**
+ * Execute EndPoint in parallel on each server to send compressed hash cache
+ */
+ // TODO: generalize and package as a per region server EndPoint caller
+ // (ideally this would be functionality provided by the coprocessor framework)
+ boolean success = false;
+ ExecutorService executor = services.getExecutor();
+ List<Future<Boolean>> futures = Collections.emptyList();
+ try {
+ List<HRegionLocation> locations = services.getAllTableRegions(cacheUsingTableRef.getTable().getName().getBytes());
+ int nRegions = locations.size();
+ // Size these based on worst case
+ futures = new ArrayList<Future<Boolean>>(nRegions);
+ Set<HRegionLocation> servers = new HashSet<HRegionLocation>(nRegions);
+ for (HRegionLocation entry : locations) {
+ // Keep track of servers we've sent to and only send once
+ if ( ! servers.contains(entry) &&
+ keyRanges.intersect(entry.getRegionInfo().getStartKey(), entry.getRegionInfo().getEndKey())) { // Call RPC once per server
+ servers.add(entry);
+ if (LOG.isDebugEnabled()) {LOG.debug("Adding cache entry to be sent for " + entry);}
+ final byte[] key = entry.getRegionInfo().getStartKey();
+ final HTableInterface htable = services.getTable(cacheUsingTableRef.getTable().getName().getBytes());
+ closeables.add(htable);
+ futures.add(executor.submit(new JobCallable<Boolean>() {
+
+ @Override
+ public Boolean call() throws Exception {
+ ServerCachingProtocol protocol = htable.coprocessorProxy(ServerCachingProtocol.class, key);
+ return protocol.addServerCache(connection.getTenantId(), cacheId, cachePtr, cacheFactory);
+ }
+
+ /**
+ * Defines the grouping for round robin behavior. All threads spawned to process
+ * this scan will be grouped together and time sliced with other simultaneously
+ * executing parallel scans.
+ */
+ @Override
+ public Object getJobId() {
+ return ServerCacheClient.this;
+ }
+ }));
+ } else {
+ if (LOG.isDebugEnabled()) {LOG.debug("NOT adding cache entry to be sent for " + entry + " since one already exists for that entry");}
+ }
+ }
+
+ hashCacheSpec = new ServerCache(cacheId,servers,cachePtr.getLength());
+ // Execute in parallel
+ int timeoutMs = services.getProps().getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
+ for (Future<Boolean> future : futures) {
+ future.get(timeoutMs, TimeUnit.MILLISECONDS);
+ }
+
+ success = true;
+ } catch (SQLException e) {
+ firstException = e;
+ } catch (Exception e) {
+ firstException = new SQLException(e);
+ } finally {
+ try {
+ if (!success) {
+ SQLCloseables.closeAllQuietly(Collections.singletonList(hashCacheSpec));
+ for (Future<Boolean> future : futures) {
+ future.cancel(true);
+ }
+ }
+ } finally {
+ try {
+ Closeables.closeAll(closeables);
+ } catch (IOException e) {
+ if (firstException == null) {
+ firstException = new SQLException(e);
+ }
+ } finally {
+ if (firstException != null) {
+ throw firstException;
+ }
+ }
+ }
+ }
+ return hashCacheSpec;
+ }
+
+ /**
+ * Remove the cached table from all region servers
+ * @param cacheId unique identifier for the cache (returned from {@link #addServerCache(ScanRanges, ImmutableBytesWritable, ServerCacheFactory)})
+ * @param servers set of servers upon which the cache was added (filled in by {@link #addServerCache(ScanRanges, ImmutableBytesWritable, ServerCacheFactory)})
+ * @throws SQLException
+ * @throws IllegalStateException if hashed table cannot be removed on any region server on which it was added
+ */
+ private void removeServerCache(byte[] cacheId, Set<HRegionLocation> servers) throws SQLException {
+ ConnectionQueryServices services = connection.getQueryServices();
+ Throwable lastThrowable = null;
+ byte[] tableName = cacheUsingTableRef.getTable().getName().getBytes();
+ HTableInterface iterateOverTable = services.getTable(tableName);
+ List<HRegionLocation> locations = services.getAllTableRegions(tableName);
+ Set<HRegionLocation> remainingOnServers = new HashSet<HRegionLocation>(servers);
+ /**
+ * Allow for the possibility that a region we sent our cache to has split and been
+ * relocated to another region server *after* we sent it, but before we removed it. To accommodate
+ * this, we iterate through the current metadata boundaries and remove the cache once for each
+ * server that we originally sent to.
+ */
+ for (HRegionLocation entry : locations) {
+ if (remainingOnServers.contains(entry)) { // Call once per server
+ try {
+ byte[] key = entry.getRegionInfo().getStartKey();
+ ServerCachingProtocol protocol = iterateOverTable.coprocessorProxy(ServerCachingProtocol.class, key);
+ protocol.removeServerCache(connection.getTenantId(), cacheId);
+ remainingOnServers.remove(entry);
+ } catch (Throwable t) {
+ lastThrowable = t;
+ LOG.error("Error trying to remove hash cache for " + entry, t);
+ }
+ }
+ }
+ if (!remainingOnServers.isEmpty()) {
+ LOG.warn("Unable to remove hash cache for " + remainingOnServers, lastThrowable);
+ }
+ }
+
+ /**
+ * Create an ID to keep the cached information independent of other operations.
+ * A simple random long suffices, since the length of time we need it to be unique
+ * is very limited.
+ */
+ public static byte[] generateId() {
+ long rand = RANDOM.nextLong();
+ return Bytes.toBytes(rand);
+ }
+
+ public static String idToString(byte[] uuid) {
+ assert(uuid.length == Bytes.SIZEOF_LONG);
+ return Long.toString(Bytes.toLong(uuid));
+ }
+}
\ No newline at end of file
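
To make the lifecycle above concrete, here is a hedged usage sketch (not part of this commit; the ServerCacheExample wrapper, its parameters, and the Runnable are illustrative). The point it shows: addServerCache() is paired with ServerCache.close() in a finally block so the server-side memory is always released.

    import java.sql.SQLException;

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.phoenix.cache.ServerCacheClient;
    import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
    import org.apache.phoenix.compile.ScanRanges;
    import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
    import org.apache.phoenix.jdbc.PhoenixConnection;
    import org.apache.phoenix.schema.TableRef;

    public final class ServerCacheExample {
        private ServerCacheExample() {}

        // Sends the serialized cache to the region servers covered by keyRanges,
        // runs the caller-supplied work, and always frees the server-side memory.
        public static void withServerCache(PhoenixConnection connection, TableRef tableRef,
                ScanRanges keyRanges, ImmutableBytesWritable cachePtr,
                ServerCacheFactory factory, Runnable work) throws SQLException {
            ServerCacheClient client = new ServerCacheClient(connection, tableRef);
            ServerCache cache = client.addServerCache(keyRanges, cachePtr, factory);
            try {
                // cache.getId() would typically be attached to the scan as an attribute.
                work.run();
            } finally {
                cache.close(); // removes the cache from every server it was sent to
            }
        }
    }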
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/cache/TenantCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/cache/TenantCache.java b/src/main/java/org/apache/phoenix/cache/TenantCache.java
new file mode 100644
index 0000000..acddac9
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/cache/TenantCache.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache;
+
+import java.io.Closeable;
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
+import org.apache.phoenix.memory.MemoryManager;
+
+
+/**
+ *
+ * Interface to get and set cached values for a tenant
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface TenantCache {
+ MemoryManager getMemoryManager();
+ Closeable getServerCache(ImmutableBytesPtr cacheId);
+ Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException;
+ void removeServerCache(ImmutableBytesPtr cacheId) throws SQLException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java b/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
new file mode 100644
index 0000000..0cdb888
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache;
+
+import java.io.Closeable;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import com.google.common.cache.*;
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
+import org.apache.phoenix.memory.MemoryManager;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.util.Closeables;
+
+/**
+ *
+ * Server-side cache for each tenant. Tracks memory usage per
+ * tenant and rolls it up to the global memory manager.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class TenantCacheImpl implements TenantCache {
+ private final int maxTimeToLiveMs;
+ private final MemoryManager memoryManager;
+ private volatile Cache<ImmutableBytesPtr, Closeable> serverCaches;
+
+ public TenantCacheImpl(MemoryManager memoryManager, int maxTimeToLiveMs) {
+ this.memoryManager = memoryManager;
+ this.maxTimeToLiveMs = maxTimeToLiveMs;
+ }
+
+ @Override
+ public MemoryManager getMemoryManager() {
+ return memoryManager;
+ }
+
+ private Cache<ImmutableBytesPtr,Closeable> getServerCaches() {
+ /* Delay creation of this map until it's needed */
+ if (serverCaches == null) {
+ synchronized(this) {
+ if (serverCaches == null) {
+ serverCaches = CacheBuilder.newBuilder()
+ .expireAfterAccess(maxTimeToLiveMs, TimeUnit.MILLISECONDS)
+ .removalListener(new RemovalListener<ImmutableBytesPtr, Closeable>(){
+ @Override
+ public void onRemoval(RemovalNotification<ImmutableBytesPtr, Closeable> notification) {
+ Closeables.closeAllQuietly(Collections.singletonList(notification.getValue()));
+ }
+ })
+ .build();
+ }
+ }
+ }
+ return serverCaches;
+ }
+
+ @Override
+ public Closeable getServerCache(ImmutableBytesPtr cacheId) {
+ return getServerCaches().getIfPresent(cacheId);
+ }
+
+ @Override
+ public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException {
+ MemoryChunk chunk = this.getMemoryManager().allocate(cachePtr.getLength());
+ Closeable element = cacheFactory.newCache(cachePtr, chunk);
+ getServerCaches().put(cacheId, element);
+ return element;
+ }
+
+ @Override
+ public void removeServerCache(ImmutableBytesPtr cacheId) throws SQLException {
+ getServerCaches().invalidate(cacheId);
+ }
+}
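
A short round-trip sketch of the TenantCache contract (illustrative, not part of this commit): an entry is added, looked up, and removed; removal goes through Cache.invalidate(), which fires the Guava removal listener above and closes the entry, releasing its memory chunk. The same listener also runs when an entry expires after maxTimeToLiveMs of inactivity.

    import java.io.Closeable;
    import java.sql.SQLException;

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hbase.index.util.ImmutableBytesPtr;
    import org.apache.phoenix.cache.TenantCache;
    import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;

    public final class TenantCacheExample {
        private TenantCacheExample() {}

        public static void roundTrip(TenantCache cache, ImmutableBytesPtr cacheId,
                ImmutableBytesWritable cachePtr, ServerCacheFactory factory) throws SQLException {
            cache.addServerCache(cacheId, cachePtr, factory); // allocates a memory chunk
            Closeable entry = cache.getServerCache(cacheId);  // null once expired or removed
            if (entry != null) {
                cache.removeServerCache(cacheId); // listener closes entry, freeing memory
            }
        }
    }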
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/compile/AggregationManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/compile/AggregationManager.java b/src/main/java/org/apache/phoenix/compile/AggregationManager.java
new file mode 100644
index 0000000..70ea3a2
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/compile/AggregationManager.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import org.apache.phoenix.expression.aggregator.ClientAggregators;
+
+/**
+ *
+ * Class that manages aggregations during query compilation
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class AggregationManager {
+ private ClientAggregators aggregators;
+ private int position = 0;
+
+ public AggregationManager() {
+ }
+
+ public ClientAggregators getAggregators() {
+ return aggregators;
+ }
+
+ /**
+ * @return the next available zero-based positional index
+ * for a client-side aggregate function.
+ */
+ protected int nextPosition() {
+ return position++;
+ }
+
+ public void setAggregators(ClientAggregators clientAggregator) {
+ this.aggregators = clientAggregator;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/compile/BindManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/compile/BindManager.java b/src/main/java/org/apache/phoenix/compile/BindManager.java
new file mode 100644
index 0000000..4343d41
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/compile/BindManager.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
+import org.apache.phoenix.parse.BindParseNode;
+import org.apache.phoenix.schema.PDatum;
+
+
+/**
+ *
+ * Class that manages binding parameters and checking type matching. There are
+ * two main usages:
+ *
+ * 1) the standard query case where we have the values for the binds.
+ * 2) the retrieve param metadata case where we don't have the bind values.
+ *
+ * In both cases, during query compilation we figure out what type the bind variable
+ * "should" be, based on how it's used in the query. For example foo < ? would expect
+ * that the bind variable type matches or can be coerced to the type of foo. For (1),
+ * we check that the bind value has the correct type and for (2) we set the param
+ * metadata type.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class BindManager {
+ public static final Object UNBOUND_PARAMETER = new Object();
+
+ private final List<Object> binds;
+ private final PhoenixParameterMetaData bindMetaData;
+
+ public BindManager(List<Object> binds, int bindCount) {
+ this.binds = binds;
+ this.bindMetaData = new PhoenixParameterMetaData(bindCount);
+ }
+
+ public ParameterMetaData getParameterMetaData() {
+ return bindMetaData;
+ }
+
+ public Object getBindValue(BindParseNode node) throws SQLException {
+ int index = node.getIndex();
+ if (index < 0 || index >= binds.size()) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.PARAM_INDEX_OUT_OF_BOUND)
+ .setMessage("binds size: " + binds.size() + "; index: " + index).build().buildException();
+ }
+ Object value = binds.get(index);
+ if (value == UNBOUND_PARAMETER) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.PARAM_VALUE_UNBOUND)
+ .setMessage(node.toString()).build().buildException();
+ }
+ return value;
+ }
+
+ public void addParamMetaData(BindParseNode bind, PDatum column) throws SQLException {
+ bindMetaData.addParam(bind,column);
+ }
+}
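
A sketch of how the two usages described in the class comment meet in code (illustrative; BindExample and inferredType are assumptions, while the BindManager calls are from the class above). During expression compilation the inferred type is recorded first; in the param-metadata case, where no value was bound, getBindValue() raises PARAM_VALUE_UNBOUND instead of returning a value.

    import java.sql.SQLException;

    import org.apache.phoenix.compile.BindManager;
    import org.apache.phoenix.parse.BindParseNode;
    import org.apache.phoenix.schema.PDatum;

    public final class BindExample {
        private BindExample() {}

        public static Object resolveBind(BindManager binds, BindParseNode node,
                PDatum inferredType) throws SQLException {
            binds.addParamMetaData(node, inferredType); // case (2): record the inferred type
            return binds.getBindValue(node);            // case (1): fetch the bound value
        }
    }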
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/compile/ColumnProjector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/compile/ColumnProjector.java b/src/main/java/org/apache/phoenix/compile/ColumnProjector.java
new file mode 100644
index 0000000..e8a18e8
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/compile/ColumnProjector.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+
+/**
+ *
+ * Interface used to access the value of a projected column.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface ColumnProjector {
+ /**
+ * Get the column name as it was referenced in the query
+ * @return the database column name
+ */
+ String getName();
+
+ /**
+ * Get the expression
+ * @return the expression for the column projector
+ */
+ public Expression getExpression();
+
+ /**
+ * Get the name of the hbase table containing the column
+ * @return the hbase table name
+ */
+ String getTableName();
+
+ /**
+ * Get the value of the column, coercing it if necessary to the specified type
+ * @param tuple the row containing the column
+ * @param type the type to which to coerce the binary value
+ * @param ptr used to retrieve the value
+ * @return the object representation of the column value.
+ * @throws SQLException
+ */
+ Object getValue(Tuple tuple, PDataType type, ImmutableBytesWritable ptr) throws SQLException;
+
+ boolean isCaseSensitive();
+}
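
For illustration, a small sketch of getValue() in use (ProjectorExample is an assumption, not part of this commit): the projected value is coerced to VARCHAR via the PDataType argument, and a single reusable ImmutableBytesWritable is passed in so no per-row buffer holder is allocated.

    import java.sql.SQLException;

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.phoenix.compile.ColumnProjector;
    import org.apache.phoenix.schema.PDataType;
    import org.apache.phoenix.schema.tuple.Tuple;

    public final class ProjectorExample {
        private ProjectorExample() {}

        public static String projectAsString(ColumnProjector projector, Tuple row,
                ImmutableBytesWritable ptr) throws SQLException {
            Object value = projector.getValue(row, PDataType.VARCHAR, ptr);
            return value == null ? null : value.toString();
        }
    }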
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/compile/ColumnResolver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/compile/ColumnResolver.java b/src/main/java/org/apache/phoenix/compile/ColumnResolver.java
new file mode 100644
index 0000000..49a1947
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/compile/ColumnResolver.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.schema.ColumnRef;
+import org.apache.phoenix.schema.TableRef;
+
+
+
+/**
+ *
+ * Interface used to resolve column references occurring
+ * in the select statement.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface ColumnResolver {
+
+ /**
+ * Returns the collection of resolved tables in the FROM clause.
+ */
+ public List<TableRef> getTables();
+
+ /**
+ * Resolves column using name and alias.
+ * @param schemaName the schema qualifier, or null if none was specified
+ * @param tableName the table name or alias, or null if none was specified
+ * @param colName the name of the column to resolve
+ * @return the resolved ColumnRef
+ * @throws ColumnNotFoundException if the column could not be resolved
+ * @throws AmbiguousColumnException if the column name is ambiguous
+ */
+ public ColumnRef resolveColumn(String schemaName, String tableName, String colName) throws SQLException;
+}
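
A resolution sketch (illustrative, and assuming a null schema or table name denotes an unqualified reference, as the compilers above use it): a reference like S.T.C is resolved by passing each qualifier through, and ColumnNotFoundException or AmbiguousColumnException surface as SQLExceptions.

    import java.sql.SQLException;

    import org.apache.phoenix.compile.ColumnResolver;
    import org.apache.phoenix.schema.ColumnRef;

    public final class ResolverExample {
        private ResolverExample() {}

        // Resolve a fully qualified reference S.T.C; pass null for the parts
        // that were not written in the query.
        public static ColumnRef resolveQualified(ColumnResolver resolver) throws SQLException {
            return resolver.resolveColumn("S", "T", "C");
        }
    }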
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java b/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
new file mode 100644
index 0000000..3006a9e
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.CreateIndexStatement;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.schema.MetaDataClient;
+
+public class CreateIndexCompiler {
+ private final PhoenixStatement statement;
+
+ public CreateIndexCompiler(PhoenixStatement statement) {
+ this.statement = statement;
+ }
+
+ public MutationPlan compile(final CreateIndexStatement create) throws SQLException {
+ final PhoenixConnection connection = statement.getConnection();
+ final ColumnResolver resolver = FromCompiler.getResolver(create, connection);
+ Scan scan = new Scan();
+ final StatementContext context = new StatementContext(create, connection, resolver, statement.getParameters(), scan);
+ ExpressionCompiler expressionCompiler = new ExpressionCompiler(context);
+ List<ParseNode> splitNodes = create.getSplitNodes();
+ final byte[][] splits = new byte[splitNodes.size()][];
+ for (int i = 0; i < splits.length; i++) {
+ ParseNode node = splitNodes.get(i);
+ if (!node.isConstant()) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.SPLIT_POINT_NOT_CONSTANT)
+ .setMessage("Node: " + node).build().buildException();
+ }
+ LiteralExpression expression = (LiteralExpression)node.accept(expressionCompiler);
+ splits[i] = expression.getBytes();
+ }
+ final MetaDataClient client = new MetaDataClient(connection);
+
+ return new MutationPlan() {
+
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return context.getBindManager().getParameterMetaData();
+ }
+
+ @Override
+ public PhoenixConnection getConnection() {
+ return connection;
+ }
+
+ @Override
+ public MutationState execute() throws SQLException {
+ return client.createIndex(create, splits);
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ return new ExplainPlan(Collections.singletonList("CREATE INDEX"));
+ }
+ };
+ }
+}
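
From the client's side, the compiler above runs when a CREATE INDEX statement with split points is executed; each SPLIT ON value must be a constant expression or compile() fails with SPLIT_POINT_NOT_CONSTANT. A hedged JDBC sketch follows (table, column, and index names are illustrative, and the SPLIT ON clause is assumed from CreateIndexStatement.getSplitNodes()):

    import java.sql.Connection;
    import java.sql.SQLException;
    import java.sql.Statement;

    public final class CreateIndexExample {
        private CreateIndexExample() {}

        public static void createSplitIndex(Connection conn) throws SQLException {
            Statement stmt = conn.createStatement();
            try {
                // Constant split points are compiled into the byte[][] passed
                // to MetaDataClient.createIndex().
                stmt.execute("CREATE INDEX idx_v ON t (v) SPLIT ON ('a', 'm')");
            } finally {
                stmt.close();
            }
        }
    }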
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java b/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
new file mode 100644
index 0000000..ab98538
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.CreateTableStatement;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.schema.MetaDataClient;
+
+
+public class CreateTableCompiler {
+ private final PhoenixStatement statement;
+
+ public CreateTableCompiler(PhoenixStatement statement) {
+ this.statement = statement;
+ }
+
+ public MutationPlan compile(final CreateTableStatement create) throws SQLException {
+ final PhoenixConnection connection = statement.getConnection();
+ final ColumnResolver resolver = FromCompiler.getResolver(create, connection);
+ Scan scan = new Scan();
+ final StatementContext context = new StatementContext(create, connection, resolver, statement.getParameters(), scan);
+ ExpressionCompiler expressionCompiler = new ExpressionCompiler(context);
+ List<ParseNode> splitNodes = create.getSplitNodes();
+ final byte[][] splits = new byte[splitNodes.size()][];
+ for (int i = 0; i < splits.length; i++) {
+ ParseNode node = splitNodes.get(i);
+ if (!node.isConstant()) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.SPLIT_POINT_NOT_CONSTANT)
+ .setMessage("Node: " + node).build().buildException();
+ }
+ LiteralExpression expression = (LiteralExpression)node.accept(expressionCompiler);
+ splits[i] = expression.getBytes();
+ }
+ final MetaDataClient client = new MetaDataClient(connection);
+
+ return new MutationPlan() {
+
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return context.getBindManager().getParameterMetaData();
+ }
+
+ @Override
+ public MutationState execute() throws SQLException {
+ return client.createTable(create, splits);
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ return new ExplainPlan(Collections.singletonList("CREATE TABLE"));
+ }
+
+ @Override
+ public PhoenixConnection getConnection() {
+ return connection;
+ }
+
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
new file mode 100644
index 0000000..72b8ca1
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -0,0 +1,352 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.ParameterMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.index.IndexMetaDataCacheClient;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.optimize.QueryOptimizer;
+import org.apache.phoenix.parse.AliasedNode;
+import org.apache.phoenix.parse.DeleteStatement;
+import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.parse.HintNode.Hint;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.parse.ParseNodeFactory;
+import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.query.Scanner;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.ReadOnlyTableException;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.IndexUtil;
+
+public class DeleteCompiler {
+ private static final ParseNodeFactory FACTORY = new ParseNodeFactory();
+
+ private final PhoenixStatement statement;
+
+ public DeleteCompiler(PhoenixStatement statement) {
+ this.statement = statement;
+ }
+
+ private static MutationState deleteRows(PhoenixStatement statement, TableRef tableRef, ResultIterator iterator, RowProjector projector) throws SQLException {
+ PhoenixConnection connection = statement.getConnection();
+ final boolean isAutoCommit = connection.getAutoCommit();
+ ConnectionQueryServices services = connection.getQueryServices();
+ final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+ final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
+ Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations = Maps.newHashMapWithExpectedSize(batchSize);
+ try {
+ PTable table = tableRef.getTable();
+ List<PColumn> pkColumns = table.getPKColumns();
+ int offset = table.getBucketNum() == null ? 0 : 1; // Take into account salting
+ byte[][] values = new byte[pkColumns.size()][];
+ ResultSet rs = new PhoenixResultSet(iterator, projector, statement);
+ int rowCount = 0;
+ while (rs.next()) {
+ for (int i = offset; i < values.length; i++) {
+ byte[] byteValue = rs.getBytes(i+1-offset);
+ // The ResultSet.getBytes() call will have inverted it - we need to invert it back.
+ // TODO: consider going under the hood and just getting the bytes
+ if (pkColumns.get(i).getColumnModifier() == ColumnModifier.SORT_DESC) {
+ byte[] tempByteValue = Arrays.copyOf(byteValue, byteValue.length);
+ byteValue = ColumnModifier.SORT_DESC.apply(byteValue, 0, tempByteValue, 0, byteValue.length);
+ }
+ values[i] = byteValue;
+ }
+ ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+ table.newKey(ptr, values);
+ mutations.put(ptr, null);
+ if (mutations.size() > maxSize) {
+ throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize);
+ }
+ rowCount++;
+ // Commit a batch if auto commit is true and we're at our batch size
+ if (isAutoCommit && rowCount % batchSize == 0) {
+ MutationState state = new MutationState(tableRef, mutations, 0, maxSize, connection);
+ connection.getMutationState().join(state);
+ connection.commit();
+ mutations.clear();
+ }
+ }
+
+ // If auto commit is true, this last batch will be committed upon return
+ return new MutationState(tableRef, mutations, rowCount / batchSize * batchSize, maxSize, connection);
+ } finally {
+ iterator.close();
+ }
+ }
+
+ private static class DeletingParallelIteratorFactory extends MutatingParallelIteratorFactory {
+ private RowProjector projector;
+
+ private DeletingParallelIteratorFactory(PhoenixConnection connection, TableRef tableRef) {
+ super(connection, tableRef);
+ }
+
+ @Override
+ protected MutationState mutate(PhoenixConnection connection, ResultIterator iterator) throws SQLException {
+ PhoenixStatement statement = new PhoenixStatement(connection);
+ return deleteRows(statement, tableRef, iterator, projector);
+ }
+
+ public void setRowProjector(RowProjector projector) {
+ this.projector = projector;
+ }
+
+ }
+
+ private boolean hasImmutableIndex(TableRef tableRef) {
+ return tableRef.getTable().isImmutableRows() && !tableRef.getTable().getIndexes().isEmpty();
+ }
+
+ private boolean hasImmutableIndexWithKeyValueColumns(TableRef tableRef) {
+ if (!hasImmutableIndex(tableRef)) {
+ return false;
+ }
+ for (PTable index : tableRef.getTable().getIndexes()) {
+ for (PColumn column : index.getPKColumns()) {
+ if (!IndexUtil.isDataPKColumn(column)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ public MutationPlan compile(DeleteStatement delete) throws SQLException {
+ final PhoenixConnection connection = statement.getConnection();
+ final boolean isAutoCommit = connection.getAutoCommit();
+ final ConnectionQueryServices services = connection.getQueryServices();
+ final ColumnResolver resolver = FromCompiler.getResolver(delete, connection);
+ final TableRef tableRef = resolver.getTables().get(0);
+ if (tableRef.getTable().getType() == PTableType.VIEW) {
+ throw new ReadOnlyTableException("Mutations not allowed for a view (" + tableRef.getTable() + ")");
+ }
+
+ final boolean hasLimit = delete.getLimit() != null;
+ boolean runOnServer = isAutoCommit && !hasLimit && !hasImmutableIndex(tableRef);
+ HintNode hint = delete.getHint();
+ if (runOnServer && !delete.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) {
+ hint = HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE);
+ }
+
+ PTable table = tableRef.getTable();
+ List<AliasedNode> aliasedNodes = Lists.newArrayListWithExpectedSize(table.getPKColumns().size());
+ for (int i = table.getBucketNum() == null ? 0 : 1; i < table.getPKColumns().size(); i++) {
+ PColumn column = table.getPKColumns().get(i);
+ String name = column.getName().getString();
+ aliasedNodes.add(FACTORY.aliasedNode(null, FACTORY.column(null, name, name)));
+ }
+ SelectStatement select = FACTORY.select(
+ Collections.singletonList(delete.getTable()),
+ hint, false, aliasedNodes, delete.getWhere(),
+ Collections.<ParseNode>emptyList(), null,
+ delete.getOrderBy(), delete.getLimit(),
+ delete.getBindCount(), false);
+ DeletingParallelIteratorFactory parallelIteratorFactory = hasLimit ? null : new DeletingParallelIteratorFactory(connection, tableRef);
+ final QueryPlan plan = new QueryOptimizer(services).optimize(select, statement, Collections.<PColumn>emptyList(), parallelIteratorFactory);
+ runOnServer &= plan.getTableRef().equals(tableRef);
+
+ final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+
+ if (hasImmutableIndexWithKeyValueColumns(tableRef)) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.NO_DELETE_IF_IMMUTABLE_INDEX).setSchemaName(tableRef.getTable().getSchemaName().getString())
+ .setTableName(tableRef.getTable().getTableName().getString()).build().buildException();
+ }
+
+ final StatementContext context = plan.getContext();
+ // If we're doing a query for a single row with no where clause, then we don't need to contact the server at all.
+ // A simple check for the non-existence of a where clause in the parse node is not sufficient, as the where clause
+ // may have been optimized out.
+ if (runOnServer && context.isSingleRowScan()) {
+ final ImmutableBytesPtr key = new ImmutableBytesPtr(context.getScan().getStartRow());
+ return new MutationPlan() {
+
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return context.getBindManager().getParameterMetaData();
+ }
+
+ @Override
+ public MutationState execute() {
+ Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutation = Maps.newHashMapWithExpectedSize(1);
+ mutation.put(key, null);
+ return new MutationState(tableRef, mutation, 0, maxSize, connection);
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ return new ExplainPlan(Collections.singletonList("DELETE SINGLE ROW"));
+ }
+
+ @Override
+ public PhoenixConnection getConnection() {
+ return connection;
+ }
+ };
+ } else if (runOnServer) {
+ // TODO: better abstraction
+ Scan scan = context.getScan();
+ scan.setAttribute(UngroupedAggregateRegionObserver.DELETE_AGG, QueryConstants.TRUE);
+
+ // Build an ungrouped aggregate query: select COUNT(*) from <table> where <where>
+ // The coprocessor will delete each row returned from the scan
+ // Ignoring ORDER BY, since with auto commit on and no limit it makes no difference
+ SelectStatement aggSelect = SelectStatement.create(SelectStatement.COUNT_ONE, delete.getHint());
+ final RowProjector projector = ProjectionCompiler.compile(context, aggSelect, GroupBy.EMPTY_GROUP_BY);
+ final QueryPlan aggPlan = new AggregatePlan(context, select, tableRef, projector, null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
+ return new MutationPlan() {
+
+ @Override
+ public PhoenixConnection getConnection() {
+ return connection;
+ }
+
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return context.getBindManager().getParameterMetaData();
+ }
+
+ @Override
+ public MutationState execute() throws SQLException {
+ // TODO: share this block of code with UPSERT SELECT
+ ImmutableBytesWritable ptr = context.getTempPtr();
+ tableRef.getTable().getIndexMaintainers(ptr);
+ ServerCache cache = null;
+ try {
+ if (ptr.getLength() > 0) {
+ IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
+ cache = client.addIndexMetadataCache(context.getScanRanges(), ptr);
+ byte[] uuidValue = cache.getId();
+ context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+ }
+ Scanner scanner = aggPlan.getScanner();
+ ResultIterator iterator = scanner.iterator();
+ try {
+ Tuple row = iterator.next();
+ final long mutationCount = (Long)projector.getColumnProjector(0).getValue(row, PDataType.LONG, ptr);
+ return new MutationState(maxSize, connection) {
+ @Override
+ public long getUpdateCount() {
+ return mutationCount;
+ }
+ };
+ } finally {
+ iterator.close();
+ }
+ } finally {
+ if (cache != null) {
+ cache.close();
+ }
+ }
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ List<String> queryPlanSteps = aggPlan.getExplainPlan().getPlanSteps();
+ List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+ planSteps.add("DELETE ROWS");
+ planSteps.addAll(queryPlanSteps);
+ return new ExplainPlan(planSteps);
+ }
+ };
+ } else {
+ if (parallelIteratorFactory != null) {
+ parallelIteratorFactory.setRowProjector(plan.getProjector());
+ }
+ return new MutationPlan() {
+
+ @Override
+ public PhoenixConnection getConnection() {
+ return connection;
+ }
+
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return context.getBindManager().getParameterMetaData();
+ }
+
+ @Override
+ public MutationState execute() throws SQLException {
+ Scanner scanner = plan.getScanner();
+ ResultIterator iterator = scanner.iterator();
+ if (!hasLimit) {
+ Tuple tuple;
+ long totalRowCount = 0;
+ while ((tuple=iterator.next()) != null) {// Runs query
+ KeyValue kv = tuple.getValue(0);
+ totalRowCount += PDataType.LONG.getCodec().decodeLong(kv.getBuffer(), kv.getValueOffset(), null);
+ }
+ // Return the total number of rows that have been deleted. In the case of auto commit being off,
+ // the mutations will all be in the mutation state of the current connection.
+ return new MutationState(maxSize, connection, totalRowCount);
+ } else {
+ return deleteRows(statement, tableRef, iterator, plan.getProjector());
+ }
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ List<String> queryPlanSteps = plan.getExplainPlan().getPlanSteps();
+ List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+ planSteps.add("DELETE ROWS");
+ planSteps.addAll(queryPlanSteps);
+ return new ExplainPlan(planSteps);
+ }
+ };
+ }
+
+ }
+}
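
A usage sketch of the fastest path above (table and column names are illustrative): with auto commit on and no LIMIT, compile() plans the delete as an ungrouped aggregate that the UngroupedAggregateRegionObserver executes server side, so matching rows are deleted where they live and only a count travels back to the client.

    import java.sql.Connection;
    import java.sql.SQLException;
    import java.sql.Statement;

    public final class DeleteExample {
        private DeleteExample() {}

        public static int deleteOld(Connection conn) throws SQLException {
            conn.setAutoCommit(true); // enables the server-side delete plan
            Statement stmt = conn.createStatement();
            try {
                return stmt.executeUpdate(
                    "DELETE FROM event WHERE created_date < CURRENT_DATE() - 30");
            } finally {
                stmt.close();
            }
        }
    }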
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/compile/ExplainPlan.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/compile/ExplainPlan.java b/src/main/java/org/apache/phoenix/compile/ExplainPlan.java
new file mode 100644
index 0000000..e1049a0
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/compile/ExplainPlan.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+public class ExplainPlan {
+ public static final ExplainPlan EMPTY_PLAN = new ExplainPlan(Collections.<String>emptyList());
+
+ private final List<String> planSteps;
+
+ public ExplainPlan(List<String> planSteps) {
+ this.planSteps = ImmutableList.copyOf(planSteps);
+ }
+
+ public List<String> getPlanSteps() {
+ return planSteps;
+ }
+}