You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by an...@apache.org on 2017/05/17 07:27:02 UTC

[1/2] phoenix git commit: PHOENIX-3572 Support FETCH NEXT| n ROWS from Cursor (Biju Nair)

Repository: phoenix
Updated Branches:
  refs/heads/master 1666e932d -> 2cb617f35


http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/util/CursorUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CursorUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/CursorUtil.java
new file mode 100644
index 0000000..877c436
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CursorUtil.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.util;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.CursorFetchPlan;
+import org.apache.phoenix.iterate.CursorResultIterator;
+import org.apache.phoenix.parse.CloseStatement;
+import org.apache.phoenix.parse.DeclareCursorStatement;
+import org.apache.phoenix.parse.OpenStatement;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+public final class CursorUtil {
+
+    private static class CursorWrapper {
+        private final String cursorName;
+        private final String selectSQL;
+        private boolean isOpen = false;
+        QueryPlan queryPlan;
+        ImmutableBytesWritable row;
+        ImmutableBytesWritable previousRow;
+        private Scan scan;
+        private boolean moreValues=true;
+        private boolean isReversed;
+        private boolean islastCallNext;
+        private CursorFetchPlan fetchPlan;
+        private int offset = -1;
+        private boolean isAggregate;
+
+        private CursorWrapper(String cursorName, String selectSQL, QueryPlan queryPlan){
+            this.cursorName = cursorName;
+            this.selectSQL = selectSQL;
+            this.queryPlan = queryPlan;
+            this.islastCallNext = true;
+            this.fetchPlan = new CursorFetchPlan(queryPlan,cursorName);
+            isAggregate = fetchPlan.isAggregate();
+        }
+
+        private synchronized void openCursor(Connection conn) throws SQLException {
+            if(isOpen){
+                return;
+            }
+            this.scan = this.queryPlan.getContext().getScan();
+            isReversed=OrderBy.REV_ROW_KEY_ORDER_BY.equals(this.queryPlan.getOrderBy());
+            isOpen = true;
+        }
+
+        private void closeCursor() throws SQLException {
+            isOpen = false;
+            ((CursorResultIterator) fetchPlan.iterator()).closeCursor();
+            //TODO: Determine if the cursor should be removed from the HashMap at this point.
+            //Semantically it makes sense that something which is 'Closed' one should be able to 'Open' again.
+            mapCursorIDQuery.remove(this.cursorName);
+        }
+
+        private QueryPlan getFetchPlan(boolean isNext, int fetchSize) throws SQLException {
+            if (!isOpen)
+                throw new SQLException("Fetch call on closed cursor '" + this.cursorName + "'!");
+            ((CursorResultIterator)fetchPlan.iterator()).setFetchSize(fetchSize);
+            if (!isAggregate) { 
+                if (row!=null){
+                    scan.setStartRow(row.get());
+                }
+            }
+            return this.fetchPlan;
+        }
+
+        public void updateLastScanRow(Tuple rowValues,Tuple nextRowValues) {
+        	
+            this.moreValues = !isReversed ? nextRowValues != null : rowValues != null;
+            if(!moreValues()){
+               return;
+            }
+            if (row == null) {
+                row = new ImmutableBytesWritable();
+            }
+            if (previousRow == null) {
+                previousRow = new ImmutableBytesWritable();
+            }
+            if (nextRowValues != null) {
+                nextRowValues.getKey(row);
+            } 
+            if (rowValues != null) {
+                rowValues.getKey(previousRow);
+            }
+            offset++;
+        }
+
+        public boolean moreValues() {
+            return moreValues;
+        }
+
+        public String getFetchSQL() throws SQLException {
+            if (!isOpen)
+                throw new SQLException("Fetch call on closed cursor '" + this.cursorName + "'!");
+            return selectSQL;
+        }
+    }
+
+    private static Map<String, CursorWrapper> mapCursorIDQuery = new HashMap<String,CursorWrapper>();
+
+    /**
+     * Private constructor
+     */
+    private CursorUtil() {
+    }
+
+    /**
+     *
+     * @param stmt DeclareCursorStatement instance intending to declare a new cursor.
+     * @return Returns true if the new cursor was successfully declared. False if a cursor with the same
+     * identifier already exists.
+     */
+    public static boolean declareCursor(DeclareCursorStatement stmt, QueryPlan queryPlan) throws SQLException {
+        if(mapCursorIDQuery.containsKey(stmt.getCursorName())){
+            throw new SQLException("Can't declare cursor " + stmt.getCursorName() + ", cursor identifier already in use.");
+        } else {
+            mapCursorIDQuery.put(stmt.getCursorName(), new CursorWrapper(stmt.getCursorName(), stmt.getQuerySQL(), queryPlan));
+            return true;
+        }
+    }
+
+    public static boolean openCursor(OpenStatement stmt, Connection conn) throws SQLException {
+        if(mapCursorIDQuery.containsKey(stmt.getCursorName())){
+            mapCursorIDQuery.get(stmt.getCursorName()).openCursor(conn);
+            return true;
+        } else{
+            throw new SQLException("Cursor " + stmt.getCursorName() + " not declared.");
+        }
+    }
+
+    public static void closeCursor(CloseStatement stmt) throws SQLException {
+        if(mapCursorIDQuery.containsKey(stmt.getCursorName())){
+            mapCursorIDQuery.get(stmt.getCursorName()).closeCursor();
+        }
+    }
+
+    public static QueryPlan getFetchPlan(String cursorName, boolean isNext, int fetchSize) throws SQLException {
+        if(mapCursorIDQuery.containsKey(cursorName)){
+            return mapCursorIDQuery.get(cursorName).getFetchPlan(isNext, fetchSize);
+        } else {
+            throw new SQLException("Cursor " + cursorName + " not declared.");
+        }
+    }
+    
+    public static String getFetchSQL(String cursorName) throws SQLException {
+        if (mapCursorIDQuery.containsKey(cursorName)) {
+            return mapCursorIDQuery.get(cursorName).getFetchSQL();
+        } else {
+            throw new SQLException("Cursor " + cursorName + " not declared.");
+        }
+    }
+
+    public static void updateCursor(String cursorName, Tuple rowValues, Tuple nextRowValues) throws SQLException {
+        mapCursorIDQuery.get(cursorName).updateLastScanRow(rowValues,nextRowValues);
+    }
+
+    public static boolean cursorDeclared(String cursorName){
+        return mapCursorIDQuery.containsKey(cursorName);
+    }
+
+    public static boolean moreValues(String cursorName) {
+        return mapCursorIDQuery.get(cursorName).moreValues();
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 1fdc73b..9794a2a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -613,6 +613,10 @@ public class ScanUtil {
         scan.setAttribute(BaseScannerRegionObserver.REVERSE_SCAN, PDataType.TRUE_BYTES);
     }
 
+    public static void unsetReversed(Scan scan) {
+        scan.setAttribute(BaseScannerRegionObserver.REVERSE_SCAN, PDataType.FALSE_BYTES);
+    }
+
     private static byte[] getReversedRow(byte[] startRow) {
         /*
          * Must get previous key because this is going from an inclusive start key to an exclusive stop key, and we need
@@ -926,4 +930,4 @@ public class ScanUtil {
         return scan.getAttribute((BaseScannerRegionObserver.REBUILD_INDEXES)) != null;
     }
     
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/test/java/org/apache/phoenix/compile/CursorCompilerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/CursorCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/CursorCompilerTest.java
new file mode 100644
index 0000000..a8f37f0
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/CursorCompilerTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.CountAggregator;
+import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.expression.function.TimeUnit;
+import org.apache.phoenix.filter.ColumnProjectionFilter;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.*;
+import org.apache.phoenix.util.*;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.sql.*;
+import java.util.*;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.assertDegenerate;
+import static org.junit.Assert.*;
+
+
+/**
+ * 
+ * Test for compiling the various cursor related statements
+ *
+ * 
+ * @since 0.1
+ */
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(
+        value="RV_RETURN_VALUE_IGNORED",
+        justification="Test code.")
+public class CursorCompilerTest extends BaseConnectionlessQueryTest {
+
+    @Test
+    public void testCursorLifecycleCompile() throws SQLException {
+        String query = "SELECT a_string, b_string FROM atable";
+        String sql = "DECLARE testCursor CURSOR FOR " + query;
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        //Test declare cursor compile
+        PreparedStatement statement = conn.prepareStatement(sql);
+        //Test declare cursor execution
+        statement.execute();
+        assertTrue(CursorUtil.cursorDeclared("testCursor"));
+        //Test open cursor compile
+        sql = "OPEN testCursor";
+        statement = conn.prepareStatement(sql);
+        //Test open cursor execution
+        statement.execute();
+        //Test fetch cursor compile
+        sql = "FETCH NEXT FROM testCursor";
+        statement = conn.prepareStatement(sql);
+        statement.executeQuery();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/test/java/org/apache/phoenix/parse/CursorParserTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/parse/CursorParserTest.java b/phoenix-core/src/test/java/org/apache/phoenix/parse/CursorParserTest.java
new file mode 100644
index 0000000..247ee44
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/parse/CursorParserTest.java
@@ -0,0 +1,367 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.schema.SortOrder;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+
+public class CursorParserTest {
+
+    private void parseCursor(String sql) throws IOException, SQLException {
+        SQLParser parser = new SQLParser(new StringReader(sql));
+        BindableStatement stmt = null;
+        try{
+            stmt = parser.parseDeclareCursor();
+        } catch (SQLException e){
+            fail("Unable to parse:\n" + sql);
+        }
+    }
+
+    private void parseFetch(String sql) throws IOException, SQLException {
+        SQLParser parser = new SQLParser(new StringReader(sql));
+        BindableStatement stmt = null;
+        try{
+            stmt = parser.parseFetch();
+        } catch (SQLException e){
+            fail("Unable to parse:\n" + sql);
+        }
+    }
+
+    private void parseOpen(String sql) throws IOException, SQLException {
+        SQLParser parser = new SQLParser(new StringReader(sql));
+        BindableStatement stmt = null;
+        try{
+            stmt = parser.parseOpen();
+        } catch (SQLException e){
+            fail("Unable to parse:\n" + sql);
+        }
+    }
+
+    @Test
+    public void testParseCursor0() throws Exception {
+        String expectedNameToken = "testCursor";
+        String expectedSelectStatement = "select a from b\n" +
+                "where ((ind.name = 'X')" +
+                "and rownum <= (1000 + 1000))\n";
+
+        String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+        parseCursor(sql);
+    }
+
+    @Test
+    public void testParseCursor1() throws Exception {
+        String expectedNameToken = "testCursor";
+        String expectedSelectStatement = "select /*gatherSlowStats*/ count(1) from core.search_name_lookup ind\n" +
+                "where( (ind.name = 'X'\n" +
+                "and rownum <= 1 + 2)\n" +
+                "and (ind.organization_id = '000000000000000')\n" +
+                "and (ind.key_prefix = '00T')\n" +
+                "and (ind.name_type = 't'))";
+        
+
+        String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+        parseCursor(sql);
+    }
+
+    @Test
+    public void testParseCursor2() throws Exception {
+        String expectedNameToken = "testCursor";
+        String expectedSelectStatement = "select /*gatherSlowStats*/ count(1) from core.custom_index_value ind\n" +
+                "where (ind.string_value in ('a', 'b', 'c', 'd'))\n" +
+                "and rownum <= ( 3 + 1 )\n" +
+                "and (ind.organization_id = '000000000000000')\n" +
+                "and (ind.key_prefix = '00T')\n" +
+                "and (ind.deleted = '0')\n" +
+                "and (ind.index_num = 1)";
+        
+
+        String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+        parseCursor(sql);
+        
+    }
+
+    @Test
+    public void testParseCursor3() throws Exception {
+        String expectedNameToken = "testCursor";
+        String expectedSelectStatement = "select /*gatherSlowStats*/ count(1) from core.custom_index_value ind\n" +
+                "where (ind.number_value > 3)\n" +
+                "and rownum <= 1000\n" +
+                "and (ind.organization_id = '000000000000000')\n" +
+                "and (ind.key_prefix = '001'\n" +
+                "and (ind.deleted = '0'))\n" +
+                "and (ind.index_num = 2)";
+        
+
+        String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+        parseCursor(sql);
+        
+    }
+
+    @Test
+    public void testParseCursor4() throws Exception {
+        String expectedNameToken = "testCursor";
+        String expectedSelectStatement = "select /*+ index(t iecustom_entity_data_created) */ /*gatherSlowStats*/ count(1) from core.custom_entity_data t\n" +
+                "where (t.created_date > to_date('01/01/2001'))\n" +
+                "and rownum <= 4500\n" +
+                "and (t.organization_id = '000000000000000')\n" +
+                "and (t.key_prefix = '001')";
+        
+
+        String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+        parseCursor(sql);
+        
+    }
+
+    @Test
+    public void testCountDistinctCursor() throws Exception {
+        String expectedNameToken = "testCursor";
+        String expectedSelectStatement = "select count(distinct foo) from core.custom_entity_data t\n"
+                + "where (t.created_date > to_date('01/01/2001'))\n"
+                + "and (t.organization_id = '000000000000000')\n"
+                + "and (t.key_prefix = '001')\n" + "limit 4500";
+        
+        String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+        parseCursor(sql);
+        
+    }
+
+    @Test
+    public void testIsNullCursor() throws Exception {
+        String expectedNameToken = "testCursor";
+        String expectedSelectStatement = "select count(foo) from core.custom_entity_data t\n" +
+                "where (t.created_date is null)\n" +
+                "and (t.organization_id is not null)\n";
+        
+        String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+        parseCursor(sql);
+        
+    }
+
+    @Test
+    public void testAsInColumnAlias() throws Exception {
+        String expectedNameToken = "testCursor";
+        String expectedSelectStatement = "select count(foo) AS c from core.custom_entity_data t\n" +
+                "where (t.created_date is null)\n" +
+                "and (t.organization_id is not null)\n";
+        
+        String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+        parseCursor(sql);
+        
+    }
+
+    @Test
+    public void testParseJoin1() throws Exception {
+        String expectedNameToken = "testCursor";
+        String expectedSelectStatement = "select /*SOQL*/ \"Id\"\n" +
+                "from (select /*+ ordered index(cft) */\n" +
+                "cft.val188 \"Marketing_Offer_Code__c\",\n" +
+                "t.account_id \"Id\"\n" +
+                "from sales.account_cfdata cft,\n" +
+                "sales.account t\n" +
+                "where (cft.account_cfdata_id = t.account_id)\n" +
+                "and (cft.organization_id = '00D300000000XHP')\n" +
+                "and (t.organization_id = '00D300000000XHP')\n" +
+                "and (t.deleted = '0')\n" +
+                "and (t.account_id != '000000000000000'))\n" +
+                "where (\"Marketing_Offer_Code__c\" = 'FSCR')";
+        
+        String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+        parseCursor(sql);
+        
+    }
+
+    @Test
+    public void testParseJoin2() throws Exception {
+        String expectedNameToken = "testCursor";
+        String expectedSelectStatement = "select /*rptacctlist 00O40000002C3of*/ \"00N40000001M8VK\",\n" +
+                "\"00N40000001M8VK.ID\",\n" +
+                "\"00N30000000r0K2\",\n" +
+                "\"00N30000000jgjo\"\n" +
+                "from (select /*+ ordered use_hash(aval368) index(cfa) */\n" +
+                "a.record_type_id \"RECORDTYPE\",\n" +
+                "aval368.last_name,aval368.first_name || ' ' || aval368.last_name,aval368.name \"00N40000001M8VK\",\n" +
+                "a.last_update \"LAST_UPDATE\",\n" +
+                "cfa.val368 \"00N40000001M8VK.ID\",\n" +
+                "TO_DATE(cfa.val282) \"00N30000000r0K2\",\n" +
+                "cfa.val252 \"00N30000000jgjo\"\n" +
+                "from sales.account a,\n" +
+                "sales.account_cfdata cfa,\n" +
+                "core.name_denorm aval368\n" +
+                "where (cfa.account_cfdata_id = a.account_id)\n" +
+                "and (aval368.entity_id = cfa.val368)\n" +
+                "and (a.deleted = '0')\n" +
+                "and (a.organization_id = '00D300000000EaE')\n" +
+                "and (a.account_id <> '000000000000000')\n" +
+                "and (cfa.organization_id = '00D300000000EaE')\n" +
+                "and (aval368.organization_id = '00D300000000EaE')\n" +
+                "and (aval368.entity_id like '005%'))\n" +
+                "where (\"RECORDTYPE\" = '0123000000002Gv')\n" +
+                "AND (\"00N40000001M8VK\" is null or \"00N40000001M8VK\" in ('BRIAN IRWIN', 'BRIAN MILLER', 'COLLEEN HORNYAK', 'ERNIE ZAVORAL JR', 'JAMIE TRIMBUR', 'JOE ANTESBERGER', 'MICHAEL HYTLA', 'NATHAN DELSIGNORE', 'SANJAY GANDHI', 'TOM BASHIOUM'))\n" +
+                "AND (\"LAST_UPDATE\" >= to_date('2009-08-01 07:00:00'))";
+        
+        String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+        parseCursor(sql);
+        
+    }
+
+    @Test
+    public void testCommentCursor() throws Exception {
+        String expectedNameToken = "testCursor";
+        String expectedSelectStatement = "select a from b -- here we come\n" +
+                "where ((ind.name = 'X') // to save the day\n" +
+                "and rownum /* won't run */ <= (1000 + 1000))\n";
+        
+        String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+        parseCursor(sql);
+        
+    }
+
+    @Test
+    public void testQuoteEscapeCursor() throws Exception {
+        String expectedNameToken = "testCursor";
+        String expectedSelectStatement = "select a from b\n" +
+                "where ind.name = 'X''Y'\n";
+        
+        String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+        parseCursor(sql);
+        
+    }
+
+    @Test
+    public void testSubtractionInSelect() throws Exception {
+        String expectedNameToken = "testCursor";
+        String expectedSelectStatement = "select a, 3-1-2, -4- -1-1 from b\n" +
+                "where d = c - 1\n";
+        
+        String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+        parseCursor(sql);
+        
+    }
+
+    @Test
+    public void testNextValueForSelect() throws Exception {
+        String expectedNameToken = "testCursor";
+        String expectedSelectStatement = "select next value for foo.bar \n" +
+                "from core.custom_entity_data\n";
+        
+        String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+        parseCursor(sql);
+        
+    }
+
+    @Test
+    public void testPercentileQuery1() throws Exception {
+        String expectedNameToken = "testCursor";
+        String expectedSelectStatement = "select PERCENTILE_CONT(0.9) WITHIN GROUP (ORDER BY salary DESC) from core.custom_index_value ind";
+        
+        String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+        parseCursor(sql);
+        
+    }
+
+    @Test
+    public void testPercentileQuery2() throws Exception {
+        String expectedNameToken = "testCursor";
+        String expectedSelectStatement = "select PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY mark ASC) from core.custom_index_value ind";
+        
+        String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+        parseCursor(sql);
+        
+    }
+
+    @Test
+    public void testRowValueConstructorQuery() throws Exception {
+        String expectedNameToken = "testCursor";
+        String expectedSelectStatement = "select a_integer FROM aTable where (x_integer, y_integer) > (3, 4)";
+        
+        String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+        parseCursor(sql);
+        
+    }
+
+    @Test
+    public void testSingleTopLevelNot() throws Exception {
+        String expectedNameToken = "testCursor";
+        String expectedSelectStatement = "select * from t where not c = 5";
+        
+        String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+        parseCursor(sql);
+    }
+
+    @Test
+    public void testHavingWithNot() throws Exception {
+        String expectedNameToken = "testCursor";
+        String expectedSelectStatement = "select\n" +
+                "\"WEB_STAT_ALIAS\".\"DOMAIN\" as \"c0\"\n" +
+                "from \"WEB_STAT\" \"WEB_STAT_ALIAS\"\n" +
+                "group by \"WEB_STAT_ALIAS\".\"DOMAIN\" having\n" +
+                "(\n" +
+                "(\n" +
+                "NOT\n" +
+                "(\n" +
+                "(sum(\"WEB_STAT_ALIAS\".\"ACTIVE_VISITOR\") is null)\n" +
+                ")\n" +
+                "OR NOT((sum(\"WEB_STAT_ALIAS\".\"ACTIVE_VISITOR\") is null))\n" +
+                ")\n" +
+                "OR NOT((sum(\"WEB_STAT_ALIAS\".\"ACTIVE_VISITOR\") is null))\n" +
+                ")\n" +
+                "order by CASE WHEN \"WEB_STAT_ALIAS\".\"DOMAIN\" IS NULL THEN 1 ELSE 0 END,\n" +
+                "\"WEB_STAT_ALIAS\".\"DOMAIN\" ASC";
+        
+        String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+        parseCursor(sql);
+        
+    }
+
+    @Test
+    public void testDoubleBackslash() throws Exception {
+        String expectedNameToken = "testCursor";
+        String expectedSelectStatement = "SELECT * FROM T WHERE A LIKE 'a\\(d'";
+        
+        String sql = "DECLARE " + expectedNameToken + " CURSOR FOR " + expectedSelectStatement;
+        parseCursor(sql);
+        
+    }
+
+    @Test
+    public void testOpenCursor() throws Exception {
+        String expectedNameToken = "testCursor";
+        String sql = "OPEN " + expectedNameToken;
+        parseOpen(sql);
+    }
+
+    @Test
+    public void testFetchNext() throws Exception {
+        String expectedNameToken = "testCursor";
+        String sql = "FETCH NEXT FROM " + expectedNameToken;
+        parseFetch(sql);
+    }
+
+}


[2/2] phoenix git commit: PHOENIX-3572 Support FETCH NEXT| n ROWS from Cursor (Biju Nair)

Posted by an...@apache.org.
PHOENIX-3572 Support FETCH NEXT| n ROWS from Cursor (Biju Nair)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2cb617f3
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2cb617f3
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2cb617f3

Branch: refs/heads/master
Commit: 2cb617f352048179439d242d1165a9ffb39ad81c
Parents: 1666e93
Author: Ankit Singhal <an...@gmail.com>
Authored: Wed May 17 12:56:42 2017 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Wed May 17 12:56:42 2017 +0530

----------------------------------------------------------------------
 .../CursorWithRowValueConstructorIT.java        | 687 +++++++++++++++++++
 phoenix-core/src/main/antlr3/PhoenixSQL.g       |  31 +
 .../phoenix/compile/CloseStatementCompiler.java |  57 ++
 .../phoenix/compile/DeclareCursorCompiler.java  |  75 ++
 .../phoenix/compile/OpenStatementCompiler.java  |  57 ++
 .../apache/phoenix/execute/CursorFetchPlan.java |  53 ++
 .../phoenix/iterate/CursorResultIterator.java   |  75 ++
 .../apache/phoenix/jdbc/PhoenixStatement.java   |  91 ++-
 .../apache/phoenix/parse/CloseStatement.java    |  40 ++
 .../org/apache/phoenix/parse/CursorName.java    |  26 +
 .../phoenix/parse/DeclareCursorStatement.java   |  60 ++
 .../apache/phoenix/parse/FetchStatement.java    |  52 ++
 .../org/apache/phoenix/parse/OpenStatement.java |  40 ++
 .../apache/phoenix/parse/ParseNodeFactory.java  |  20 +
 .../org/apache/phoenix/parse/SQLParser.java     |  76 ++
 .../apache/phoenix/schema/MetaDataClient.java   |  20 +
 .../org/apache/phoenix/util/CursorUtil.java     | 189 +++++
 .../java/org/apache/phoenix/util/ScanUtil.java  |   6 +-
 .../phoenix/compile/CursorCompilerTest.java     |  87 +++
 .../apache/phoenix/parse/CursorParserTest.java  | 367 ++++++++++
 20 files changed, 2106 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/it/java/org/apache/phoenix/end2end/CursorWithRowValueConstructorIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CursorWithRowValueConstructorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CursorWithRowValueConstructorIT.java
new file mode 100644
index 0000000..dda4bd1
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CursorWithRowValueConstructorIT.java
@@ -0,0 +1,687 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.phoenix.util.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.sql.*;
+import java.util.Properties;
+import java.util.Random;
+
+import static org.apache.phoenix.util.TestUtil.*;
+import static org.junit.Assert.*;
+
+
+public class CursorWithRowValueConstructorIT extends ParallelStatsDisabledIT {
+    private static final String TABLE_NAME = "CursorRVCTestTable";
+    protected static final Log LOG = LogFactory.getLog(CursorWithRowValueConstructorIT.class);
+
+    public void createAndInitializeTestTable() throws SQLException {
+        Connection conn = DriverManager.getConnection(getUrl());
+
+        PreparedStatement stmt = conn.prepareStatement("CREATE TABLE IF NOT EXISTS " + TABLE_NAME +
+                "(a_id INTEGER NOT NULL, " +
+                "a_data INTEGER, " +
+                "CONSTRAINT my_pk PRIMARY KEY (a_id))");
+        stmt.execute();
+        synchronized (conn){
+            conn.commit();
+        }
+
+        //Upsert test values into the test table
+        Random rand = new Random();
+        stmt = conn.prepareStatement("UPSERT INTO " + TABLE_NAME +
+                "(a_id, a_data) VALUES (?,?)");
+        int rowCount = 0;
+        while(rowCount < 100){
+            stmt.setInt(1, rowCount);
+            stmt.setInt(2, rand.nextInt(501));
+            stmt.execute();
+            ++rowCount;
+        }
+        synchronized (conn){
+            conn.commit();
+        }
+    }
+
+    public void deleteTestTable() throws SQLException {
+        Connection conn = DriverManager.getConnection(getUrl());
+        PreparedStatement stmt = conn.prepareStatement("DROP TABLE IF EXISTS " + TABLE_NAME);
+        stmt.execute();
+        synchronized (conn){
+            conn.commit();
+        }
+    }
+
+    @Test
+    public void testCursorsOnTestTablePK() throws SQLException {
+        try{
+            createAndInitializeTestTable();
+            String querySQL = "SELECT a_id FROM " + TABLE_NAME;
+
+            //Test actual cursor implementation
+            String cursorSQL = "DECLARE testCursor CURSOR FOR " + querySQL;
+            DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute();
+            cursorSQL = "OPEN testCursor";
+            DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute();
+            cursorSQL = "FETCH NEXT FROM testCursor";
+            ResultSet rs = DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).executeQuery();
+            int rowID = 0;
+            while(rs.next()){
+                assertEquals(rowID,rs.getInt(1));
+                ++rowID;
+                rs = DriverManager.getConnection(getUrl()).createStatement().executeQuery(cursorSQL);
+            }
+        } finally{
+            DriverManager.getConnection(getUrl()).prepareStatement("CLOSE testCursor").execute();
+            deleteTestTable();
+        }
+
+    }
+
+    @Test
+    public void testCursorsOnRandomTableData() throws SQLException {
+        try{
+            createAndInitializeTestTable();
+            String querySQL = "SELECT a_id,a_data FROM " + TABLE_NAME + " ORDER BY a_data";
+            String cursorSQL = "DECLARE testCursor CURSOR FOR " + querySQL;
+            DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute();
+            cursorSQL = "OPEN testCursor";
+            DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute();
+            cursorSQL = "FETCH NEXT FROM testCursor";
+            ResultSet cursorRS = DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).executeQuery();
+            ResultSet rs = DriverManager.getConnection(getUrl()).prepareStatement(querySQL).executeQuery();
+            int rowCount = 0;
+            while(rs.next() && cursorRS.next()){
+                assertEquals(rs.getInt(2),cursorRS.getInt(2));
+                ++rowCount;
+                cursorRS = DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).executeQuery();
+            }
+            assertEquals(100, rowCount);
+        } finally{
+            DriverManager.getConnection(getUrl()).prepareStatement("CLOSE testCursor").execute();
+            deleteTestTable();
+        }
+    }
+
+    @Test
+    public void testCursorsOnTestTablePKDesc() throws SQLException {
+        try{
+            createAndInitializeTestTable();
+            String dummySQL = "SELECT a_id FROM " + TABLE_NAME + " ORDER BY a_id DESC";
+
+            String cursorSQL = "DECLARE testCursor CURSOR FOR " + dummySQL;
+            DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute();
+            cursorSQL = "OPEN testCursor";
+            DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute();
+            cursorSQL = "FETCH NEXT FROM testCursor";
+            ResultSet rs = DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).executeQuery();
+            int rowCount = 0;
+            while(rs.next()){
+                assertEquals(99-rowCount, rs.getInt(1));
+                rs = DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).executeQuery();
+                ++rowCount;
+            }
+            assertEquals(100, rowCount);
+        } finally{
+            DriverManager.getConnection(getUrl()).prepareStatement("CLOSE testCursor").execute();
+            deleteTestTable();
+        }
+    }
+
+    @Test
+    public void testCursorsOnTestTableNonPKDesc() throws SQLException {
+        try{
+            createAndInitializeTestTable();
+            String dummySQL = "SELECT a_data FROM " + TABLE_NAME + " ORDER BY a_data DESC";
+
+            String cursorSQL = "DECLARE testCursor CURSOR FOR " + dummySQL;
+            DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute();
+            cursorSQL = "OPEN testCursor";
+            DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute();
+            cursorSQL = "FETCH NEXT FROM testCursor";
+            ResultSet rs = DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).executeQuery();
+            int rowCount = 0;
+            while(rs.next()){
+                rs = DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).executeQuery();
+                ++rowCount;
+            }
+            assertEquals(100, rowCount);
+        } finally{
+            DriverManager.getConnection(getUrl()).prepareStatement("CLOSE testCursor").execute();
+            deleteTestTable();
+        }
+    }
+
+    @Test
+    public void testCursorsOnWildcardSelect() throws SQLException {
+        try{
+            createAndInitializeTestTable();
+            String querySQL = "SELECT * FROM " + TABLE_NAME;
+            ResultSet rs = DriverManager.getConnection(getUrl()).prepareStatement(querySQL).executeQuery();
+
+            String cursorSQL = "DECLARE testCursor CURSOR FOR "+querySQL;
+            DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute();
+            cursorSQL = "OPEN testCursor";
+            DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).execute();
+            cursorSQL = "FETCH NEXT FROM testCursor";
+            ResultSet cursorRS = DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).executeQuery();
+            int rowCount = 0;
+            while(rs.next() && cursorRS.next()){
+                assertEquals(rs.getInt(1),cursorRS.getInt(1));
+                ++rowCount;
+                cursorRS = DriverManager.getConnection(getUrl()).prepareStatement(cursorSQL).executeQuery();
+            }
+            assertEquals(100, rowCount);
+        } finally{
+            DriverManager.getConnection(getUrl()).prepareStatement("CLOSE testCursor").execute();
+            deleteTestTable();
+        }
+    }
+
+    @Test
+    public void testCursorsWithBindings() throws Exception {
+        long ts = nextTimestamp();
+        String tenantId = getOrganizationId();
+        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+        ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+        String query = "SELECT a_integer, x_integer FROM aTable WHERE ?=organization_id AND (a_integer, x_integer) = (7, 5)";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String cursor = "DECLARE testCursor CURSOR FOR "+query;
+        try {
+            PreparedStatement statement = conn.prepareStatement(cursor);
+            statement.setString(1, tenantId);
+            statement.execute();
+        }catch(SQLException e){
+            assertTrue(e.getMessage().equalsIgnoreCase("Cannot declare cursor, internal SELECT statement contains bindings!"));
+            assertTrue(!CursorUtil.cursorDeclared("testCursor"));
+            return;
+        } finally {
+            cursor = "CLOSE testCursor";
+            conn.prepareStatement(cursor).execute();
+            conn.close();
+        }
+        fail();
+    }
+
+    @Test
+    public void testCursorsInWhereWithEqualsExpression() throws Exception {
+        long ts = nextTimestamp();
+        String tenantId = getOrganizationId();
+        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+        ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+        String query = "SELECT a_integer, x_integer FROM aTable WHERE '"+tenantId+"'=organization_id AND (a_integer, x_integer) = (7, 5)";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String cursor = "DECLARE testCursor CURSOR FOR "+query;
+        try {
+            conn.prepareStatement(cursor).execute();
+            cursor = "OPEN testCursor";
+            conn.prepareStatement(cursor).execute();
+            cursor = "FETCH NEXT FROM testCursor";
+            ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+            int count = 0;
+            while(rs.next()) {
+                assertTrue(rs.getInt(1) == 7);
+                assertTrue(rs.getInt(2) == 5);
+                count++;
+                rs = conn.prepareStatement(cursor).executeQuery();
+            }
+            assertTrue(count == 1);
+        } finally {
+            cursor = "CLOSE testCursor";
+            conn.prepareStatement(cursor).execute();
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testCursorsInWhereWithGreaterThanExpression() throws Exception {
+        long ts = nextTimestamp();
+        String tenantId = getOrganizationId();
+        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+        ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+        String query = "SELECT a_integer, x_integer FROM aTable WHERE '"+tenantId+"'=organization_id  AND (a_integer, x_integer) >= (4, 4)";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String cursor = "DECLARE testCursor CURSOR FOR "+query;
+        try {
+            conn.prepareStatement(cursor).execute();
+            cursor = "OPEN testCursor";
+            conn.prepareStatement(cursor).execute();
+            cursor = "FETCH NEXT FROM testCursor";
+            ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+            int count = 0;
+            while(rs.next()) {
+                assertTrue(rs.getInt(1) >= 4);
+                assertTrue(rs.getInt(1) == 4 ? rs.getInt(2) >= 4 : rs.getInt(2) >= 0);
+                count++;
+                rs = conn.prepareStatement(cursor).executeQuery();
+            }
+            assertTrue(count == 5);
+        } finally {
+            cursor = "CLOSE testCursor";
+            conn.prepareStatement(cursor).execute();
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testCursorsInWhereWithUnEqualNumberArgs() throws Exception {
+        long ts = nextTimestamp();
+        String tenantId = getOrganizationId();
+        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+        ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+        String query = "SELECT a_integer, x_integer FROM aTable WHERE '"+tenantId+"'=organization_id  AND (a_integer, x_integer, y_integer) >= (7, 5)";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String cursor = "DECLARE testCursor CURSOR FOR "+query;
+        try {
+            double startTime = System.nanoTime();
+            conn.prepareStatement(cursor).execute();
+            cursor = "OPEN testCursor";
+            conn.prepareStatement(cursor).execute();
+            cursor = "FETCH NEXT FROM testCursor";
+            ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+            int count = 0;
+            while(rs.next()) {
+                assertTrue(rs.getInt(1) >= 7);
+                assertTrue(rs.getInt(1) == 7 ? rs.getInt(2) >= 5 : rs.getInt(2) >= 0);
+                count++;
+                rs = conn.prepareStatement(cursor).executeQuery();
+            }
+            // we have key values (7,5) (8,4) and (9,3) present in aTable. So the query should return the 3 records.
+            assertTrue(count == 3);
+            double endTime = System.nanoTime();
+            System.out.println("Method Time in milliseconds: "+Double.toString((endTime-startTime)/1000000));
+        } finally {
+            cursor = "CLOSE testCursor";
+            conn.prepareStatement(cursor).execute();
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testCursorsOnLHSAndLiteralExpressionOnRHS() throws Exception {
+        long ts = nextTimestamp();
+        String tenantId = getOrganizationId();
+        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+        ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+        String query = "SELECT a_integer, x_integer FROM aTable WHERE '"+tenantId+"'=organization_id  AND (a_integer, x_integer) >= 7";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String cursor = "DECLARE testCursor CURSOR FOR "+query;
+        try {
+            conn.prepareStatement(cursor).execute();
+            cursor = "OPEN testCursor";
+            conn.prepareStatement(cursor).execute();
+            cursor = "FETCH NEXT FROM testCursor";
+            ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+            int count = 0;
+            while(rs.next()) {
+                count++;
+                rs = conn.prepareStatement(cursor).executeQuery();
+            }
+            // we have key values (7,5) (8,4) and (9,3) present in aTable. So the query should return the 3 records.
+            assertTrue(count == 3);
+        } finally {
+            cursor = "CLOSE testCursor";
+            conn.prepareStatement(cursor).execute();
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testCursorsOnRHSLiteralExpressionOnLHS() throws Exception {
+        long ts = nextTimestamp();
+        String tenantId = getOrganizationId();
+        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+        ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+        String query = "SELECT a_integer, x_integer FROM aTable WHERE '"+tenantId+"'=organization_id  AND 7 <= (a_integer, x_integer)";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String cursor = "DECLARE testCursor CURSOR FOR "+query;
+        try {
+            conn.prepareStatement(cursor).execute();
+            cursor = "OPEN testCursor";
+            conn.prepareStatement(cursor).execute();
+            cursor = "FETCH NEXT FROM testCursor";
+            ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+            int count = 0;
+            while(rs.next()) {
+                count++;
+                rs = conn.prepareStatement(cursor).executeQuery();
+            }
+            // we have key values (7,5) (8,4) and (9,3) present in aTable. So the query should return the 3 records.
+            assertTrue(count == 3);
+        } finally {
+            cursor = "CLOSE testCursor";
+            conn.prepareStatement(cursor).execute();
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testCursorsOnBuiltInFunctionOperatingOnIntegerLiteral() throws Exception {
+        long ts = nextTimestamp();
+        String tenantId = getOrganizationId();
+        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+        ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+        String query = "SELECT a_integer, x_integer FROM aTable WHERE '"+tenantId+"'=organization_id  AND (a_integer, x_integer) >= to_number('7')";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String cursor = "DECLARE testCursor CURSOR FOR "+query;
+        try {
+            conn.prepareStatement(cursor).execute();
+            cursor = "OPEN testCursor";
+            conn.prepareStatement(cursor).execute();
+            cursor = "FETCH NEXT FROM testCursor";
+            ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+            int count = 0;
+            while(rs.next()) {
+                count++;
+                rs = conn.prepareStatement(cursor).executeQuery();
+            }
+            // we have key values (7,5) (8,4) and (9,3) present in aTable. So the query should return the 3 records.
+            assertEquals(3, count);
+        } finally {
+            cursor = "CLOSE testCursor";
+            conn.prepareStatement(cursor).execute();
+            conn.close();
+        }
+    }
+
+    @Test
+    /**
+     * Test for the precision of Date datatype when used as part of a filter within the internal Select statement.
+     */
+    public void testCursorsWithDateDatatypeFilter() throws Exception {
+        long ts = nextTimestamp();
+        String tenantId = getOrganizationId();
+        long currentTime = System.currentTimeMillis();
+        java.sql.Date date = new java.sql.Date(currentTime);
+        String strCurrentDate = date.toString();
+
+        //Sets date to <yesterday's date> 23:59:59.999
+        while(date.toString().equals(strCurrentDate)){
+            currentTime -= 1;
+            date = new Date(currentTime);
+        }
+        //Sets date to <today's date> 00:00:00.001
+        date = new Date(currentTime+2);
+        java.sql.Date midnight = new Date(currentTime+1);
+
+
+        initEntityHistoryTableValues(tenantId, getDefaultSplits(tenantId), date, ts);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2));
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+
+
+        String query = "select parent_id from " + ENTITY_HISTORY_TABLE_NAME +
+                " WHERE (organization_id, parent_id, created_date, entity_history_id) IN ((?,?,?,?),(?,?,?,?))";
+
+        query = query.replaceFirst("\\?", "'"+tenantId+"'");
+        query = query.replaceFirst("\\?", "'"+PARENTID3+"'");
+        query = query.replaceFirst("\\?", "TO_DATE('"+DateUtil.getDateFormatter(DateUtil.DEFAULT_DATE_FORMAT).format(date)+"')");
+        query = query.replaceFirst("\\?", "'"+ENTITYHISTID3+"'");
+        query = query.replaceFirst("\\?", "'"+tenantId+"'");
+        query = query.replaceFirst("\\?", "'"+PARENTID7+"'");
+        query = query.replaceFirst("\\?", "TO_DATE('"+DateUtil.getDateFormatter(DateUtil.DEFAULT_DATE_FORMAT).format(date)+"')");
+        query = query.replaceFirst("\\?", "'"+ENTITYHISTID7+"'");
+        String cursor = "DECLARE testCursor CURSOR FOR "+query;
+
+        conn.prepareStatement(cursor).execute();
+        cursor = "OPEN testCursor";
+        conn.prepareStatement(cursor).execute();
+        cursor = "FETCH NEXT FROM testCursor";
+
+        ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+        assertTrue(rs.next());
+        assertEquals(PARENTID3, rs.getString(1));
+        rs = conn.prepareStatement(cursor).executeQuery();
+        assertTrue(rs.next());
+        assertEquals(PARENTID7, rs.getString(1));
+        assertFalse(rs.next());
+
+        //Test against the same table for the same records, but this time use the 'midnight' java.sql.Date instance.
+        //'midnight' is identical to 'date' to the tens of millisecond precision.
+        query = "select parent_id from " + ENTITY_HISTORY_TABLE_NAME +
+                " WHERE (organization_id, parent_id, created_date, entity_history_id) IN ((?,?,?,?),(?,?,?,?))";
+        query = query.replaceFirst("\\?", "'"+tenantId+"'");
+        query = query.replaceFirst("\\?", "'"+PARENTID3+"'");
+        query = query.replaceFirst("\\?", "TO_DATE('"+DateUtil.getDateFormatter(DateUtil.DEFAULT_DATE_FORMAT).format(midnight)+"')");
+        query = query.replaceFirst("\\?", "'"+ENTITYHISTID3+"'");
+        query = query.replaceFirst("\\?", "'"+tenantId+"'");
+        query = query.replaceFirst("\\?", "'"+PARENTID7+"'");
+        query = query.replaceFirst("\\?", "TO_DATE('"+DateUtil.getDateFormatter(DateUtil.DEFAULT_DATE_FORMAT).format(midnight)+"')");
+        query = query.replaceFirst("\\?", "'"+ENTITYHISTID7+"'");
+        cursor = "DECLARE testCursor2 CURSOR FOR "+query;
+
+        conn.prepareStatement(cursor).execute();
+        cursor = "OPEN testCursor2";
+        conn.prepareStatement(cursor).execute();
+        cursor = "FETCH NEXT FROM testCursor2";
+
+        rs = conn.prepareStatement(cursor).executeQuery();
+        assertTrue(!rs.next());
+        String sql = "CLOSE testCursor";
+        conn.prepareStatement(sql).execute();
+        sql = "CLOSE testCursor2";
+        conn.prepareStatement(sql).execute();
+    }
+
+    @Test
+    public void testCursorsWithNonLeadingPkColsOfTypesTimeStampAndVarchar() throws Exception {
+        long ts = nextTimestamp();
+        String tenantId = getOrganizationId();
+        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+        ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+        String updateStmt =
+                "upsert into " +
+                        "ATABLE(" +
+                        "    ORGANIZATION_ID, " +
+                        "    ENTITY_ID, " +
+                        "    A_TIMESTAMP) " +
+                        "VALUES (?, ?, ?)";
+        String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 1);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection upsertConn = DriverManager.getConnection(url, props);
+        upsertConn.setAutoCommit(true);
+        PreparedStatement stmt = upsertConn.prepareStatement(updateStmt);
+        stmt.setString(1, tenantId);
+        stmt.setString(2, ROW4);
+        Timestamp tsValue = new Timestamp(System.nanoTime());
+        stmt.setTimestamp(3, tsValue);
+        stmt.execute();
+
+        String query = "SELECT a_timestamp, a_string FROM aTable WHERE ?=organization_id  AND (a_timestamp, a_string) = (?, 'a')";
+        query = query.replaceFirst("\\?", "'"+tenantId+"'");
+        query = query.replaceFirst("\\?", "TO_DATE('"+DateUtil.getDateFormatter(DateUtil.DEFAULT_TIMESTAMP_FORMAT).format(tsValue)+"')");
+
+        props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            String cursor = "DECLARE testCursor CURSOR FOR "+query;
+            conn.prepareStatement(cursor).execute();
+            cursor = "OPEN testCursor";
+            conn.prepareStatement(cursor).execute();
+            cursor = "FETCH NEXT FROM testCursor";
+
+            ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+            int count = 0;
+            while(rs.next()) {
+                assertTrue(rs.getTimestamp(1).equals(tsValue));
+                assertTrue(rs.getString(2).compareTo("a") == 0);
+                count++;
+                rs = conn.prepareStatement(cursor).executeQuery();
+            }
+            assertTrue(count == 1);
+        } finally {
+            String sql = "CLOSE testCursor";
+            conn.prepareStatement(sql).execute();
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testCursorsQueryMoreWithInListClausePossibleNullValues() throws Exception {
+        long ts = nextTimestamp();
+        String tenantId = getOrganizationId();
+        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+        ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+        String updateStmt =
+                "upsert into " +
+                        "ATABLE(ORGANIZATION_ID, ENTITY_ID, Y_INTEGER, X_INTEGER) VALUES (?, ?, ?, ?)";
+        String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 1);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection upsertConn = DriverManager.getConnection(url, props);
+        upsertConn.setAutoCommit(true);
+        PreparedStatement stmt = upsertConn.prepareStatement(updateStmt);
+        stmt.setString(1, tenantId);
+        stmt.setString(2, ROW4);
+        stmt.setInt(3, 4);
+        stmt.setInt(4, 5);
+        stmt.execute();
+
+        //we have a row present in aTable where x_integer = 5 and y_integer = NULL which gets translated to 0 when retriving from HBase.
+        String query = "SELECT x_integer, y_integer FROM aTable WHERE ? = organization_id AND (x_integer) IN ((5))";
+
+        query = query.replaceFirst("\\?", "'"+tenantId+"'");
+
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+
+        try {
+            String cursor = "DECLARE testCursor CURSOR FOR "+query;
+            conn.prepareStatement(cursor).execute();
+            cursor = "OPEN testCursor";
+            conn.prepareStatement(cursor).execute();
+            cursor = "FETCH NEXT FROM testCursor";
+
+            ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+            assertTrue(rs.next());
+            assertEquals(5, rs.getInt(1));
+            assertEquals(4, rs.getInt(2));
+            rs = conn.prepareStatement(cursor).executeQuery();
+            assertTrue(rs.next());
+            assertEquals(5, rs.getInt(1));
+            assertEquals(0, rs.getInt(2));
+        } finally {
+            String sql = "CLOSE testCursor";
+            conn.prepareStatement(sql).execute();
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testCursorsWithColsOfTypesDecimal() throws Exception {
+        long ts = nextTimestamp();
+        String tenantId = getOrganizationId();
+        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+        ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+        String query = "SELECT x_decimal FROM aTable WHERE ?=organization_id AND entity_id IN (?,?,?)";
+        query = query.replaceFirst("\\?", "'"+tenantId+"'");
+        query = query.replaceFirst("\\?", "'"+ROW7+"'");
+        query = query.replaceFirst("\\?", "'"+ROW8+"'");
+        query = query.replaceFirst("\\?", "'"+ROW9+"'");
+
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            String cursor = "DECLARE testCursor CURSOR FOR "+query;
+            conn.prepareStatement(cursor).execute();
+            cursor = "OPEN testCursor";
+            conn.prepareStatement(cursor).execute();
+            cursor = "FETCH NEXT FROM testCursor";
+
+            ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+            int count = 0;
+            while(rs.next()) {
+                assertTrue(BigDecimal.valueOf(0.1).equals(rs.getBigDecimal(1)) || BigDecimal.valueOf(3.9).equals(rs.getBigDecimal(1)) || BigDecimal.valueOf(3.3).equals(rs.getBigDecimal(1)));
+                count++;
+                if(count == 3) break;
+                rs = conn.prepareStatement(cursor).executeQuery();
+            }
+            assertTrue(count == 3);
+        } finally {
+            String sql = "CLOSE testCursor";
+            conn.prepareStatement(sql).execute();
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testCursorsWithColsOfTypesTinyintSmallintFloatDouble() throws Exception {
+        long ts = nextTimestamp();
+        String tenantId = getOrganizationId();
+        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+        ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+        String query = "SELECT a_byte,a_short,a_float,a_double FROM aTable WHERE ?=organization_id AND entity_id IN (?,?,?)";
+        query = query.replaceFirst("\\?", "'"+tenantId+"'");
+        query = query.replaceFirst("\\?", "'"+ROW1+"'");
+        query = query.replaceFirst("\\?", "'"+ROW2+"'");
+        query = query.replaceFirst("\\?", "'"+ROW3+"'");
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            String cursor = "DECLARE testCursor CURSOR FOR "+query;
+            conn.prepareStatement(cursor).execute();
+            cursor = "OPEN testCursor";
+            conn.prepareStatement(cursor).execute();
+            cursor = "FETCH NEXT FROM testCursor";
+
+            ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+            int count = 0;
+            while(rs.next()) {
+                assertTrue((byte)1 == (rs.getByte(1)) || (byte)2 == (rs.getByte(1)) || (byte)3 == (rs.getByte(1)));
+                assertTrue((short)128 == (rs.getShort(2)) || (short)129 == (rs.getShort(2)) || (short)130 == (rs.getShort(2)));
+                assertTrue(0.01f == (rs.getFloat(3)) || 0.02f == (rs.getFloat(3)) || 0.03f == (rs.getFloat(3)));
+                assertTrue(0.0001 == (rs.getDouble(4)) || 0.0002 == (rs.getDouble(4)) || 0.0003 == (rs.getDouble(4)));
+                count++;
+                if(count == 3) break;
+                rs = conn.prepareStatement(cursor).executeQuery();
+            }
+            assertTrue(count == 3);
+        } finally {
+            String sql = "CLOSE testCursor";
+            conn.prepareStatement(sql).execute();
+            conn.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/antlr3/PhoenixSQL.g
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index 07a51ce..66d50e9 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -130,6 +130,10 @@ tokens
     USE='use';
     OFFSET ='offset';
     FETCH = 'fetch';
+    DECLARE = 'declare';
+    CURSOR = 'cursor';
+    OPEN = 'open';
+    CLOSE = 'close';
     ROW = 'row';
     ROWS = 'rows';
     ONLY = 'only';
@@ -409,6 +413,10 @@ oneStatement returns [BindableStatement ret]
     |   s=create_schema_node
     |   s=create_view_node
     |   s=create_index_node
+    |   s=cursor_open_node
+    |   s=cursor_close_node
+    |   s=cursor_fetch_node
+    |   s=declare_cursor_node
     |   s=drop_table_node
     |   s=drop_index_node
     |   s=alter_index_node
@@ -744,6 +752,25 @@ upsert_column_refs returns [Pair<List<ColumnDef>,List<ColumnName>> ret]
        (COMMA d=dyn_column_name_or_def { if (d.getDataType()!=null) { $ret.getFirst().add(d); } $ret.getSecond().add(d.getColumnDefName()); } )*
 ;
 	
+
+// Parse a full declare cursor expression structure.
+declare_cursor_node returns [DeclareCursorStatement ret]
+    :    DECLARE c=cursor_name CURSOR FOR s=select_node
+        {ret = factory.declareCursor(c, s); }
+    ;
+
+cursor_open_node returns [OpenStatement ret]
+    :    OPEN c=cursor_name {ret = factory.open(c);}
+    ;
+ 
+cursor_close_node returns [CloseStatement ret]
+    :    CLOSE c=cursor_name {ret = factory.close(c);}
+    ;
+
+cursor_fetch_node returns [FetchStatement ret]
+    :    FETCH NEXT (a=NUMBER)? (ROW|ROWS)? FROM c=cursor_name {ret = factory.fetch(c,true, a == null ? 1 :  Integer.parseInt( a.getText() )); }
+    ;
+
 // Parse a full delete expression structure.
 delete_node returns [DeleteStatement ret]
     :   DELETE (hint=hintClause)? FROM t=from_table_name
@@ -1033,6 +1060,10 @@ index_name returns [NamedNode ret]
     :   name=identifier {$ret = factory.indexName(name); }
     ;
 
+cursor_name returns [CursorName ret]
+    :   name=identifier {$ret = factory.cursorName(name);}
+    ;
+
 // TODO: figure out how not repeat this two times
 table_name returns [TableName ret]
     :   t=identifier {$ret = factory.table(null, t); }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/compile/CloseStatementCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CloseStatementCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CloseStatementCompiler.java
new file mode 100644
index 0000000..cc53a9d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CloseStatementCompiler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.parse.CloseStatement;
+import org.apache.phoenix.parse.OpenStatement;
+import org.apache.phoenix.schema.MetaDataClient;
+
+import java.sql.SQLException;
+import java.util.Collections;
+
+public class CloseStatementCompiler {
+    private final PhoenixStatement statement;
+    private final Operation operation;
+
+    public CloseStatementCompiler(PhoenixStatement statement, Operation operation) {
+        this.statement = statement;
+        this.operation = operation;
+    }
+
+    public MutationPlan compile(final CloseStatement close) throws SQLException {
+        final PhoenixConnection connection = statement.getConnection();
+        final StatementContext context = new StatementContext(statement);
+        final MetaDataClient client = new MetaDataClient(connection);
+        
+        return new BaseMutationPlan(context, operation) {
+            @Override
+            public MutationState execute() throws SQLException {
+                return client.close(close);
+            }
+
+            @Override
+            public ExplainPlan getExplainPlan() throws SQLException {
+                return new ExplainPlan(Collections.singletonList("CLOSE CURSOR"));
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/compile/DeclareCursorCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeclareCursorCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeclareCursorCompiler.java
new file mode 100644
index 0000000..5280291
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeclareCursorCompiler.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.parse.CreateIndexStatement;
+import org.apache.phoenix.parse.DeclareCursorStatement;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.util.CursorUtil;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PTable.IndexType;
+
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+public class DeclareCursorCompiler {
+    private final PhoenixStatement statement;
+    private final Operation operation;
+    private QueryPlan queryPlan;
+
+    public DeclareCursorCompiler(PhoenixStatement statement, Operation operation, QueryPlan queryPlan) {
+        this.statement = statement;
+        this.operation = operation;
+        this.queryPlan = queryPlan;
+    }
+
+    public MutationPlan compile(final DeclareCursorStatement declare) throws SQLException {
+        if(declare.getBindCount() != 0){
+            throw new SQLException("Cannot declare cursor, internal SELECT statement contains bindings!");
+        }
+
+        final PhoenixConnection connection = statement.getConnection();
+        final StatementContext context = new StatementContext(statement);
+        final MetaDataClient client = new MetaDataClient(connection);
+        
+        return new BaseMutationPlan(context, operation) {
+            @Override
+            public MutationState execute() throws SQLException {
+                return client.declareCursor(declare, queryPlan);
+            }
+
+            @Override
+            public ExplainPlan getExplainPlan() throws SQLException {
+                return new ExplainPlan(Collections.singletonList("DECLARE CURSOR"));
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/compile/OpenStatementCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/OpenStatementCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/OpenStatementCompiler.java
new file mode 100644
index 0000000..b6125fd
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/OpenStatementCompiler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.parse.DeclareCursorStatement;
+import org.apache.phoenix.parse.OpenStatement;
+import org.apache.phoenix.schema.MetaDataClient;
+
+import java.sql.SQLException;
+import java.util.Collections;
+
+public class OpenStatementCompiler {
+    private final PhoenixStatement statement;
+    private final Operation operation;
+
+    public OpenStatementCompiler(PhoenixStatement statement, Operation operation) {
+        this.statement = statement;
+        this.operation = operation;
+    }
+
+    public MutationPlan compile(final OpenStatement open) throws SQLException {
+        final PhoenixConnection connection = statement.getConnection();
+        final StatementContext context = new StatementContext(statement);
+        final MetaDataClient client = new MetaDataClient(connection);
+        
+        return new BaseMutationPlan(context, operation) {
+            @Override
+            public MutationState execute() throws SQLException {
+                return client.open(open);
+            }
+
+            @Override
+            public ExplainPlan getExplainPlan() throws SQLException {
+                return new ExplainPlan(Collections.singletonList("OPEN CURSOR"));
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java
new file mode 100644
index 0000000..aaea13e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java
@@ -0,0 +1,53 @@
+package org.apache.phoenix.execute;
+
+import java.sql.SQLException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.iterate.CursorResultIterator;
+import org.apache.phoenix.iterate.LookAheadResultIterator;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
+import org.apache.phoenix.iterate.ResultIterator;
+
+public class CursorFetchPlan extends DelegateQueryPlan {
+
+    private CursorResultIterator resultIterator;
+    private int fetchSize;
+    private boolean isAggregate;
+    private String cursorName;
+
+	public CursorFetchPlan(QueryPlan cursorQueryPlan,String cursorName) {
+		super(cursorQueryPlan);
+        this.isAggregate = delegate.getStatement().isAggregate() || delegate.getStatement().isDistinct();
+        this.cursorName = cursorName;
+	}
+
+	@Override
+	public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+		StatementContext context = delegate.getContext();
+		if (resultIterator == null) {
+			context.getOverallQueryMetrics().startQuery();
+			resultIterator = new CursorResultIterator(LookAheadResultIterator.wrap(delegate.iterator(scanGrouper, scan)),cursorName);
+		}
+	    return resultIterator;
+	}
+
+
+	@Override
+	public ExplainPlan getExplainPlan() throws SQLException {
+		return delegate.getExplainPlan();
+	}
+	
+	public void setFetchSize(int fetchSize){
+	    this.fetchSize = fetchSize;	
+	}
+
+	public int getFetchSize() {
+		return fetchSize;
+	}
+
+        public boolean isAggregate(){
+            return this.isAggregate;
+        }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/iterate/CursorResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/CursorResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/CursorResultIterator.java
new file mode 100644
index 0000000..7ff2785
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/CursorResultIterator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.CursorUtil;
+
+import java.sql.SQLException;
+import java.util.List;
+
+public class CursorResultIterator implements ResultIterator {
+    private String cursorName;
+    private PeekingResultIterator delegate;
+    //TODO Configure fetch size from FETCH call
+    private int fetchSize = 0;
+    private int rowsRead = 0;
+    public CursorResultIterator(PeekingResultIterator delegate, String cursorName) {
+        this.delegate = delegate;
+        this.cursorName = cursorName;
+    }
+
+    @Override
+    public Tuple next() throws SQLException {
+    	if(!CursorUtil.moreValues(cursorName)){
+    	    return null;
+        } else if (fetchSize == rowsRead) {
+            return null;
+    	}
+
+        Tuple next = delegate.next();
+        CursorUtil.updateCursor(cursorName,next, delegate.peek());
+        rowsRead++;
+        return next;
+    }
+    
+    @Override
+    public void explain(List<String> planSteps) {
+        delegate.explain(planSteps);
+        planSteps.add("CLIENT CURSOR " + cursorName);
+    }
+
+    @Override
+    public String toString() {
+        return "CursorResultIterator [cursor=" + cursorName + "]";
+    }
+
+    @Override
+    public void close() throws SQLException {
+        //NOP
+    }
+
+    public void closeCursor() throws SQLException {
+        delegate.close();
+    }
+
+    public void setFetchSize(int fetchSize){
+        this.fetchSize = fetchSize;
+        this.rowsRead = 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index f3c6d30..6981a30 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.call.CallRunner;
 import org.apache.phoenix.compile.BaseMutationPlan;
+import org.apache.phoenix.compile.CloseStatementCompiler;
 import org.apache.phoenix.compile.ColumnProjector;
 import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.compile.CreateFunctionCompiler;
@@ -55,6 +56,7 @@ import org.apache.phoenix.compile.CreateIndexCompiler;
 import org.apache.phoenix.compile.CreateSchemaCompiler;
 import org.apache.phoenix.compile.CreateSequenceCompiler;
 import org.apache.phoenix.compile.CreateTableCompiler;
+import org.apache.phoenix.compile.DeclareCursorCompiler;
 import org.apache.phoenix.compile.DeleteCompiler;
 import org.apache.phoenix.compile.DropSequenceCompiler;
 import org.apache.phoenix.compile.ExplainPlan;
@@ -63,6 +65,7 @@ import org.apache.phoenix.compile.FromCompiler;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.ListJarsQueryPlan;
 import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.compile.OpenStatementCompiler;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.QueryCompiler;
 import org.apache.phoenix.compile.QueryPlan;
@@ -91,13 +94,16 @@ import org.apache.phoenix.parse.AliasedNode;
 import org.apache.phoenix.parse.AlterIndexStatement;
 import org.apache.phoenix.parse.AlterSessionStatement;
 import org.apache.phoenix.parse.BindableStatement;
+import org.apache.phoenix.parse.CloseStatement;
 import org.apache.phoenix.parse.ColumnDef;
 import org.apache.phoenix.parse.ColumnName;
+import org.apache.phoenix.parse.CursorName;
 import org.apache.phoenix.parse.CreateFunctionStatement;
 import org.apache.phoenix.parse.CreateIndexStatement;
 import org.apache.phoenix.parse.CreateSchemaStatement;
 import org.apache.phoenix.parse.CreateSequenceStatement;
 import org.apache.phoenix.parse.CreateTableStatement;
+import org.apache.phoenix.parse.DeclareCursorStatement;
 import org.apache.phoenix.parse.DeleteJarStatement;
 import org.apache.phoenix.parse.DeleteStatement;
 import org.apache.phoenix.parse.DropColumnStatement;
@@ -107,6 +113,7 @@ import org.apache.phoenix.parse.DropSchemaStatement;
 import org.apache.phoenix.parse.DropSequenceStatement;
 import org.apache.phoenix.parse.DropTableStatement;
 import org.apache.phoenix.parse.ExecuteUpgradeStatement;
+import org.apache.phoenix.parse.FetchStatement;
 import org.apache.phoenix.parse.ExplainStatement;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.HintNode;
@@ -117,6 +124,7 @@ import org.apache.phoenix.parse.LiteralParseNode;
 import org.apache.phoenix.parse.NamedNode;
 import org.apache.phoenix.parse.NamedTableNode;
 import org.apache.phoenix.parse.OffsetNode;
+import org.apache.phoenix.parse.OpenStatement;
 import org.apache.phoenix.parse.OrderByNode;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.ParseNode;
@@ -155,6 +163,7 @@ import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.CursorUtil;
 import org.apache.phoenix.util.KeyValueUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.PhoenixContextExecutor;
@@ -294,7 +303,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                         }
                         StatementContext context = plan.getContext();
                         context.getOverallQueryMetrics().startQuery();
-                        PhoenixResultSet rs = newResultSet(resultIterator, plan.getProjector(), context);
+                        PhoenixResultSet rs = newResultSet(resultIterator, plan.getProjector(), plan.getContext());
                         resultSets.add(rs);
                         setLastQueryPlan(plan);
                         setLastResultSet(rs);
@@ -403,6 +412,13 @@ public class PhoenixStatement implements Statement, SQLCloseable {
             super(from, hint, isDistinct, select, where, groupBy, having, orderBy, limit, offset, bindCount, isAggregate, hasSequence, selects, udfParseNodes);
         }
         
+        private ExecutableSelectStatement(ExecutableSelectStatement select) {
+            this(select.getFrom(), select.getHint(), select.isDistinct(), select.getSelect(), select.getWhere(),
+                    select.getGroupBy(), select.getHaving(), select.getOrderBy(), select.getLimit(), select.getOffset(), select.getBindCount(),
+                    select.isAggregate(), select.hasSequence(), select.getSelects(), select.getUdfParseNodes());
+        }
+		
+        
         @SuppressWarnings("unchecked")
         @Override
         public QueryPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
@@ -417,6 +433,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                 resolver = FromCompiler.getResolverForQuery(transformedSelect, stmt.getConnection());
                 select = StatementNormalizer.normalize(transformedSelect, resolver);
             }
+
             QueryPlan plan = new QueryCompiler(stmt, select, resolver, Collections.<PDatum>emptyList(), stmt.getConnection().getIteratorFactory(), new SequenceManager(stmt), true).compile();
             plan.getContext().getSequenceManager().validateSequences(seqAction);
             return plan;
@@ -765,6 +782,56 @@ public class PhoenixStatement implements Statement, SQLCloseable {
         }
     }
 
+    private static class ExecutableDeclareCursorStatement extends DeclareCursorStatement implements CompilableStatement {
+        public ExecutableDeclareCursorStatement(CursorName cursor, SelectStatement select){
+            super(cursor, select);
+        }
+
+        @Override
+        public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
+            ExecutableSelectStatement wrappedSelect = new ExecutableSelectStatement(
+            		(ExecutableSelectStatement) stmt.parseStatement(this.getQuerySQL()));
+            DeclareCursorCompiler compiler = new DeclareCursorCompiler(stmt, this.getOperation(),wrappedSelect.compilePlan(stmt, seqAction));
+            return compiler.compile(this);
+        }
+    }
+
+    private static class ExecutableOpenStatement extends OpenStatement implements CompilableStatement {
+        public ExecutableOpenStatement(CursorName cursor){
+            super(cursor);
+        }
+
+        @Override
+        public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
+            OpenStatementCompiler compiler = new OpenStatementCompiler(stmt, this.getOperation());
+            return compiler.compile(this);
+        }
+    }
+
+    private static class ExecutableCloseStatement extends CloseStatement implements CompilableStatement {
+        public ExecutableCloseStatement(CursorName cursor){
+            super(cursor);
+        }
+
+        @Override
+        public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
+            CloseStatementCompiler compiler = new CloseStatementCompiler(stmt, this.getOperation());
+            return compiler.compile(this);
+        }
+    }
+
+    private static class ExecutableFetchStatement extends FetchStatement implements CompilableStatement {
+        public ExecutableFetchStatement(CursorName cursor, boolean isNext, int fetchLimit){
+            super(cursor, isNext, fetchLimit);
+        }
+
+        @Override
+        public QueryPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
+            return CursorUtil.getFetchPlan(this.getCursorName().getName(), this.isNext(), this.getFetchSize());
+        }
+
+    }
+
     private static class ExecutableDeleteJarStatement extends DeleteJarStatement implements CompilableStatement {
 
         public ExecutableDeleteJarStatement(LiteralParseNode jarPath) {
@@ -1209,7 +1276,27 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                 Map<String, UDFParseNode> udfParseNodes, List<Pair<ColumnName,ParseNode>> onDupKeyPairs) {
             return new ExecutableUpsertStatement(table, hintNode, columns, values, select, bindCount, udfParseNodes, onDupKeyPairs);
         }
-        
+
+        @Override
+        public ExecutableDeclareCursorStatement declareCursor(CursorName cursor, SelectStatement select){
+            return new ExecutableDeclareCursorStatement(cursor, select);
+        }
+
+        @Override
+        public ExecutableFetchStatement fetch(CursorName cursor, boolean isNext, int fetchLimit){
+            return new ExecutableFetchStatement(cursor, isNext, fetchLimit);
+        }
+
+        @Override
+        public ExecutableOpenStatement open(CursorName cursor){
+            return new ExecutableOpenStatement(cursor);
+        }
+
+        @Override
+        public ExecutableCloseStatement close(CursorName cursor){
+            return new ExecutableCloseStatement(cursor);
+        }
+
         @Override
         public ExecutableDeleteStatement delete(NamedTableNode table, HintNode hint, ParseNode whereNode, List<OrderByNode> orderBy, LimitNode limit, int bindCount, Map<String, UDFParseNode> udfParseNodes) {
             return new ExecutableDeleteStatement(table, hint, whereNode, orderBy, limit, bindCount, udfParseNodes);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/parse/CloseStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/CloseStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/CloseStatement.java
new file mode 100644
index 0000000..5d7af34
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/CloseStatement.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+
+public class CloseStatement implements BindableStatement {
+    private final CursorName cursorName;
+
+    public CloseStatement(CursorName cursorName){
+        this.cursorName = cursorName;
+    }
+
+    public String getCursorName(){
+        return cursorName.getName();
+    }
+
+    public int getBindCount(){
+        return 0;
+    }
+
+    public Operation getOperation(){
+        return Operation.UPSERT;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/parse/CursorName.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/CursorName.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/CursorName.java
new file mode 100644
index 0000000..5b9de76
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/CursorName.java
@@ -0,0 +1,26 @@
+package org.apache.phoenix.parse;
+
+import org.apache.phoenix.util.SchemaUtil;
+
+public class CursorName {
+    private final String name;
+    private final boolean isCaseSensitive;
+
+    public CursorName(String name, boolean isCaseSensitive){
+        this.name = name;
+        this.isCaseSensitive = isCaseSensitive;
+    }
+
+    public CursorName(String name){
+        this.name = name;
+        this.isCaseSensitive = name == null ? false: SchemaUtil.isCaseSensitive(name);
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public boolean isCaseSensitive() {
+        return isCaseSensitive;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/parse/DeclareCursorStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/DeclareCursorStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/DeclareCursorStatement.java
new file mode 100644
index 0000000..68129ec
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/DeclareCursorStatement.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import java.util.*;
+
+public class DeclareCursorStatement implements BindableStatement {
+    private final CursorName cursorName;
+    private final SelectStatement select;
+
+    public DeclareCursorStatement(CursorName cursorName, SelectStatement select){
+        this.cursorName = cursorName;
+        this.select = select;
+    }
+
+    public String getCursorName(){
+        return cursorName.getName();
+    }
+
+    public String getQuerySQL(){
+        //Check if there are parameters to bind.
+        if(select.getBindCount() > 0){
+
+        }
+        //TODO: Test if this works
+        return select.toString();
+    }
+
+    public SelectStatement getSelect(){
+    	return select;
+    }
+
+    public List<OrderByNode> getSelectOrderBy() {
+        return select.getOrderBy();
+    }
+
+    public int getBindCount(){
+        return select.getBindCount();
+    }
+
+    public Operation getOperation(){
+        return Operation.UPSERT;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/parse/FetchStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/FetchStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/FetchStatement.java
new file mode 100644
index 0000000..08e9724
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/FetchStatement.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+
+public class FetchStatement implements BindableStatement {
+    private final CursorName cursorName;
+    private final boolean isNext;
+    private final int fetchSize;
+
+    public FetchStatement(CursorName cursorName, boolean isNext, int fetchSize){
+        this.cursorName = cursorName;
+        this.isNext = isNext;
+        this.fetchSize = fetchSize;
+    }
+
+    public CursorName getCursorName(){
+        return cursorName;
+    }
+
+    public boolean isNext(){
+        return isNext;
+    }
+
+    public int getBindCount(){
+        return 0;
+    }
+
+    public Operation getOperation(){
+        return Operation.QUERY;
+    }
+    
+    public int getFetchSize(){
+    	return fetchSize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/parse/OpenStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/OpenStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/OpenStatement.java
new file mode 100644
index 0000000..ad905b0
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/OpenStatement.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+
+public class OpenStatement implements BindableStatement {
+    private final CursorName cursorName;
+
+    public OpenStatement(CursorName cursorName){
+        this.cursorName = cursorName;
+    }
+
+    public String getCursorName(){
+        return cursorName.getName();
+    }
+
+    public int getBindCount(){
+        return 0;
+    }
+
+    public Operation getOperation(){
+        return Operation.UPSERT;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 4b65c6a..4628d51 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -731,6 +731,26 @@ public class ParseNodeFactory {
         return new UpsertStatement(table, hint, columns, values, select, bindCount, udfParseNodes, onDupKeyPairs);
     }
 
+    public CursorName cursorName(String name){
+        return new CursorName(name);
+    }
+
+    public DeclareCursorStatement declareCursor(CursorName cursor, SelectStatement select){
+        return new DeclareCursorStatement(cursor, select);
+    }
+
+    public FetchStatement fetch(CursorName cursor, boolean isNext, int fetchLimit){
+        return new FetchStatement(cursor, isNext, fetchLimit);
+    }
+
+    public OpenStatement open(CursorName cursor){
+        return new OpenStatement(cursor);
+    }
+
+    public CloseStatement close(CursorName cursor){
+        return new CloseStatement(cursor);
+    }
+
     public DeleteStatement delete(NamedTableNode table, HintNode hint, ParseNode node, List<OrderByNode> orderBy, LimitNode limit, int bindCount, Map<String, UDFParseNode> udfParseNodes) {
         return new DeleteStatement(table, hint, node, orderBy, limit, bindCount, udfParseNodes);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/parse/SQLParser.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/SQLParser.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/SQLParser.java
index 1a80991..b6b7de2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/SQLParser.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/SQLParser.java
@@ -139,6 +139,82 @@ public class SQLParser {
     }
 
     /**
+     * Parses the input as a SQL declare cursor statement.
+     * Used only in tests
+     * @throws SQLException
+     */
+    public DeclareCursorStatement parseDeclareCursor() throws SQLException {
+        try {
+            DeclareCursorStatement statement = parser.declare_cursor_node();
+            return statement;
+        } catch (RecognitionException e) {
+            throw PhoenixParserException.newException(e, parser.getTokenNames());
+        } catch (RuntimeException e) {
+            if (e.getCause() instanceof SQLException) {
+                throw (SQLException) e.getCause();
+            }
+            throw PhoenixParserException.newException(e, parser.getTokenNames());
+        }
+    }
+
+    /**
+     * Parses the input as a SQL cursor open statement.
+     * Used only in tests
+     * @throws SQLException
+     */
+    public OpenStatement parseOpen() throws SQLException {
+        try {
+            OpenStatement statement = parser.cursor_open_node();
+            return statement;
+        } catch (RecognitionException e) {
+            throw PhoenixParserException.newException(e, parser.getTokenNames());
+        } catch (RuntimeException e) {
+            if (e.getCause() instanceof SQLException) {
+                throw (SQLException) e.getCause();
+            }
+            throw PhoenixParserException.newException(e, parser.getTokenNames());
+        }
+    }
+
+    /**
+     * Parses the input as a SQL cursor close statement.
+     * Used only in tests
+     * @throws SQLException
+     */
+    public CloseStatement parseClose() throws SQLException {
+        try {
+            CloseStatement statement = parser.cursor_close_node();
+            return statement;
+        } catch (RecognitionException e) {
+            throw PhoenixParserException.newException(e, parser.getTokenNames());
+        } catch (RuntimeException e) {
+            if (e.getCause() instanceof SQLException) {
+                throw (SQLException) e.getCause();
+            }
+            throw PhoenixParserException.newException(e, parser.getTokenNames());
+        }
+    }
+
+    /**
+     * Parses the input as a SQL cursor fetch statement.
+     * Used only in tests
+     * @throws SQLException
+     */
+    public FetchStatement parseFetch() throws SQLException {
+        try {
+            FetchStatement statement = parser.cursor_fetch_node();
+            return statement;
+        } catch (RecognitionException e) {
+            throw PhoenixParserException.newException(e, parser.getTokenNames());
+        } catch (RuntimeException e) {
+            if (e.getCause() instanceof SQLException) {
+                throw (SQLException) e.getCause();
+            }
+            throw PhoenixParserException.newException(e, parser.getTokenNames());
+        }
+    }
+
+    /**
      * Parses the input as a SQL select statement.
      * Used only in tests
      * @throws SQLException 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2cb617f3/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 8005e4a..1254d79 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -144,6 +144,7 @@ import org.apache.phoenix.compile.MutationPlan;
 import org.apache.phoenix.compile.PostDDLCompiler;
 import org.apache.phoenix.compile.PostIndexDDLCompiler;
 import org.apache.phoenix.compile.PostLocalIndexDDLCompiler;
+import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.compile.StatementNormalizer;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
@@ -165,6 +166,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.AddColumnStatement;
 import org.apache.phoenix.parse.AlterIndexStatement;
+import org.apache.phoenix.parse.CloseStatement;
 import org.apache.phoenix.parse.ColumnDef;
 import org.apache.phoenix.parse.ColumnDefInPkConstraint;
 import org.apache.phoenix.parse.ColumnName;
@@ -173,6 +175,7 @@ import org.apache.phoenix.parse.CreateIndexStatement;
 import org.apache.phoenix.parse.CreateSchemaStatement;
 import org.apache.phoenix.parse.CreateSequenceStatement;
 import org.apache.phoenix.parse.CreateTableStatement;
+import org.apache.phoenix.parse.DeclareCursorStatement;
 import org.apache.phoenix.parse.DropColumnStatement;
 import org.apache.phoenix.parse.DropFunctionStatement;
 import org.apache.phoenix.parse.DropIndexStatement;
@@ -181,6 +184,7 @@ import org.apache.phoenix.parse.DropSequenceStatement;
 import org.apache.phoenix.parse.DropTableStatement;
 import org.apache.phoenix.parse.IndexKeyConstraint;
 import org.apache.phoenix.parse.NamedTableNode;
+import org.apache.phoenix.parse.OpenStatement;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.PFunction.FunctionArgument;
 import org.apache.phoenix.parse.PSchema;
@@ -214,6 +218,7 @@ import org.apache.phoenix.schema.types.PUnsignedLong;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.CursorUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.LogUtil;
@@ -1364,6 +1369,21 @@ public class MetaDataClient {
         return fullName;
     }
 
+    public MutationState declareCursor(DeclareCursorStatement statement, QueryPlan queryPlan) throws SQLException {
+        CursorUtil.declareCursor(statement, queryPlan);
+        return new MutationState(0,connection);
+    }
+
+    public MutationState open(OpenStatement statement) throws SQLException {
+        CursorUtil.openCursor(statement, connection);
+        return new MutationState(0,connection);
+    }
+
+    public MutationState close(CloseStatement statement) throws SQLException {
+        CursorUtil.closeCursor(statement);
+        return new MutationState(0,connection);
+    }
+
     /**
      * Create an index table by morphing the CreateIndexStatement into a CreateTableStatement and calling
      * MetaDataClient.createTable. In doing so, we perform the following translations: