You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by an...@apache.org on 2017/05/17 07:27:02 UTC
[1/2] phoenix git commit: PHOENIX-3572 Support FETCH NEXT| n ROWS
from Cursor (Biju Nair)
Repository: phoenix
Updated Branches:
refs/heads/master 1666e932d -> 2cb617f35
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/util/CursorUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CursorUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/CursorUtil.java
new file mode 100644
index 0000000..877c436
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CursorUtil.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.util;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.CursorFetchPlan;
+import org.apache.phoenix.iterate.CursorResultIterator;
+import org.apache.phoenix.parse.CloseStatement;
+import org.apache.phoenix.parse.DeclareCursorStatement;
+import org.apache.phoenix.parse.OpenStatement;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+public final class CursorUtil {
+
+ private static class CursorWrapper {
+ private final String cursorName;
+ private final String selectSQL;
+ private boolean isOpen = false;
+ QueryPlan queryPlan;
+ ImmutableBytesWritable row;
+ ImmutableBytesWritable previousRow;
+ private Scan scan;
+ private boolean moreValues=true;
+ private boolean isReversed;
+ private boolean islastCallNext;
+ private CursorFetchPlan fetchPlan;
+ private int offset = -1;
+ private boolean isAggregate;
+
+ private CursorWrapper(String cursorName, String selectSQL, QueryPlan queryPlan){
+ this.cursorName = cursorName;
+ this.selectSQL = selectSQL;
+ this.queryPlan = queryPlan;
+ this.islastCallNext = true;
+ this.fetchPlan = new CursorFetchPlan(queryPlan,cursorName);
+ isAggregate = fetchPlan.isAggregate();
+ }
+
+ private synchronized void openCursor(Connection conn) throws SQLException {
+ if(isOpen){
+ return;
+ }
+ this.scan = this.queryPlan.getContext().getScan();
+ isReversed=OrderBy.REV_ROW_KEY_ORDER_BY.equals(this.queryPlan.getOrderBy());
+ isOpen = true;
+ }
+
+ private void closeCursor() throws SQLException {
+ isOpen = false;
+ ((CursorResultIterator) fetchPlan.iterator()).closeCursor();
+ //TODO: Determine if the cursor should be removed from the HashMap at this point.
+ //Semantically it makes sense that something which is 'Closed' one should be able to 'Open' again.
+ mapCursorIDQuery.remove(this.cursorName);
+ }
+
+ private QueryPlan getFetchPlan(boolean isNext, int fetchSize) throws SQLException {
+ if (!isOpen)
+ throw new SQLException("Fetch call on closed cursor '" + this.cursorName + "'!");
+ ((CursorResultIterator)fetchPlan.iterator()).setFetchSize(fetchSize);
+ if (!isAggregate) {
+ if (row!=null){
+ scan.setStartRow(row.get());
+ }
+ }
+ return this.fetchPlan;
+ }
+
+ public void updateLastScanRow(Tuple rowValues,Tuple nextRowValues) {
+
+ this.moreValues = !isReversed ? nextRowValues != null : rowValues != null;
+ if(!moreValues()){
+ return;
+ }
+ if (row == null) {
+ row = new ImmutableBytesWritable();
+ }
+ if (previousRow == null) {
+ previousRow = new ImmutableBytesWritable();
+ }
+ if (nextRowValues != null) {
+ nextRowValues.getKey(row);
+ }
+ if (rowValues != null) {
+ rowValues.getKey(previousRow);
+ }
+ offset++;
+ }
+
+ public boolean moreValues() {
+ return moreValues;
+ }
+
+ public String getFetchSQL() throws SQLException {
+ if (!isOpen)
+ throw new SQLException("Fetch call on closed cursor '" + this.cursorName + "'!");
+ return selectSQL;
+ }
+ }
+
+ private static Map<String, CursorWrapper> mapCursorIDQuery = new HashMap<String,CursorWrapper>();
+
+ /**
+ * Private constructor
+ */
+ private CursorUtil() {
+ }
+
+ /**
+ *
+ * @param stmt DeclareCursorStatement instance intending to declare a new cursor.
+ * @return Returns true if the new cursor was successfully declared. False if a cursor with the same
+ * identifier already exists.
+ */
+ public static boolean declareCursor(DeclareCursorStatement stmt, QueryPlan queryPlan) throws SQLException {
+ if(mapCursorIDQuery.containsKey(stmt.getCursorName())){
+ throw new SQLException("Can't declare cursor " + stmt.getCursorName() + ", cursor identifier already in use.");
+ } else {
+ mapCursorIDQuery.put(stmt.getCursorName(), new CursorWrapper(stmt.getCursorName(), stmt.getQuerySQL(), queryPlan));
+ return true;
+ }
+ }
+
+ public static boolean openCursor(OpenStatement stmt, Connection conn) throws SQLException {
+ if(mapCursorIDQuery.containsKey(stmt.getCursorName())){
+ mapCursorIDQuery.get(stmt.getCursorName()).openCursor(conn);
+ return true;
+ } else{
+ throw new SQLException("Cursor " + stmt.getCursorName() + " not declared.");
+ }
+ }
+
+ public static void closeCursor(CloseStatement stmt) throws SQLException {
+ if(mapCursorIDQuery.containsKey(stmt.getCursorName())){
+ mapCursorIDQuery.get(stmt.getCursorName()).closeCursor();
+ }
+ }
+
+ public static QueryPlan getFetchPlan(String cursorName, boolean isNext, int fetchSize) throws SQLException {
+ if(mapCursorIDQuery.containsKey(cursorName)){
+ return mapCursorIDQuery.get(cursorName).getFetchPlan(isNext, fetchSize);
+ } else {
+ throw new SQLException("Cursor " + cursorName + " not declared.");
+ }
+ }
+
+ public static String getFetchSQL(String cursorName) throws SQLException {
+ if (mapCursorIDQuery.containsKey(cursorName)) {
+ return mapCursorIDQuery.get(cursorName).getFetchSQL();
+ } else {
+ throw new SQLException("Cursor " + cursorName + " not declared.");
+ }
+ }
+
+ public static void updateCursor(String cursorName, Tuple rowValues, Tuple nextRowValues) throws SQLException {
+ mapCursorIDQuery.get(cursorName).updateLastScanRow(rowValues,nextRowValues);
+ }
+
+ public static boolean cursorDeclared(String cursorName){
+ return mapCursorIDQuery.containsKey(cursorName);
+ }
+
+ public static boolean moreValues(String cursorName) {
+ return mapCursorIDQuery.get(cursorName).moreValues();
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 1fdc73b..9794a2a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -613,6 +613,10 @@ public class ScanUtil {
scan.setAttribute(BaseScannerRegionObserver.REVERSE_SCAN, PDataType.TRUE_BYTES);
}
+ public static void unsetReversed(Scan scan) {
+ scan.setAttribute(BaseScannerRegionObserver.REVERSE_SCAN, PDataType.FALSE_BYTES);
+ }
+
private static byte[] getReversedRow(byte[] startRow) {
/*
* Must get previous key because this is going from an inclusive start key to an exclusive stop key, and we need
@@ -926,4 +930,4 @@ public class ScanUtil {
return scan.getAttribute((BaseScannerRegionObserver.REBUILD_INDEXES)) != null;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/test/java/org/apache/phoenix/compile/CursorCompilerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/CursorCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/CursorCompilerTest.java
new file mode 100644
index 0000000..a8f37f0
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/CursorCompilerTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.CountAggregator;
+import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.expression.function.TimeUnit;
+import org.apache.phoenix.filter.ColumnProjectionFilter;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.*;
+import org.apache.phoenix.util.*;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.sql.*;
+import java.util.*;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.assertDegenerate;
+import static org.junit.Assert.*;
+
+
+/**
+ *
+ * Test for compiling the various cursor related statements
+ *
+ *
+ * @since 0.1
+ */
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(
+ value="RV_RETURN_VALUE_IGNORED",
+ justification="Test code.")
+public class CursorCompilerTest extends BaseConnectionlessQueryTest {
+
+ @Test
+ public void testCursorLifecycleCompile() throws SQLException {
+ String query = "SELECT a_string, b_string FROM atable";
+ String sql = "DECLARE testCursor CURSOR FOR " + query;
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ //Test declare cursor compile
+ PreparedStatement statement = conn.prepareStatement(sql);
+ //Test declare cursor execution
+ statement.execute();
+ assertTrue(CursorUtil.cursorDeclared("testCursor"));
+ //Test open cursor compile
+ sql = "OPEN testCursor";
+ statement = conn.prepareStatement(sql);
+ //Test open cursor execution
+ statement.execute();
+ //Test fetch cursor compile
+ sql = "FETCH NEXT FROM testCursor";
+ statement = conn.prepareStatement(sql);
+ statement.executeQuery();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/test/java/org/apache/phoenix/parse/CursorParserTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/parse/CursorParserTest.java b/phoenix-core/src/test/java/org/apache/phoenix/parse/CursorParserTest.java
new file mode 100644
index 0000000..247ee44
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/parse/CursorParserTest.java
@@ -0,0 +1,367 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.schema.SortOrder;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+
+public class CursorParserTest {
+
+ private void parseCursor(String sql) throws IOException, SQLException {
+ SQLParser parser = new SQLParser(new StringReader(sql));
+ BindableStatement stmt = null;
+ try{
+ stmt = parser.parseDeclareCursor();
+ } catch (SQLException e){
+ fail("Unable to parse:\n" + sql);
+ }
+ }
+
+ private void parseFetch(String sql) throws IOException, SQLException {
+ SQLParser parser = new SQLParser(new StringReader(sql));
+ BindableStatement stmt = null;
+ try{
+ stmt = parser.parseFetch();
+ } catch (SQLException e){
+ fail("Unable to parse:\n" + sql);
+ }
+ }
+
+ private void parseOpen(String sql) throws IOException, SQLException {
+ SQLParser parser = new SQLParser(new StringReader(sql));
+ BindableStatement stmt = null;
+ try{
+ stmt = parser.parseOpen();
+ } catch (SQLException e){
+ fail("Unable to parse:\n" + sql);
+ }
+ }
+
+ @Test
+ public void testParseCursor0() throws Exception {
+ String expectedNameToken = "testCursor";
+ String expectedSelectStatement = "select a from b\n" +
+ "where ((ind.name = 'X')" +
+ "and rownum <= (1000 + 1000))\n";
+
+ String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+ parseCursor(sql);
+ }
+
+ @Test
+ public void testParseCursor1() throws Exception {
+ String expectedNameToken = "testCursor";
+ String expectedSelectStatement = "select /*gatherSlowStats*/ count(1) from core.search_name_lookup ind\n" +
+ "where( (ind.name = 'X'\n" +
+ "and rownum <= 1 + 2)\n" +
+ "and (ind.organization_id = '000000000000000')\n" +
+ "and (ind.key_prefix = '00T')\n" +
+ "and (ind.name_type = 't'))";
+
+
+ String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+ parseCursor(sql);
+ }
+
+ @Test
+ public void testParseCursor2() throws Exception {
+ String expectedNameToken = "testCursor";
+ String expectedSelectStatement = "select /*gatherSlowStats*/ count(1) from core.custom_index_value ind\n" +
+ "where (ind.string_value in ('a', 'b', 'c', 'd'))\n" +
+ "and rownum <= ( 3 + 1 )\n" +
+ "and (ind.organization_id = '000000000000000')\n" +
+ "and (ind.key_prefix = '00T')\n" +
+ "and (ind.deleted = '0')\n" +
+ "and (ind.index_num = 1)";
+
+
+ String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+ parseCursor(sql);
+
+ }
+
+ @Test
+ public void testParseCursor3() throws Exception {
+ String expectedNameToken = "testCursor";
+ String expectedSelectStatement = "select /*gatherSlowStats*/ count(1) from core.custom_index_value ind\n" +
+ "where (ind.number_value > 3)\n" +
+ "and rownum <= 1000\n" +
+ "and (ind.organization_id = '000000000000000')\n" +
+ "and (ind.key_prefix = '001'\n" +
+ "and (ind.deleted = '0'))\n" +
+ "and (ind.index_num = 2)";
+
+
+ String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+ parseCursor(sql);
+
+ }
+
+ @Test
+ public void testParseCursor4() throws Exception {
+ String expectedNameToken = "testCursor";
+ String expectedSelectStatement = "select /*+ index(t iecustom_entity_data_created) */ /*gatherSlowStats*/ count(1) from core.custom_entity_data t\n" +
+ "where (t.created_date > to_date('01/01/2001'))\n" +
+ "and rownum <= 4500\n" +
+ "and (t.organization_id = '000000000000000')\n" +
+ "and (t.key_prefix = '001')";
+
+
+ String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+ parseCursor(sql);
+
+ }
+
+ @Test
+ public void testCountDistinctCursor() throws Exception {
+ String expectedNameToken = "testCursor";
+ String expectedSelectStatement = "select count(distinct foo) from core.custom_entity_data t\n"
+ + "where (t.created_date > to_date('01/01/2001'))\n"
+ + "and (t.organization_id = '000000000000000')\n"
+ + "and (t.key_prefix = '001')\n" + "limit 4500";
+
+ String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+ parseCursor(sql);
+
+ }
+
+ @Test
+ public void testIsNullCursor() throws Exception {
+ String expectedNameToken = "testCursor";
+ String expectedSelectStatement = "select count(foo) from core.custom_entity_data t\n" +
+ "where (t.created_date is null)\n" +
+ "and (t.organization_id is not null)\n";
+
+ String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+ parseCursor(sql);
+
+ }
+
+ @Test
+ public void testAsInColumnAlias() throws Exception {
+ String expectedNameToken = "testCursor";
+ String expectedSelectStatement = "select count(foo) AS c from core.custom_entity_data t\n" +
+ "where (t.created_date is null)\n" +
+ "and (t.organization_id is not null)\n";
+
+ String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+ parseCursor(sql);
+
+ }
+
+ @Test
+ public void testParseJoin1() throws Exception {
+ String expectedNameToken = "testCursor";
+ String expectedSelectStatement = "select /*SOQL*/ \"Id\"\n" +
+ "from (select /*+ ordered index(cft) */\n" +
+ "cft.val188 \"Marketing_Offer_Code__c\",\n" +
+ "t.account_id \"Id\"\n" +
+ "from sales.account_cfdata cft,\n" +
+ "sales.account t\n" +
+ "where (cft.account_cfdata_id = t.account_id)\n" +
+ "and (cft.organization_id = '00D300000000XHP')\n" +
+ "and (t.organization_id = '00D300000000XHP')\n" +
+ "and (t.deleted = '0')\n" +
+ "and (t.account_id != '000000000000000'))\n" +
+ "where (\"Marketing_Offer_Code__c\" = 'FSCR')";
+
+ String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+ parseCursor(sql);
+
+ }
+
+ @Test
+ public void testParseJoin2() throws Exception {
+ String expectedNameToken = "testCursor";
+ String expectedSelectStatement = "select /*rptacctlist 00O40000002C3of*/ \"00N40000001M8VK\",\n" +
+ "\"00N40000001M8VK.ID\",\n" +
+ "\"00N30000000r0K2\",\n" +
+ "\"00N30000000jgjo\"\n" +
+ "from (select /*+ ordered use_hash(aval368) index(cfa) */\n" +
+ "a.record_type_id \"RECORDTYPE\",\n" +
+ "aval368.last_name,aval368.first_name || ' ' || aval368.last_name,aval368.name \"00N40000001M8VK\",\n" +
+ "a.last_update \"LAST_UPDATE\",\n" +
+ "cfa.val368 \"00N40000001M8VK.ID\",\n" +
+ "TO_DATE(cfa.val282) \"00N30000000r0K2\",\n" +
+ "cfa.val252 \"00N30000000jgjo\"\n" +
+ "from sales.account a,\n" +
+ "sales.account_cfdata cfa,\n" +
+ "core.name_denorm aval368\n" +
+ "where (cfa.account_cfdata_id = a.account_id)\n" +
+ "and (aval368.entity_id = cfa.val368)\n" +
+ "and (a.deleted = '0')\n" +
+ "and (a.organization_id = '00D300000000EaE')\n" +
+ "and (a.account_id <> '000000000000000')\n" +
+ "and (cfa.organization_id = '00D300000000EaE')\n" +
+ "and (aval368.organization_id = '00D300000000EaE')\n" +
+ "and (aval368.entity_id like '005%'))\n" +
+ "where (\"RECORDTYPE\" = '0123000000002Gv')\n" +
+ "AND (\"00N40000001M8VK\" is null or \"00N40000001M8VK\" in ('BRIAN IRWIN', 'BRIAN MILLER', 'COLLEEN HORNYAK', 'ERNIE ZAVORAL JR', 'JAMIE TRIMBUR', 'JOE ANTESBERGER', 'MICHAEL HYTLA', 'NATHAN DELSIGNORE', 'SANJAY GANDHI', 'TOM BASHIOUM'))\n" +
+ "AND (\"LAST_UPDATE\" >= to_date('2009-08-01 07:00:00'))";
+
+ String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+ parseCursor(sql);
+
+ }
+
+ @Test
+ public void testCommentCursor() throws Exception {
+ String expectedNameToken = "testCursor";
+ String expectedSelectStatement = "select a from b -- here we come\n" +
+ "where ((ind.name = 'X') // to save the day\n" +
+ "and rownum /* won't run */ <= (1000 + 1000))\n";
+
+ String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+ parseCursor(sql);
+
+ }
+
+ @Test
+ public void testQuoteEscapeCursor() throws Exception {
+ String expectedNameToken = "testCursor";
+ String expectedSelectStatement = "select a from b\n" +
+ "where ind.name = 'X''Y'\n";
+
+ String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+ parseCursor(sql);
+
+ }
+
+ @Test
+ public void testSubtractionInSelect() throws Exception {
+ String expectedNameToken = "testCursor";
+ String expectedSelectStatement = "select a, 3-1-2, -4- -1-1 from b\n" +
+ "where d = c - 1\n";
+
+ String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+ parseCursor(sql);
+
+ }
+
+ @Test
+ public void testNextValueForSelect() throws Exception {
+ String expectedNameToken = "testCursor";
+ String expectedSelectStatement = "select next value for foo.bar \n" +
+ "from core.custom_entity_data\n";
+
+ String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+ parseCursor(sql);
+
+ }
+
+ @Test
+ public void testPercentileQuery1() throws Exception {
+ String expectedNameToken = "testCursor";
+ String expectedSelectStatement = "select PERCENTILE_CONT(0.9) WITHIN GROUP (ORDER BY salary DESC) from core.custom_index_value ind";
+
+ String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+ parseCursor(sql);
+
+ }
+
+ @Test
+ public void testPercentileQuery2() throws Exception {
+ String expectedNameToken = "testCursor";
+ String expectedSelectStatement = "select PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY mark ASC) from core.custom_index_value ind";
+
+ String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+ parseCursor(sql);
+
+ }
+
+ @Test
+ public void testRowValueConstructorQuery() throws Exception {
+ String expectedNameToken = "testCursor";
+ String expectedSelectStatement = "select a_integer FROM aTable where (x_integer, y_integer) > (3, 4)";
+
+ String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+ parseCursor(sql);
+
+ }
+
+ @Test
+ public void testSingleTopLevelNot() throws Exception {
+ String expectedNameToken = "testCursor";
+ String expectedSelectStatement = "select * from t where not c = 5";
+
+ String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+ parseCursor(sql);
+ }
+
+ @Test
+ public void testHavingWithNot() throws Exception {
+ String expectedNameToken = "testCursor";
+ String expectedSelectStatement = "select\n" +
+ "\"WEB_STAT_ALIAS\".\"DOMAIN\" as \"c0\"\n" +
+ "from \"WEB_STAT\" \"WEB_STAT_ALIAS\"\n" +
+ "group by \"WEB_STAT_ALIAS\".\"DOMAIN\" having\n" +
+ "(\n" +
+ "(\n" +
+ "NOT\n" +
+ "(\n" +
+ "(sum(\"WEB_STAT_ALIAS\".\"ACTIVE_VISITOR\") is null)\n" +
+ ")\n" +
+ "OR NOT((sum(\"WEB_STAT_ALIAS\".\"ACTIVE_VISITOR\") is null))\n" +
+ ")\n" +
+ "OR NOT((sum(\"WEB_STAT_ALIAS\".\"ACTIVE_VISITOR\") is null))\n" +
+ ")\n" +
+ "order by CASE WHEN \"WEB_STAT_ALIAS\".\"DOMAIN\" IS NULL THEN 1 ELSE 0 END,\n" +
+ "\"WEB_STAT_ALIAS\".\"DOMAIN\" ASC";
+
+ String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+ parseCursor(sql);
+
+ }
+
+ @Test
+ public void testDoubleBackslash() throws Exception {
+ String expectedNameToken = "testCursor";
+ String expectedSelectStatement = "SELECT * FROM T WHERE A LIKE 'a\\(d'";
+
+ String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+ parseCursor(sql);
+
+ }
+
+ @Test
+ public void testOpenCursor() throws Exception {
+ String expectedNameToken = "testCursor";
+ String sql = "OPEN " + expectedNameToken;
+ parseOpen(sql);
+ }
+
+ @Test
+ public void testFetchNext() throws Exception {
+ String expectedNameToken = "testCursor";
+ String sql = "FETCH NEXT FROM " + expectedNameToken;
+ parseFetch(sql);
+ }
+
+}
[2/2] phoenix git commit: PHOENIX-3572 Support FETCH NEXT| n ROWS
from Cursor (Biju Nair)
Posted by an...@apache.org.
PHOENIX-3572 Support FETCH NEXT| n ROWS from Cursor (Biju Nair)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2cb617f3
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2cb617f3
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2cb617f3
Branch: refs/heads/master
Commit: 2cb617f352048179439d242d1165a9ffb39ad81c
Parents: 1666e93
Author: Ankit Singhal <an...@gmail.com>
Authored: Wed May 17 12:56:42 2017 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Wed May 17 12:56:42 2017 +0530
----------------------------------------------------------------------
.../CursorWithRowValueConstructorIT.java | 687 +++++++++++++++++++
phoenix-core/src/main/antlr3/PhoenixSQL.g | 31 +
.../phoenix/compile/CloseStatementCompiler.java | 57 ++
.../phoenix/compile/DeclareCursorCompiler.java | 75 ++
.../phoenix/compile/OpenStatementCompiler.java | 57 ++
.../apache/phoenix/execute/CursorFetchPlan.java | 53 ++
.../phoenix/iterate/CursorResultIterator.java | 75 ++
.../apache/phoenix/jdbc/PhoenixStatement.java | 91 ++-
.../apache/phoenix/parse/CloseStatement.java | 40 ++
.../org/apache/phoenix/parse/CursorName.java | 26 +
.../phoenix/parse/DeclareCursorStatement.java | 60 ++
.../apache/phoenix/parse/FetchStatement.java | 52 ++
.../org/apache/phoenix/parse/OpenStatement.java | 40 ++
.../apache/phoenix/parse/ParseNodeFactory.java | 20 +
.../org/apache/phoenix/parse/SQLParser.java | 76 ++
.../apache/phoenix/schema/MetaDataClient.java | 20 +
.../org/apache/phoenix/util/CursorUtil.java | 189 +++++
.../java/org/apache/phoenix/util/ScanUtil.java | 6 +-
.../phoenix/compile/CursorCompilerTest.java | 87 +++
.../apache/phoenix/parse/CursorParserTest.java | 367 ++++++++++
20 files changed, 2106 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/it/java/org/apache/phoenix/end2end/CursorWithRowValueConstructorIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CursorWithRowValueConstructorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CursorWithRowValueConstructorIT.java
new file mode 100644
index 0000000..dda4bd1
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CursorWithRowValueConstructorIT.java
@@ -0,0 +1,687 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.phoenix.util.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.sql.*;
+import java.util.Properties;
+import java.util.Random;
+
+import static org.apache.phoenix.util.TestUtil.*;
+import static org.junit.Assert.*;
+
+
+public class CursorWithRowValueConstructorIT extends ParallelStatsDisabledIT {
+ private static final String TABLE_NAME = "CursorRVCTestTable";
+ protected static final Log LOG = LogFactory.getLog(CursorWithRowValueConstructorIT.class);
+
+ public void createAndInitializeTestTable() throws SQLException {
+ Connection conn = DriverManager.getConnection(getUrl());
+
+ PreparedStatement stmt = conn.prepareStatement("CREATE TABLE IF NOT EXISTS " + TABLE_NAME +
+ "(a_id INTEGER NOT NULL, " +
+ "a_data INTEGER, " +
+ "CONSTRAINT my_pk PRIMARY KEY (a_id))");
+ stmt.execute();
+ synchronized (conn){
+ conn.commit();
+ }
+
+ //Upsert test values into the test table
+ Random rand = new Random();
+ stmt = conn.prepareStatement("UPSERT INTO " + TABLE_NAME +
+ "(a_id, a_data) VALUES (?,?)");
+ int rowCount = 0;
+ while(rowCount < 100){
+ stmt.setInt(1, rowCount);
+ stmt.setInt(2, rand.nextInt(501));
+ stmt.execute();
+ ++rowCount;
+ }
+ synchronized (conn){
+ conn.commit();
+ }
+ }
+
+ public void deleteTestTable() throws SQLException {
+ Connection conn = DriverManager.getConnection(getUrl());
+ PreparedStatement stmt = conn.prepareStatement("DROP TABLE IF EXISTS " + TABLE_NAME);
+ stmt.execute();
+ synchronized (conn){
+ conn.commit();
+ }
+ }
+
+ @Test
+ public void testCursorsOnTestTablePK() throws SQLException {
+ try{
+ createAndInitializeTestTable();
+ String querySQL = "SELECT a_id FROM " + TABLE_NAME;
+
+ //Test actual cursor implementation
+ String cursorSQL = "DECLARE testCursor CURSOR FOR " + querySQL;
+ DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute();
+ cursorSQL = "OPEN testCursor";
+ DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute();
+ cursorSQL = "FETCH NEXT FROM testCursor";
+ ResultSet rs = DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).executeQuery();
+ int rowID = 0;
+ while(rs.next()){
+ assertEquals(rowID,rs.getInt(1));
+ ++rowID;
+ rs = DriverManager.getConnection(getUrl()).createStatement().executeQuery(cursorSQL);
+ }
+ } finally{
+ DriverManager.getConnection(getUrl()).prepareStatement("CLOSE testCursor").execute();
+ deleteTestTable();
+ }
+
+ }
+
+ @Test
+ public void testCursorsOnRandomTableData() throws SQLException {
+ try{
+ createAndInitializeTestTable();
+ String querySQL = "SELECT a_id,a_data FROM " + TABLE_NAME + " ORDER BY a_data";
+ String cursorSQL = "DECLARE testCursor CURSOR FOR " + querySQL;
+ DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute();
+ cursorSQL = "OPEN testCursor";
+ DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute();
+ cursorSQL = "FETCH NEXT FROM testCursor";
+ ResultSet cursorRS = DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).executeQuery();
+ ResultSet rs = DriverManager.getConnection(getUrl()).prepareStatement(querySQL).executeQuery();
+ int rowCount = 0;
+ while(rs.next() && cursorRS.next()){
+ assertEquals(rs.getInt(2),cursorRS.getInt(2));
+ ++rowCount;
+ cursorRS = DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).executeQuery();
+ }
+ assertEquals(100, rowCount);
+ } finally{
+ DriverManager.getConnection(getUrl()).prepareStatement("CLOSE testCursor").execute();
+ deleteTestTable();
+ }
+ }
+
+ @Test
+ public void testCursorsOnTestTablePKDesc() throws SQLException {
+ try{
+ createAndInitializeTestTable();
+ String dummySQL = "SELECT a_id FROM " + TABLE_NAME + " ORDER BY a_id DESC";
+
+ String cursorSQL = "DECLARE testCursor CURSOR FOR " + dummySQL;
+ DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute();
+ cursorSQL = "OPEN testCursor";
+ DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute();
+ cursorSQL = "FETCH NEXT FROM testCursor";
+ ResultSet rs = DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).executeQuery();
+ int rowCount = 0;
+ while(rs.next()){
+ assertEquals(99-rowCount, rs.getInt(1));
+ rs = DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).executeQuery();
+ ++rowCount;
+ }
+ assertEquals(100, rowCount);
+ } finally{
+ DriverManager.getConnection(getUrl()).prepareStatement("CLOSE testCursor").execute();
+ deleteTestTable();
+ }
+ }
+
+ @Test
+ public void testCursorsOnTestTableNonPKDesc() throws SQLException {
+ try{
+ createAndInitializeTestTable();
+ String dummySQL = "SELECT a_data FROM " + TABLE_NAME + " ORDER BY a_data DESC";
+
+ String cursorSQL = "DECLARE testCursor CURSOR FOR " + dummySQL;
+ DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute();
+ cursorSQL = "OPEN testCursor";
+ DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute();
+ cursorSQL = "FETCH NEXT FROM testCursor";
+ ResultSet rs = DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).executeQuery();
+ int rowCount = 0;
+ while(rs.next()){
+ rs = DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).executeQuery();
+ ++rowCount;
+ }
+ assertEquals(100, rowCount);
+ } finally{
+ DriverManager.getConnection(getUrl()).prepareStatement("CLOSE testCursor").execute();
+ deleteTestTable();
+ }
+ }
+
+ @Test
+ public void testCursorsOnWildcardSelect() throws SQLException {
+ try{
+ createAndInitializeTestTable();
+ String querySQL = "SELECT * FROM " + TABLE_NAME;
+ ResultSet rs = DriverManager.getConnection(getUrl()).prepareStatement(querySQL).executeQuery();
+
+ String cursorSQL = "DECLARE testCursor CURSOR FOR "+querySQL;
+ DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute();
+ cursorSQL = "OPEN testCursor";
+ DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute();
+ cursorSQL = "FETCH NEXT FROM testCursor";
+ ResultSet cursorRS = DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).executeQuery();
+ int rowCount = 0;
+ while(rs.next() && cursorRS.next()){
+ assertEquals(rs.getInt(1),cursorRS.getInt(1));
+ ++rowCount;
+ cursorRS = DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).executeQuery();
+ }
+ assertEquals(100, rowCount);
+ } finally{
+ DriverManager.getConnection(getUrl()).prepareStatement("CLOSE testCursor").execute();
+ deleteTestTable();
+ }
+ }
+
+ @Test
+ public void testCursorsWithBindings() throws Exception {
+ long ts = nextTimestamp();
+ String tenantId = getOrganizationId();
+ initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+ ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+ String query = "SELECT a_integer, x_integer FROM aTable WHERE ?=organization_id AND (a_integer, x_integer) = (7, 5)";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String cursor = "DECLARE testCursor CURSOR FOR "+query;
+ try {
+ PreparedStatement statement = conn.prepareStatement(cursor);
+ statement.setString(1, tenantId);
+ statement.execute();
+ }catch(SQLException e){
+ assertTrue(e.getMessage().equalsIgnoreCase("Cannot declare cursor, internal SELECT statement contains bindings!"));
+ assertTrue(!CursorUtil.cursorDeclared("testCursor"));
+ return;
+ } finally {
+ cursor = "CLOSE testCursor";
+ conn.prepareStatement(cursor).execute();
+ conn.close();
+ }
+ fail();
+ }
+
+ @Test
+ public void testCursorsInWhereWithEqualsExpression() throws Exception {
+ long ts = nextTimestamp();
+ String tenantId = getOrganizationId();
+ initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+ ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+ String query = "SELECT a_integer, x_integer FROM aTable WHERE '"+tenantId+"'=organization_id AND (a_integer, x_integer) = (7, 5)";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String cursor = "DECLARE testCursor CURSOR FOR "+query;
+ try {
+ conn.prepareStatement(cursor).execute();
+ cursor = "OPEN testCursor";
+ conn.prepareStatement(cursor).execute();
+ cursor = "FETCH NEXT FROM testCursor";
+ ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+ int count = 0;
+ while(rs.next()) {
+ assertTrue(rs.getInt(1) == 7);
+ assertTrue(rs.getInt(2) == 5);
+ count++;
+ rs = conn.prepareStatement(cursor).executeQuery();
+ }
+ assertTrue(count == 1);
+ } finally {
+ cursor = "CLOSE testCursor";
+ conn.prepareStatement(cursor).execute();
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testCursorsInWhereWithGreaterThanExpression() throws Exception {
+ long ts = nextTimestamp();
+ String tenantId = getOrganizationId();
+ initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+ ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+ String query = "SELECT a_integer, x_integer FROM aTable WHERE '"+tenantId+"'=organization_id AND (a_integer, x_integer) >= (4, 4)";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String cursor = "DECLARE testCursor CURSOR FOR "+query;
+ try {
+ conn.prepareStatement(cursor).execute();
+ cursor = "OPEN testCursor";
+ conn.prepareStatement(cursor).execute();
+ cursor = "FETCH NEXT FROM testCursor";
+ ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+ int count = 0;
+ while(rs.next()) {
+ assertTrue(rs.getInt(1) >= 4);
+ assertTrue(rs.getInt(1) == 4 ? rs.getInt(2) >= 4 : rs.getInt(2) >= 0);
+ count++;
+ rs = conn.prepareStatement(cursor).executeQuery();
+ }
+ assertTrue(count == 5);
+ } finally {
+ cursor = "CLOSE testCursor";
+ conn.prepareStatement(cursor).execute();
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testCursorsInWhereWithUnEqualNumberArgs() throws Exception {
+ long ts = nextTimestamp();
+ String tenantId = getOrganizationId();
+ initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+ ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+ String query = "SELECT a_integer, x_integer FROM aTable WHERE '"+tenantId+"'=organization_id AND (a_integer, x_integer, y_integer) >= (7, 5)";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String cursor = "DECLARE testCursor CURSOR FOR "+query;
+ try {
+ double startTime = System.nanoTime();
+ conn.prepareStatement(cursor).execute();
+ cursor = "OPEN testCursor";
+ conn.prepareStatement(cursor).execute();
+ cursor = "FETCH NEXT FROM testCursor";
+ ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+ int count = 0;
+ while(rs.next()) {
+ assertTrue(rs.getInt(1) >= 7);
+ assertTrue(rs.getInt(1) == 7 ? rs.getInt(2) >= 5 : rs.getInt(2) >= 0);
+ count++;
+ rs = conn.prepareStatement(cursor).executeQuery();
+ }
+ // we have key values (7,5) (8,4) and (9,3) present in aTable. So the query should return the 3 records.
+ assertTrue(count == 3);
+ double endTime = System.nanoTime();
+ System.out.println("Method Time in milliseconds: "+Double.toString((endTime-startTime)/1000000));
+ } finally {
+ cursor = "CLOSE testCursor";
+ conn.prepareStatement(cursor).execute();
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testCursorsOnLHSAndLiteralExpressionOnRHS() throws Exception {
+ long ts = nextTimestamp();
+ String tenantId = getOrganizationId();
+ initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+ ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+ String query = "SELECT a_integer, x_integer FROM aTable WHERE '"+tenantId+"'=organization_id AND (a_integer, x_integer) >= 7";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String cursor = "DECLARE testCursor CURSOR FOR "+query;
+ try {
+ conn.prepareStatement(cursor).execute();
+ cursor = "OPEN testCursor";
+ conn.prepareStatement(cursor).execute();
+ cursor = "FETCH NEXT FROM testCursor";
+ ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+ int count = 0;
+ while(rs.next()) {
+ count++;
+ rs = conn.prepareStatement(cursor).executeQuery();
+ }
+ // we have key values (7,5) (8,4) and (9,3) present in aTable. So the query should return the 3 records.
+ assertTrue(count == 3);
+ } finally {
+ cursor = "CLOSE testCursor";
+ conn.prepareStatement(cursor).execute();
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testCursorsOnRHSLiteralExpressionOnLHS() throws Exception {
+ long ts = nextTimestamp();
+ String tenantId = getOrganizationId();
+ initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+ ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+ String query = "SELECT a_integer, x_integer FROM aTable WHERE '"+tenantId+"'=organization_id AND 7 <= (a_integer, x_integer)";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String cursor = "DECLARE testCursor CURSOR FOR "+query;
+ try {
+ conn.prepareStatement(cursor).execute();
+ cursor = "OPEN testCursor";
+ conn.prepareStatement(cursor).execute();
+ cursor = "FETCH NEXT FROM testCursor";
+ ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+ int count = 0;
+ while(rs.next()) {
+ count++;
+ rs = conn.prepareStatement(cursor).executeQuery();
+ }
+ // we have key values (7,5) (8,4) and (9,3) present in aTable. So the query should return the 3 records.
+ assertTrue(count == 3);
+ } finally {
+ cursor = "CLOSE testCursor";
+ conn.prepareStatement(cursor).execute();
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testCursorsOnBuiltInFunctionOperatingOnIntegerLiteral() throws Exception {
+ long ts = nextTimestamp();
+ String tenantId = getOrganizationId();
+ initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+ ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+ String query = "SELECT a_integer, x_integer FROM aTable WHERE '"+tenantId+"'=organization_id AND (a_integer, x_integer) >= to_number('7')";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String cursor = "DECLARE testCursor CURSOR FOR "+query;
+ try {
+ conn.prepareStatement(cursor).execute();
+ cursor = "OPEN testCursor";
+ conn.prepareStatement(cursor).execute();
+ cursor = "FETCH NEXT FROM testCursor";
+ ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+ int count = 0;
+ while(rs.next()) {
+ count++;
+ rs = conn.prepareStatement(cursor).executeQuery();
+ }
+ // we have key values (7,5) (8,4) and (9,3) present in aTable. So the query should return the 3 records.
+ assertEquals(3, count);
+ } finally {
+ cursor = "CLOSE testCursor";
+ conn.prepareStatement(cursor).execute();
+ conn.close();
+ }
+ }
+
+ @Test
+ /**
+ * Test for the precision of Date datatype when used as part of a filter within the internal Select statement.
+ */
+ public void testCursorsWithDateDatatypeFilter() throws Exception {
+ long ts = nextTimestamp();
+ String tenantId = getOrganizationId();
+ long currentTime = System.currentTimeMillis();
+ java.sql.Date date = new java.sql.Date(currentTime);
+ String strCurrentDate = date.toString();
+
+ //Sets date to <yesterday's date> 23:59:59.999
+ while(date.toString().equals(strCurrentDate)){
+ currentTime -= 1;
+ date = new Date(currentTime);
+ }
+ //Sets date to <today's date> 00:00:00.001
+ date = new Date(currentTime+2);
+ java.sql.Date midnight = new Date(currentTime+1);
+
+
+ initEntityHistoryTableValues(tenantId, getDefaultSplits(tenantId), date, ts);
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2));
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+
+
+ String query = "select parent_id from " + ENTITY_HISTORY_TABLE_NAME +
+ " WHERE (organization_id, parent_id, created_date, entity_history_id) IN ((?,?,?,?),(?,?,?,?))";
+
+ query = query.replaceFirst("\\?", "'"+tenantId+"'");
+ query = query.replaceFirst("\\?", "'"+PARENTID3+"'");
+ query = query.replaceFirst("\\?", "TO_DATE('"+DateUtil.getDateFormatter(DateUtil.DEFAULT_DATE_FORMAT).format(date)+"')");
+ query = query.replaceFirst("\\?", "'"+ENTITYHISTID3+"'");
+ query = query.replaceFirst("\\?", "'"+tenantId+"'");
+ query = query.replaceFirst("\\?", "'"+PARENTID7+"'");
+ query = query.replaceFirst("\\?", "TO_DATE('"+DateUtil.getDateFormatter(DateUtil.DEFAULT_DATE_FORMAT).format(date)+"')");
+ query = query.replaceFirst("\\?", "'"+ENTITYHISTID7+"'");
+ String cursor = "DECLARE testCursor CURSOR FOR "+query;
+
+ conn.prepareStatement(cursor).execute();
+ cursor = "OPEN testCursor";
+ conn.prepareStatement(cursor).execute();
+ cursor = "FETCH NEXT FROM testCursor";
+
+ ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+ assertTrue(rs.next());
+ assertEquals(PARENTID3, rs.getString(1));
+ rs = conn.prepareStatement(cursor).executeQuery();
+ assertTrue(rs.next());
+ assertEquals(PARENTID7, rs.getString(1));
+ assertFalse(rs.next());
+
+ //Test against the same table for the same records, but this time use the 'midnight' java.sql.Date instance.
+ //'midnight' is identical to 'date' to the tens of millisecond precision.
+ query = "select parent_id from " + ENTITY_HISTORY_TABLE_NAME +
+ " WHERE (organization_id, parent_id, created_date, entity_history_id) IN ((?,?,?,?),(?,?,?,?))";
+ query = query.replaceFirst("\\?", "'"+tenantId+"'");
+ query = query.replaceFirst("\\?", "'"+PARENTID3+"'");
+ query = query.replaceFirst("\\?", "TO_DATE('"+DateUtil.getDateFormatter(DateUtil.DEFAULT_DATE_FORMAT).format(midnight)+"')");
+ query = query.replaceFirst("\\?", "'"+ENTITYHISTID3+"'");
+ query = query.replaceFirst("\\?", "'"+tenantId+"'");
+ query = query.replaceFirst("\\?", "'"+PARENTID7+"'");
+ query = query.replaceFirst("\\?", "TO_DATE('"+DateUtil.getDateFormatter(DateUtil.DEFAULT_DATE_FORMAT).format(midnight)+"')");
+ query = query.replaceFirst("\\?", "'"+ENTITYHISTID7+"'");
+ cursor = "DECLARE testCursor2 CURSOR FOR "+query;
+
+ conn.prepareStatement(cursor).execute();
+ cursor = "OPEN testCursor2";
+ conn.prepareStatement(cursor).execute();
+ cursor = "FETCH NEXT FROM testCursor2";
+
+ rs = conn.prepareStatement(cursor).executeQuery();
+ assertTrue(!rs.next());
+ String sql = "CLOSE testCursor";
+ conn.prepareStatement(sql).execute();
+ sql = "CLOSE testCursor2";
+ conn.prepareStatement(sql).execute();
+ }
+
+ @Test
+ public void testCursorsWithNonLeadingPkColsOfTypesTimeStampAndVarchar() throws Exception {
+ long ts = nextTimestamp();
+ String tenantId = getOrganizationId();
+ initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+ ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+ String updateStmt =
+ "upsert into " +
+ "ATABLE(" +
+ " ORGANIZATION_ID, " +
+ " ENTITY_ID, " +
+ " A_TIMESTAMP) " +
+ "VALUES (?, ?, ?)";
+ String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 1);
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection upsertConn = DriverManager.getConnection(url, props);
+ upsertConn.setAutoCommit(true);
+ PreparedStatement stmt = upsertConn.prepareStatement(updateStmt);
+ stmt.setString(1, tenantId);
+ stmt.setString(2, ROW4);
+ Timestamp tsValue = new Timestamp(System.nanoTime());
+ stmt.setTimestamp(3, tsValue);
+ stmt.execute();
+
+ String query = "SELECT a_timestamp, a_string FROM aTable WHERE ?=organization_id AND (a_timestamp, a_string) = (?, 'a')";
+ query = query.replaceFirst("\\?", "'"+tenantId+"'");
+ query = query.replaceFirst("\\?", "TO_DATE('"+DateUtil.getDateFormatter(DateUtil.DEFAULT_TIMESTAMP_FORMAT).format(tsValue)+"')");
+
+ props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ String cursor = "DECLARE testCursor CURSOR FOR "+query;
+ conn.prepareStatement(cursor).execute();
+ cursor = "OPEN testCursor";
+ conn.prepareStatement(cursor).execute();
+ cursor = "FETCH NEXT FROM testCursor";
+
+ ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+ int count = 0;
+ while(rs.next()) {
+ assertTrue(rs.getTimestamp(1).equals(tsValue));
+ assertTrue(rs.getString(2).compareTo("a") == 0);
+ count++;
+ rs = conn.prepareStatement(cursor).executeQuery();
+ }
+ assertTrue(count == 1);
+ } finally {
+ String sql = "CLOSE testCursor";
+ conn.prepareStatement(sql).execute();
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testCursorsQueryMoreWithInListClausePossibleNullValues() throws Exception {
+ long ts = nextTimestamp();
+ String tenantId = getOrganizationId();
+ initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+ ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+ String updateStmt =
+ "upsert into " +
+ "ATABLE(ORGANIZATION_ID, ENTITY_ID, Y_INTEGER, X_INTEGER) VALUES (?, ?, ?, ?)";
+ String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 1);
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection upsertConn = DriverManager.getConnection(url, props);
+ upsertConn.setAutoCommit(true);
+ PreparedStatement stmt = upsertConn.prepareStatement(updateStmt);
+ stmt.setString(1, tenantId);
+ stmt.setString(2, ROW4);
+ stmt.setInt(3, 4);
+ stmt.setInt(4, 5);
+ stmt.execute();
+
+ //we have a row present in aTable where x_integer = 5 and y_integer = NULL which gets translated to 0 when retriving from HBase.
+ String query = "SELECT x_integer, y_integer FROM aTable WHERE ? = organization_id AND (x_integer) IN ((5))";
+
+ query = query.replaceFirst("\\?", "'"+tenantId+"'");
+
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+
+ try {
+ String cursor = "DECLARE testCursor CURSOR FOR "+query;
+ conn.prepareStatement(cursor).execute();
+ cursor = "OPEN testCursor";
+ conn.prepareStatement(cursor).execute();
+ cursor = "FETCH NEXT FROM testCursor";
+
+ ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+ assertTrue(rs.next());
+ assertEquals(5, rs.getInt(1));
+ assertEquals(4, rs.getInt(2));
+ rs = conn.prepareStatement(cursor).executeQuery();
+ assertTrue(rs.next());
+ assertEquals(5, rs.getInt(1));
+ assertEquals(0, rs.getInt(2));
+ } finally {
+ String sql = "CLOSE testCursor";
+ conn.prepareStatement(sql).execute();
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testCursorsWithColsOfTypesDecimal() throws Exception {
+ long ts = nextTimestamp();
+ String tenantId = getOrganizationId();
+ initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+ ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+ String query = "SELECT x_decimal FROM aTable WHERE ?=organization_id AND entity_id IN (?,?,?)";
+ query = query.replaceFirst("\\?", "'"+tenantId+"'");
+ query = query.replaceFirst("\\?", "'"+ROW7+"'");
+ query = query.replaceFirst("\\?", "'"+ROW8+"'");
+ query = query.replaceFirst("\\?", "'"+ROW9+"'");
+
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ String cursor = "DECLARE testCursor CURSOR FOR "+query;
+ conn.prepareStatement(cursor).execute();
+ cursor = "OPEN testCursor";
+ conn.prepareStatement(cursor).execute();
+ cursor = "FETCH NEXT FROM testCursor";
+
+ ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+ int count = 0;
+ while(rs.next()) {
+ assertTrue(BigDecimal.valueOf(0.1).equals(rs.getBigDecimal(1)) || BigDecimal.valueOf(3.9).equals(rs.getBigDecimal(1)) || BigDecimal.valueOf(3.3).equals(rs.getBigDecimal(1)));
+ count++;
+ if(count == 3) break;
+ rs = conn.prepareStatement(cursor).executeQuery();
+ }
+ assertTrue(count == 3);
+ } finally {
+ String sql = "CLOSE testCursor";
+ conn.prepareStatement(sql).execute();
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testCursorsWithColsOfTypesTinyintSmallintFloatDouble() throws Exception {
+ long ts = nextTimestamp();
+ String tenantId = getOrganizationId();
+ initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+ ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+ String query = "SELECT a_byte,a_short,a_float,a_double FROM aTable WHERE ?=organization_id AND entity_id IN (?,?,?)";
+ query = query.replaceFirst("\\?", "'"+tenantId+"'");
+ query = query.replaceFirst("\\?", "'"+ROW1+"'");
+ query = query.replaceFirst("\\?", "'"+ROW2+"'");
+ query = query.replaceFirst("\\?", "'"+ROW3+"'");
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ String cursor = "DECLARE testCursor CURSOR FOR "+query;
+ conn.prepareStatement(cursor).execute();
+ cursor = "OPEN testCursor";
+ conn.prepareStatement(cursor).execute();
+ cursor = "FETCH NEXT FROM testCursor";
+
+ ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+ int count = 0;
+ while(rs.next()) {
+ assertTrue((byte)1 == (rs.getByte(1)) || (byte)2 == (rs.getByte(1)) || (byte)3 == (rs.getByte(1)));
+ assertTrue((short)128 == (rs.getShort(2)) || (short)129 == (rs.getShort(2)) || (short)130 == (rs.getShort(2)));
+ assertTrue(0.01f == (rs.getFloat(3)) || 0.02f == (rs.getFloat(3)) || 0.03f == (rs.getFloat(3)));
+ assertTrue(0.0001 == (rs.getDouble(4)) || 0.0002 == (rs.getDouble(4)) || 0.0003 == (rs.getDouble(4)));
+ count++;
+ if(count == 3) break;
+ rs = conn.prepareStatement(cursor).executeQuery();
+ }
+ assertTrue(count == 3);
+ } finally {
+ String sql = "CLOSE testCursor";
+ conn.prepareStatement(sql).execute();
+ conn.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/antlr3/PhoenixSQL.g
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index 07a51ce..66d50e9 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -130,6 +130,10 @@ tokens
USE='use';
OFFSET ='offset';
FETCH = 'fetch';
+ DECLARE = 'declare';
+ CURSOR = 'cursor';
+ OPEN = 'open';
+ CLOSE = 'close';
ROW = 'row';
ROWS = 'rows';
ONLY = 'only';
@@ -409,6 +413,10 @@ oneStatement returns [BindableStatement ret]
| s=create_schema_node
| s=create_view_node
| s=create_index_node
+ | s=cursor_open_node
+ | s=cursor_close_node
+ | s=cursor_fetch_node
+ | s=declare_cursor_node
| s=drop_table_node
| s=drop_index_node
| s=alter_index_node
@@ -744,6 +752,25 @@ upsert_column_refs returns [Pair<List<ColumnDef>,List<ColumnName>> ret]
(COMMA d=dyn_column_name_or_def { if (d.getDataType()!=null) { $ret.getFirst().add(d); } $ret.getSecond().add(d.getColumnDefName()); } )*
;
+
+// Parse a full declare cursor expression structure.
+declare_cursor_node returns [DeclareCursorStatement ret]
+ : DECLARE c=cursor_name CURSOR FOR s=select_node
+ {ret = factory.declareCursor(c, s); }
+ ;
+
+cursor_open_node returns [OpenStatement ret]
+ : OPEN c=cursor_name {ret = factory.open(c);}
+ ;
+
+cursor_close_node returns [CloseStatement ret]
+ : CLOSE c=cursor_name {ret = factory.close(c);}
+ ;
+
+cursor_fetch_node returns [FetchStatement ret]
+ : FETCH NEXT (a=NUMBER)? (ROW|ROWS)? FROM c=cursor_name {ret = factory.fetch(c,true, a == null ? 1 : Integer.parseInt( a.getText() )); }
+ ;
+
// Parse a full delete expression structure.
delete_node returns [DeleteStatement ret]
: DELETE (hint=hintClause)? FROM t=from_table_name
@@ -1033,6 +1060,10 @@ index_name returns [NamedNode ret]
: name=identifier {$ret = factory.indexName(name); }
;
+cursor_name returns [CursorName ret]
+ : name=identifier {$ret = factory.cursorName(name);}
+ ;
+
// TODO: figure out how not repeat this two times
table_name returns [TableName ret]
: t=identifier {$ret = factory.table(null, t); }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/compile/CloseStatementCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CloseStatementCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CloseStatementCompiler.java
new file mode 100644
index 0000000..cc53a9d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CloseStatementCompiler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.parse.CloseStatement;
+import org.apache.phoenix.parse.OpenStatement;
+import org.apache.phoenix.schema.MetaDataClient;
+
+import java.sql.SQLException;
+import java.util.Collections;
+
+public class CloseStatementCompiler {
+ private final PhoenixStatement statement;
+ private final Operation operation;
+
+ public CloseStatementCompiler(PhoenixStatement statement, Operation operation) {
+ this.statement = statement;
+ this.operation = operation;
+ }
+
+ public MutationPlan compile(final CloseStatement close) throws SQLException {
+ final PhoenixConnection connection = statement.getConnection();
+ final StatementContext context = new StatementContext(statement);
+ final MetaDataClient client = new MetaDataClient(connection);
+
+ return new BaseMutationPlan(context, operation) {
+ @Override
+ public MutationState execute() throws SQLException {
+ return client.close(close);
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ return new ExplainPlan(Collections.singletonList("CLOSE CURSOR"));
+ }
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/compile/DeclareCursorCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeclareCursorCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeclareCursorCompiler.java
new file mode 100644
index 0000000..5280291
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeclareCursorCompiler.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.parse.CreateIndexStatement;
+import org.apache.phoenix.parse.DeclareCursorStatement;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.util.CursorUtil;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PTable.IndexType;
+
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+public class DeclareCursorCompiler {
+ private final PhoenixStatement statement;
+ private final Operation operation;
+ private QueryPlan queryPlan;
+
+ public DeclareCursorCompiler(PhoenixStatement statement, Operation operation, QueryPlan queryPlan) {
+ this.statement = statement;
+ this.operation = operation;
+ this.queryPlan = queryPlan;
+ }
+
+ public MutationPlan compile(final DeclareCursorStatement declare) throws SQLException {
+ if(declare.getBindCount() != 0){
+ throw new SQLException("Cannot declare cursor, internal SELECT statement contains bindings!");
+ }
+
+ final PhoenixConnection connection = statement.getConnection();
+ final StatementContext context = new StatementContext(statement);
+ final MetaDataClient client = new MetaDataClient(connection);
+
+ return new BaseMutationPlan(context, operation) {
+ @Override
+ public MutationState execute() throws SQLException {
+ return client.declareCursor(declare, queryPlan);
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ return new ExplainPlan(Collections.singletonList("DECLARE CURSOR"));
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/compile/OpenStatementCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/OpenStatementCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/OpenStatementCompiler.java
new file mode 100644
index 0000000..b6125fd
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/OpenStatementCompiler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.parse.DeclareCursorStatement;
+import org.apache.phoenix.parse.OpenStatement;
+import org.apache.phoenix.schema.MetaDataClient;
+
+import java.sql.SQLException;
+import java.util.Collections;
+
+public class OpenStatementCompiler {
+ private final PhoenixStatement statement;
+ private final Operation operation;
+
+ public OpenStatementCompiler(PhoenixStatement statement, Operation operation) {
+ this.statement = statement;
+ this.operation = operation;
+ }
+
+ public MutationPlan compile(final OpenStatement open) throws SQLException {
+ final PhoenixConnection connection = statement.getConnection();
+ final StatementContext context = new StatementContext(statement);
+ final MetaDataClient client = new MetaDataClient(connection);
+
+ return new BaseMutationPlan(context, operation) {
+ @Override
+ public MutationState execute() throws SQLException {
+ return client.open(open);
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ return new ExplainPlan(Collections.singletonList("OPEN CURSOR"));
+ }
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java
new file mode 100644
index 0000000..aaea13e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java
@@ -0,0 +1,53 @@
+package org.apache.phoenix.execute;
+
+import java.sql.SQLException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.iterate.CursorResultIterator;
+import org.apache.phoenix.iterate.LookAheadResultIterator;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
+import org.apache.phoenix.iterate.ResultIterator;
+
+public class CursorFetchPlan extends DelegateQueryPlan {
+
+ private CursorResultIterator resultIterator;
+ private int fetchSize;
+ private boolean isAggregate;
+ private String cursorName;
+
+ public CursorFetchPlan(QueryPlan cursorQueryPlan,String cursorName) {
+ super(cursorQueryPlan);
+ this.isAggregate = delegate.getStatement().isAggregate() || delegate.getStatement().isDistinct();
+ this.cursorName = cursorName;
+ }
+
+ @Override
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+ StatementContext context = delegate.getContext();
+ if (resultIterator == null) {
+ context.getOverallQueryMetrics().startQuery();
+ resultIterator = new CursorResultIterator(LookAheadResultIterator.wrap(delegate.iterator(scanGrouper, scan)),cursorName);
+ }
+ return resultIterator;
+ }
+
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ return delegate.getExplainPlan();
+ }
+
+ public void setFetchSize(int fetchSize){
+ this.fetchSize = fetchSize;
+ }
+
+ public int getFetchSize() {
+ return fetchSize;
+ }
+
+ public boolean isAggregate(){
+ return this.isAggregate;
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/iterate/CursorResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/CursorResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/CursorResultIterator.java
new file mode 100644
index 0000000..7ff2785
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/CursorResultIterator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.CursorUtil;
+
+import java.sql.SQLException;
+import java.util.List;
+
+public class CursorResultIterator implements ResultIterator {
+ private String cursorName;
+ private PeekingResultIterator delegate;
+ //TODO Configure fetch size from FETCH call
+ private int fetchSize = 0;
+ private int rowsRead = 0;
+ public CursorResultIterator(PeekingResultIterator delegate, String cursorName) {
+ this.delegate = delegate;
+ this.cursorName = cursorName;
+ }
+
+ @Override
+ public Tuple next() throws SQLException {
+ if(!CursorUtil.moreValues(cursorName)){
+ return null;
+ } else if (fetchSize == rowsRead) {
+ return null;
+ }
+
+ Tuple next = delegate.next();
+ CursorUtil.updateCursor(cursorName,next, delegate.peek());
+ rowsRead++;
+ return next;
+ }
+
+ @Override
+ public void explain(List<String> planSteps) {
+ delegate.explain(planSteps);
+ planSteps.add("CLIENT CURSOR " + cursorName);
+ }
+
+ @Override
+ public String toString() {
+ return "CursorResultIterator [cursor=" + cursorName + "]";
+ }
+
+ @Override
+ public void close() throws SQLException {
+ //NOP
+ }
+
+ public void closeCursor() throws SQLException {
+ delegate.close();
+ }
+
+ public void setFetchSize(int fetchSize){
+ this.fetchSize = fetchSize;
+ this.rowsRead = 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index f3c6d30..6981a30 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.call.CallRunner;
import org.apache.phoenix.compile.BaseMutationPlan;
+import org.apache.phoenix.compile.CloseStatementCompiler;
import org.apache.phoenix.compile.ColumnProjector;
import org.apache.phoenix.compile.ColumnResolver;
import org.apache.phoenix.compile.CreateFunctionCompiler;
@@ -55,6 +56,7 @@ import org.apache.phoenix.compile.CreateIndexCompiler;
import org.apache.phoenix.compile.CreateSchemaCompiler;
import org.apache.phoenix.compile.CreateSequenceCompiler;
import org.apache.phoenix.compile.CreateTableCompiler;
+import org.apache.phoenix.compile.DeclareCursorCompiler;
import org.apache.phoenix.compile.DeleteCompiler;
import org.apache.phoenix.compile.DropSequenceCompiler;
import org.apache.phoenix.compile.ExplainPlan;
@@ -63,6 +65,7 @@ import org.apache.phoenix.compile.FromCompiler;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.ListJarsQueryPlan;
import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.compile.OpenStatementCompiler;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.compile.QueryCompiler;
import org.apache.phoenix.compile.QueryPlan;
@@ -91,13 +94,16 @@ import org.apache.phoenix.parse.AliasedNode;
import org.apache.phoenix.parse.AlterIndexStatement;
import org.apache.phoenix.parse.AlterSessionStatement;
import org.apache.phoenix.parse.BindableStatement;
+import org.apache.phoenix.parse.CloseStatement;
import org.apache.phoenix.parse.ColumnDef;
import org.apache.phoenix.parse.ColumnName;
+import org.apache.phoenix.parse.CursorName;
import org.apache.phoenix.parse.CreateFunctionStatement;
import org.apache.phoenix.parse.CreateIndexStatement;
import org.apache.phoenix.parse.CreateSchemaStatement;
import org.apache.phoenix.parse.CreateSequenceStatement;
import org.apache.phoenix.parse.CreateTableStatement;
+import org.apache.phoenix.parse.DeclareCursorStatement;
import org.apache.phoenix.parse.DeleteJarStatement;
import org.apache.phoenix.parse.DeleteStatement;
import org.apache.phoenix.parse.DropColumnStatement;
@@ -107,6 +113,7 @@ import org.apache.phoenix.parse.DropSchemaStatement;
import org.apache.phoenix.parse.DropSequenceStatement;
import org.apache.phoenix.parse.DropTableStatement;
import org.apache.phoenix.parse.ExecuteUpgradeStatement;
+import org.apache.phoenix.parse.FetchStatement;
import org.apache.phoenix.parse.ExplainStatement;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.HintNode;
@@ -117,6 +124,7 @@ import org.apache.phoenix.parse.LiteralParseNode;
import org.apache.phoenix.parse.NamedNode;
import org.apache.phoenix.parse.NamedTableNode;
import org.apache.phoenix.parse.OffsetNode;
+import org.apache.phoenix.parse.OpenStatement;
import org.apache.phoenix.parse.OrderByNode;
import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.parse.ParseNode;
@@ -155,6 +163,7 @@ import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.CursorUtil;
import org.apache.phoenix.util.KeyValueUtil;
import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.PhoenixContextExecutor;
@@ -294,7 +303,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
}
StatementContext context = plan.getContext();
context.getOverallQueryMetrics().startQuery();
- PhoenixResultSet rs = newResultSet(resultIterator, plan.getProjector(), context);
+ PhoenixResultSet rs = newResultSet(resultIterator, plan.getProjector(), plan.getContext());
resultSets.add(rs);
setLastQueryPlan(plan);
setLastResultSet(rs);
@@ -403,6 +412,13 @@ public class PhoenixStatement implements Statement, SQLCloseable {
super(from, hint, isDistinct, select, where, groupBy, having, orderBy, limit, offset, bindCount, isAggregate, hasSequence, selects, udfParseNodes);
}
+ private ExecutableSelectStatement(ExecutableSelectStatement select) {
+ this(select.getFrom(), select.getHint(), select.isDistinct(), select.getSelect(), select.getWhere(),
+ select.getGroupBy(), select.getHaving(), select.getOrderBy(), select.getLimit(), select.getOffset(), select.getBindCount(),
+ select.isAggregate(), select.hasSequence(), select.getSelects(), select.getUdfParseNodes());
+ }
+
+
@SuppressWarnings("unchecked")
@Override
public QueryPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
@@ -417,6 +433,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
resolver = FromCompiler.getResolverForQuery(transformedSelect, stmt.getConnection());
select = StatementNormalizer.normalize(transformedSelect, resolver);
}
+
QueryPlan plan = new QueryCompiler(stmt, select, resolver, Collections.<PDatum>emptyList(), stmt.getConnection().getIteratorFactory(), new SequenceManager(stmt), true).compile();
plan.getContext().getSequenceManager().validateSequences(seqAction);
return plan;
@@ -765,6 +782,56 @@ public class PhoenixStatement implements Statement, SQLCloseable {
}
}
+ private static class ExecutableDeclareCursorStatement extends DeclareCursorStatement implements CompilableStatement {
+ public ExecutableDeclareCursorStatement(CursorName cursor, SelectStatement select){
+ super(cursor, select);
+ }
+
+ @Override
+ public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
+ ExecutableSelectStatement wrappedSelect = new ExecutableSelectStatement(
+ (ExecutableSelectStatement) stmt.parseStatement(this.getQuerySQL()));
+ DeclareCursorCompiler compiler = new DeclareCursorCompiler(stmt, this.getOperation(),wrappedSelect.compilePlan(stmt, seqAction));
+ return compiler.compile(this);
+ }
+ }
+
+ private static class ExecutableOpenStatement extends OpenStatement implements CompilableStatement {
+ public ExecutableOpenStatement(CursorName cursor){
+ super(cursor);
+ }
+
+ @Override
+ public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
+ OpenStatementCompiler compiler = new OpenStatementCompiler(stmt, this.getOperation());
+ return compiler.compile(this);
+ }
+ }
+
+ private static class ExecutableCloseStatement extends CloseStatement implements CompilableStatement {
+ public ExecutableCloseStatement(CursorName cursor){
+ super(cursor);
+ }
+
+ @Override
+ public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
+ CloseStatementCompiler compiler = new CloseStatementCompiler(stmt, this.getOperation());
+ return compiler.compile(this);
+ }
+ }
+
+ private static class ExecutableFetchStatement extends FetchStatement implements CompilableStatement {
+ public ExecutableFetchStatement(CursorName cursor, boolean isNext, int fetchLimit){
+ super(cursor, isNext, fetchLimit);
+ }
+
+ @Override
+ public QueryPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
+ return CursorUtil.getFetchPlan(this.getCursorName().getName(), this.isNext(), this.getFetchSize());
+ }
+
+ }
+
private static class ExecutableDeleteJarStatement extends DeleteJarStatement implements CompilableStatement {
public ExecutableDeleteJarStatement(LiteralParseNode jarPath) {
@@ -1209,7 +1276,27 @@ public class PhoenixStatement implements Statement, SQLCloseable {
Map<String, UDFParseNode> udfParseNodes, List<Pair<ColumnName,ParseNode>> onDupKeyPairs) {
return new ExecutableUpsertStatement(table, hintNode, columns, values, select, bindCount, udfParseNodes, onDupKeyPairs);
}
-
+
+ @Override
+ public ExecutableDeclareCursorStatement declareCursor(CursorName cursor, SelectStatement select){
+ return new ExecutableDeclareCursorStatement(cursor, select);
+ }
+
+ @Override
+ public ExecutableFetchStatement fetch(CursorName cursor, boolean isNext, int fetchLimit){
+ return new ExecutableFetchStatement(cursor, isNext, fetchLimit);
+ }
+
+ @Override
+ public ExecutableOpenStatement open(CursorName cursor){
+ return new ExecutableOpenStatement(cursor);
+ }
+
+ @Override
+ public ExecutableCloseStatement close(CursorName cursor){
+ return new ExecutableCloseStatement(cursor);
+ }
+
@Override
public ExecutableDeleteStatement delete(NamedTableNode table, HintNode hint, ParseNode whereNode, List<OrderByNode> orderBy, LimitNode limit, int bindCount, Map<String, UDFParseNode> udfParseNodes) {
return new ExecutableDeleteStatement(table, hint, whereNode, orderBy, limit, bindCount, udfParseNodes);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/parse/CloseStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/CloseStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/CloseStatement.java
new file mode 100644
index 0000000..5d7af34
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/CloseStatement.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+
+public class CloseStatement implements BindableStatement {
+ private final CursorName cursorName;
+
+ public CloseStatement(CursorName cursorName){
+ this.cursorName = cursorName;
+ }
+
+ public String getCursorName(){
+ return cursorName.getName();
+ }
+
+ public int getBindCount(){
+ return 0;
+ }
+
+ public Operation getOperation(){
+ return Operation.UPSERT;
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/parse/CursorName.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/CursorName.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/CursorName.java
new file mode 100644
index 0000000..5b9de76
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/CursorName.java
@@ -0,0 +1,26 @@
+package org.apache.phoenix.parse;
+
+import org.apache.phoenix.util.SchemaUtil;
+
+public class CursorName {
+ private final String name;
+ private final boolean isCaseSensitive;
+
+ public CursorName(String name, boolean isCaseSensitive){
+ this.name = name;
+ this.isCaseSensitive = isCaseSensitive;
+ }
+
+ public CursorName(String name){
+ this.name = name;
+ this.isCaseSensitive = name == null ? false: SchemaUtil.isCaseSensitive(name);
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public boolean isCaseSensitive() {
+ return isCaseSensitive;
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/parse/DeclareCursorStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/DeclareCursorStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/DeclareCursorStatement.java
new file mode 100644
index 0000000..68129ec
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/DeclareCursorStatement.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import java.util.*;
+
+public class DeclareCursorStatement implements BindableStatement {
+ private final CursorName cursorName;
+ private final SelectStatement select;
+
+ public DeclareCursorStatement(CursorName cursorName, SelectStatement select){
+ this.cursorName = cursorName;
+ this.select = select;
+ }
+
+ public String getCursorName(){
+ return cursorName.getName();
+ }
+
+ public String getQuerySQL(){
+ //Check if there are parameters to bind.
+ if(select.getBindCount() > 0){
+
+ }
+ //TODO: Test if this works
+ return select.toString();
+ }
+
+ public SelectStatement getSelect(){
+ return select;
+ }
+
+ public List<OrderByNode> getSelectOrderBy() {
+ return select.getOrderBy();
+ }
+
+ public int getBindCount(){
+ return select.getBindCount();
+ }
+
+ public Operation getOperation(){
+ return Operation.UPSERT;
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/parse/FetchStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/FetchStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/FetchStatement.java
new file mode 100644
index 0000000..08e9724
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/FetchStatement.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+
+public class FetchStatement implements BindableStatement {
+ private final CursorName cursorName;
+ private final boolean isNext;
+ private final int fetchSize;
+
+ public FetchStatement(CursorName cursorName, boolean isNext, int fetchSize){
+ this.cursorName = cursorName;
+ this.isNext = isNext;
+ this.fetchSize = fetchSize;
+ }
+
+ public CursorName getCursorName(){
+ return cursorName;
+ }
+
+ public boolean isNext(){
+ return isNext;
+ }
+
+ public int getBindCount(){
+ return 0;
+ }
+
+ public Operation getOperation(){
+ return Operation.QUERY;
+ }
+
+ public int getFetchSize(){
+ return fetchSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/parse/OpenStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/OpenStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/OpenStatement.java
new file mode 100644
index 0000000..ad905b0
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/OpenStatement.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+
+public class OpenStatement implements BindableStatement {
+ private final CursorName cursorName;
+
+ public OpenStatement(CursorName cursorName){
+ this.cursorName = cursorName;
+ }
+
+ public String getCursorName(){
+ return cursorName.getName();
+ }
+
+ public int getBindCount(){
+ return 0;
+ }
+
+ public Operation getOperation(){
+ return Operation.UPSERT;
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 4b65c6a..4628d51 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -731,6 +731,26 @@ public class ParseNodeFactory {
return new UpsertStatement(table, hint, columns, values, select, bindCount, udfParseNodes, onDupKeyPairs);
}
+ public CursorName cursorName(String name){
+ return new CursorName(name);
+ }
+
+ public DeclareCursorStatement declareCursor(CursorName cursor, SelectStatement select){
+ return new DeclareCursorStatement(cursor, select);
+ }
+
+ public FetchStatement fetch(CursorName cursor, boolean isNext, int fetchLimit){
+ return new FetchStatement(cursor, isNext, fetchLimit);
+ }
+
+ public OpenStatement open(CursorName cursor){
+ return new OpenStatement(cursor);
+ }
+
+ public CloseStatement close(CursorName cursor){
+ return new CloseStatement(cursor);
+ }
+
public DeleteStatement delete(NamedTableNode table, HintNode hint, ParseNode node, List<OrderByNode> orderBy, LimitNode limit, int bindCount, Map<String, UDFParseNode> udfParseNodes) {
return new DeleteStatement(table, hint, node, orderBy, limit, bindCount, udfParseNodes);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/parse/SQLParser.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/SQLParser.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/SQLParser.java
index 1a80991..b6b7de2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/SQLParser.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/SQLParser.java
@@ -139,6 +139,82 @@ public class SQLParser {
}
/**
+ * Parses the input as a SQL declare cursor statement.
+ * Used only in tests
+ * @throws SQLException
+ */
+ public DeclareCursorStatement parseDeclareCursor() throws SQLException {
+ try {
+ DeclareCursorStatement statement = parser.declare_cursor_node();
+ return statement;
+ } catch (RecognitionException e) {
+ throw PhoenixParserException.newException(e, parser.getTokenNames());
+ } catch (RuntimeException e) {
+ if (e.getCause() instanceof SQLException) {
+ throw (SQLException) e.getCause();
+ }
+ throw PhoenixParserException.newException(e, parser.getTokenNames());
+ }
+ }
+
+ /**
+ * Parses the input as a SQL cursor open statement.
+ * Used only in tests
+ * @throws SQLException
+ */
+ public OpenStatement parseOpen() throws SQLException {
+ try {
+ OpenStatement statement = parser.cursor_open_node();
+ return statement;
+ } catch (RecognitionException e) {
+ throw PhoenixParserException.newException(e, parser.getTokenNames());
+ } catch (RuntimeException e) {
+ if (e.getCause() instanceof SQLException) {
+ throw (SQLException) e.getCause();
+ }
+ throw PhoenixParserException.newException(e, parser.getTokenNames());
+ }
+ }
+
+ /**
+ * Parses the input as a SQL cursor close statement.
+ * Used only in tests
+ * @throws SQLException
+ */
+ public CloseStatement parseClose() throws SQLException {
+ try {
+ CloseStatement statement = parser.cursor_close_node();
+ return statement;
+ } catch (RecognitionException e) {
+ throw PhoenixParserException.newException(e, parser.getTokenNames());
+ } catch (RuntimeException e) {
+ if (e.getCause() instanceof SQLException) {
+ throw (SQLException) e.getCause();
+ }
+ throw PhoenixParserException.newException(e, parser.getTokenNames());
+ }
+ }
+
+ /**
+ * Parses the input as a SQL cursor fetch statement.
+ * Used only in tests
+ * @throws SQLException
+ */
+ public FetchStatement parseFetch() throws SQLException {
+ try {
+ FetchStatement statement = parser.cursor_fetch_node();
+ return statement;
+ } catch (RecognitionException e) {
+ throw PhoenixParserException.newException(e, parser.getTokenNames());
+ } catch (RuntimeException e) {
+ if (e.getCause() instanceof SQLException) {
+ throw (SQLException) e.getCause();
+ }
+ throw PhoenixParserException.newException(e, parser.getTokenNames());
+ }
+ }
+
+ /**
* Parses the input as a SQL select statement.
* Used only in tests
* @throws SQLException
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 8005e4a..1254d79 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -144,6 +144,7 @@ import org.apache.phoenix.compile.MutationPlan;
import org.apache.phoenix.compile.PostDDLCompiler;
import org.apache.phoenix.compile.PostIndexDDLCompiler;
import org.apache.phoenix.compile.PostLocalIndexDDLCompiler;
+import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.compile.StatementNormalizer;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
@@ -165,6 +166,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.AddColumnStatement;
import org.apache.phoenix.parse.AlterIndexStatement;
+import org.apache.phoenix.parse.CloseStatement;
import org.apache.phoenix.parse.ColumnDef;
import org.apache.phoenix.parse.ColumnDefInPkConstraint;
import org.apache.phoenix.parse.ColumnName;
@@ -173,6 +175,7 @@ import org.apache.phoenix.parse.CreateIndexStatement;
import org.apache.phoenix.parse.CreateSchemaStatement;
import org.apache.phoenix.parse.CreateSequenceStatement;
import org.apache.phoenix.parse.CreateTableStatement;
+import org.apache.phoenix.parse.DeclareCursorStatement;
import org.apache.phoenix.parse.DropColumnStatement;
import org.apache.phoenix.parse.DropFunctionStatement;
import org.apache.phoenix.parse.DropIndexStatement;
@@ -181,6 +184,7 @@ import org.apache.phoenix.parse.DropSequenceStatement;
import org.apache.phoenix.parse.DropTableStatement;
import org.apache.phoenix.parse.IndexKeyConstraint;
import org.apache.phoenix.parse.NamedTableNode;
+import org.apache.phoenix.parse.OpenStatement;
import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.parse.PFunction.FunctionArgument;
import org.apache.phoenix.parse.PSchema;
@@ -214,6 +218,7 @@ import org.apache.phoenix.schema.types.PUnsignedLong;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.CursorUtil;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.LogUtil;
@@ -1364,6 +1369,21 @@ public class MetaDataClient {
return fullName;
}
+ public MutationState declareCursor(DeclareCursorStatement statement, QueryPlan queryPlan) throws SQLException {
+ CursorUtil.declareCursor(statement, queryPlan);
+ return new MutationState(0,connection);
+ }
+
+ public MutationState open(OpenStatement statement) throws SQLException {
+ CursorUtil.openCursor(statement, connection);
+ return new MutationState(0,connection);
+ }
+
+ public MutationState close(CloseStatement statement) throws SQLException {
+ CursorUtil.closeCursor(statement);
+ return new MutationState(0,connection);
+ }
+
/**
* Create an index table by morphing the CreateIndexStatement into a CreateTableStatement and calling
* MetaDataClient.createTable. In doing so, we perform the following translations: