You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2014/12/22 06:39:11 UTC
[2/2] phoenix git commit: PHOENIX-1535 Secondary local index casues
Undefined column error with queries involving joins(Maryann Xue)
PHOENIX-1535 Secondary local index casues Undefined column error with queries involving joins(Maryann Xue)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f9ca8816
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f9ca8816
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f9ca8816
Branch: refs/heads/master
Commit: f9ca8816d6356b64368d3e1b2b34eee8696f8cc2
Parents: 01cef51
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
Authored: Mon Dec 22 11:07:39 2014 +0530
Committer: Rajeshbabu Chintaguntla <ra...@apache.org>
Committed: Mon Dec 22 11:07:39 2014 +0530
----------------------------------------------------------------------
.../org/apache/phoenix/end2end/HashJoinIT.java | 251 +-----------------
.../apache/phoenix/end2end/SortMergeJoinIT.java | 251 +-----------------
.../org/apache/phoenix/end2end/SubqueryIT.java | 251 +-----------------
.../apache/phoenix/compile/JoinCompiler.java | 114 +++++---
.../apache/phoenix/compile/QueryCompiler.java | 10 +-
.../coprocessor/BaseScannerRegionObserver.java | 191 ++++++++++++++
.../GroupedAggregateRegionObserver.java | 64 ++---
.../phoenix/coprocessor/ScanRegionObserver.java | 174 +-----------
.../UngroupedAggregateRegionObserver.java | 32 ++-
.../java/org/apache/phoenix/query/BaseTest.java | 262 +++++++++++++++++++
10 files changed, 593 insertions(+), 1007 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9ca8816/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
index f8c5899..23d6c13 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
@@ -89,7 +89,7 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
@Before
public void initTable() throws Exception {
- initTableValues();
+ initJoinTableValues(getUrl(), null, null);
if (indexDDL != null && indexDDL.length > 0) {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection conn = DriverManager.getConnection(getUrl(), props);
@@ -1216,255 +1216,6 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
}
- protected void initTableValues() throws Exception {
- ensureTableCreated(getUrl(), JOIN_CUSTOMER_TABLE_FULL_NAME);
- ensureTableCreated(getUrl(), JOIN_ITEM_TABLE_FULL_NAME);
- ensureTableCreated(getUrl(), JOIN_SUPPLIER_TABLE_FULL_NAME);
- ensureTableCreated(getUrl(), JOIN_ORDER_TABLE_FULL_NAME);
-
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
- try {
- conn.createStatement().execute("CREATE SEQUENCE my.seq");
- // Insert into customer table
- PreparedStatement stmt = conn.prepareStatement(
- "upsert into " + JOIN_CUSTOMER_TABLE_FULL_NAME +
- " (\"customer_id\", " +
- " NAME, " +
- " PHONE, " +
- " ADDRESS, " +
- " LOC_ID, " +
- " DATE) " +
- "values (?, ?, ?, ?, ?, ?)");
- stmt.setString(1, "0000000001");
- stmt.setString(2, "C1");
- stmt.setString(3, "999-999-1111");
- stmt.setString(4, "101 XXX Street");
- stmt.setString(5, "10001");
- stmt.setDate(6, new Date(format.parse("2013-11-01 10:20:36").getTime()));
- stmt.execute();
-
- stmt.setString(1, "0000000002");
- stmt.setString(2, "C2");
- stmt.setString(3, "999-999-2222");
- stmt.setString(4, "202 XXX Street");
- stmt.setString(5, null);
- stmt.setDate(6, new Date(format.parse("2013-11-25 16:45:07").getTime()));
- stmt.execute();
-
- stmt.setString(1, "0000000003");
- stmt.setString(2, "C3");
- stmt.setString(3, "999-999-3333");
- stmt.setString(4, "303 XXX Street");
- stmt.setString(5, null);
- stmt.setDate(6, new Date(format.parse("2013-11-25 10:06:29").getTime()));
- stmt.execute();
-
- stmt.setString(1, "0000000004");
- stmt.setString(2, "C4");
- stmt.setString(3, "999-999-4444");
- stmt.setString(4, "404 XXX Street");
- stmt.setString(5, "10004");
- stmt.setDate(6, new Date(format.parse("2013-11-22 14:22:56").getTime()));
- stmt.execute();
-
- stmt.setString(1, "0000000005");
- stmt.setString(2, "C5");
- stmt.setString(3, "999-999-5555");
- stmt.setString(4, "505 XXX Street");
- stmt.setString(5, "10005");
- stmt.setDate(6, new Date(format.parse("2013-11-27 09:37:50").getTime()));
- stmt.execute();
-
- stmt.setString(1, "0000000006");
- stmt.setString(2, "C6");
- stmt.setString(3, "999-999-6666");
- stmt.setString(4, "606 XXX Street");
- stmt.setString(5, "10001");
- stmt.setDate(6, new Date(format.parse("2013-11-01 10:20:36").getTime()));
- stmt.execute();
-
- // Insert into item table
- stmt = conn.prepareStatement(
- "upsert into " + JOIN_ITEM_TABLE_FULL_NAME +
- " (\"item_id\", " +
- " NAME, " +
- " PRICE, " +
- " DISCOUNT1, " +
- " DISCOUNT2, " +
- " \"supplier_id\", " +
- " DESCRIPTION) " +
- "values (?, ?, ?, ?, ?, ?, ?)");
- stmt.setString(1, "0000000001");
- stmt.setString(2, "T1");
- stmt.setInt(3, 100);
- stmt.setInt(4, 5);
- stmt.setInt(5, 10);
- stmt.setString(6, "0000000001");
- stmt.setString(7, "Item T1");
- stmt.execute();
-
- stmt.setString(1, "0000000002");
- stmt.setString(2, "T2");
- stmt.setInt(3, 200);
- stmt.setInt(4, 5);
- stmt.setInt(5, 8);
- stmt.setString(6, "0000000001");
- stmt.setString(7, "Item T2");
- stmt.execute();
-
- stmt.setString(1, "0000000003");
- stmt.setString(2, "T3");
- stmt.setInt(3, 300);
- stmt.setInt(4, 8);
- stmt.setInt(5, 12);
- stmt.setString(6, "0000000002");
- stmt.setString(7, "Item T3");
- stmt.execute();
-
- stmt.setString(1, "0000000004");
- stmt.setString(2, "T4");
- stmt.setInt(3, 400);
- stmt.setInt(4, 6);
- stmt.setInt(5, 10);
- stmt.setString(6, "0000000002");
- stmt.setString(7, "Item T4");
- stmt.execute();
-
- stmt.setString(1, "0000000005");
- stmt.setString(2, "T5");
- stmt.setInt(3, 500);
- stmt.setInt(4, 8);
- stmt.setInt(5, 15);
- stmt.setString(6, "0000000005");
- stmt.setString(7, "Item T5");
- stmt.execute();
-
- stmt.setString(1, "0000000006");
- stmt.setString(2, "T6");
- stmt.setInt(3, 600);
- stmt.setInt(4, 8);
- stmt.setInt(5, 15);
- stmt.setString(6, "0000000006");
- stmt.setString(7, "Item T6");
- stmt.execute();
-
- stmt.setString(1, "invalid001");
- stmt.setString(2, "INVALID-1");
- stmt.setInt(3, 0);
- stmt.setInt(4, 0);
- stmt.setInt(5, 0);
- stmt.setString(6, "0000000000");
- stmt.setString(7, "Invalid item for join test");
- stmt.execute();
-
- // Insert into supplier table
- stmt = conn.prepareStatement(
- "upsert into " + JOIN_SUPPLIER_TABLE_FULL_NAME +
- " (\"supplier_id\", " +
- " NAME, " +
- " PHONE, " +
- " ADDRESS, " +
- " LOC_ID) " +
- "values (?, ?, ?, ?, ?)");
- stmt.setString(1, "0000000001");
- stmt.setString(2, "S1");
- stmt.setString(3, "888-888-1111");
- stmt.setString(4, "101 YYY Street");
- stmt.setString(5, "10001");
- stmt.execute();
-
- stmt.setString(1, "0000000002");
- stmt.setString(2, "S2");
- stmt.setString(3, "888-888-2222");
- stmt.setString(4, "202 YYY Street");
- stmt.setString(5, "10002");
- stmt.execute();
-
- stmt.setString(1, "0000000003");
- stmt.setString(2, "S3");
- stmt.setString(3, "888-888-3333");
- stmt.setString(4, "303 YYY Street");
- stmt.setString(5, null);
- stmt.execute();
-
- stmt.setString(1, "0000000004");
- stmt.setString(2, "S4");
- stmt.setString(3, "888-888-4444");
- stmt.setString(4, "404 YYY Street");
- stmt.setString(5, null);
- stmt.execute();
-
- stmt.setString(1, "0000000005");
- stmt.setString(2, "S5");
- stmt.setString(3, "888-888-5555");
- stmt.setString(4, "505 YYY Street");
- stmt.setString(5, "10005");
- stmt.execute();
-
- stmt.setString(1, "0000000006");
- stmt.setString(2, "S6");
- stmt.setString(3, "888-888-6666");
- stmt.setString(4, "606 YYY Street");
- stmt.setString(5, "10006");
- stmt.execute();
-
- // Insert into order table
- stmt = conn.prepareStatement(
- "upsert into " + JOIN_ORDER_TABLE_FULL_NAME +
- " (\"order_id\", " +
- " \"customer_id\", " +
- " \"item_id\", " +
- " PRICE, " +
- " QUANTITY," +
- " DATE) " +
- "values (?, ?, ?, ?, ?, ?)");
- stmt.setString(1, "000000000000001");
- stmt.setString(2, "0000000004");
- stmt.setString(3, "0000000001");
- stmt.setInt(4, 100);
- stmt.setInt(5, 1000);
- stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-22 14:22:56").getTime()));
- stmt.execute();
-
- stmt.setString(1, "000000000000002");
- stmt.setString(2, "0000000003");
- stmt.setString(3, "0000000006");
- stmt.setInt(4, 552);
- stmt.setInt(5, 2000);
- stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-25 10:06:29").getTime()));
- stmt.execute();
-
- stmt.setString(1, "000000000000003");
- stmt.setString(2, "0000000002");
- stmt.setString(3, "0000000002");
- stmt.setInt(4, 190);
- stmt.setInt(5, 3000);
- stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-25 16:45:07").getTime()));
- stmt.execute();
-
- stmt.setString(1, "000000000000004");
- stmt.setString(2, "0000000004");
- stmt.setString(3, "0000000006");
- stmt.setInt(4, 510);
- stmt.setInt(5, 4000);
- stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-26 13:26:04").getTime()));
- stmt.execute();
-
- stmt.setString(1, "000000000000005");
- stmt.setString(2, "0000000005");
- stmt.setString(3, "0000000003");
- stmt.setInt(4, 264);
- stmt.setInt(5, 5000);
- stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-27 09:37:50").getTime()));
- stmt.execute();
-
- conn.commit();
- } finally {
- conn.close();
- }
- }
-
@Test
public void testDefaultJoin() throws Exception {
String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\"";
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9ca8816/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java
index c75d49a..514664e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java
@@ -88,7 +88,7 @@ public class SortMergeJoinIT extends BaseHBaseManagedTimeIT {
@Before
public void initTable() throws Exception {
- initTableValues();
+ initJoinTableValues(getUrl(), null, null);
if (indexDDL != null && indexDDL.length > 0) {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection conn = DriverManager.getConnection(getUrl(), props);
@@ -168,255 +168,6 @@ public class SortMergeJoinIT extends BaseHBaseManagedTimeIT {
return testCases;
}
-
- protected void initTableValues() throws Exception {
- ensureTableCreated(getUrl(), JOIN_CUSTOMER_TABLE_FULL_NAME);
- ensureTableCreated(getUrl(), JOIN_ITEM_TABLE_FULL_NAME);
- ensureTableCreated(getUrl(), JOIN_SUPPLIER_TABLE_FULL_NAME);
- ensureTableCreated(getUrl(), JOIN_ORDER_TABLE_FULL_NAME);
-
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
- try {
- conn.createStatement().execute("CREATE SEQUENCE my.seq");
- // Insert into customer table
- PreparedStatement stmt = conn.prepareStatement(
- "upsert into " + JOIN_CUSTOMER_TABLE_FULL_NAME +
- " (\"customer_id\", " +
- " NAME, " +
- " PHONE, " +
- " ADDRESS, " +
- " LOC_ID, " +
- " DATE) " +
- "values (?, ?, ?, ?, ?, ?)");
- stmt.setString(1, "0000000001");
- stmt.setString(2, "C1");
- stmt.setString(3, "999-999-1111");
- stmt.setString(4, "101 XXX Street");
- stmt.setString(5, "10001");
- stmt.setDate(6, new Date(format.parse("2013-11-01 10:20:36").getTime()));
- stmt.execute();
-
- stmt.setString(1, "0000000002");
- stmt.setString(2, "C2");
- stmt.setString(3, "999-999-2222");
- stmt.setString(4, "202 XXX Street");
- stmt.setString(5, null);
- stmt.setDate(6, new Date(format.parse("2013-11-25 16:45:07").getTime()));
- stmt.execute();
-
- stmt.setString(1, "0000000003");
- stmt.setString(2, "C3");
- stmt.setString(3, "999-999-3333");
- stmt.setString(4, "303 XXX Street");
- stmt.setString(5, null);
- stmt.setDate(6, new Date(format.parse("2013-11-25 10:06:29").getTime()));
- stmt.execute();
-
- stmt.setString(1, "0000000004");
- stmt.setString(2, "C4");
- stmt.setString(3, "999-999-4444");
- stmt.setString(4, "404 XXX Street");
- stmt.setString(5, "10004");
- stmt.setDate(6, new Date(format.parse("2013-11-22 14:22:56").getTime()));
- stmt.execute();
-
- stmt.setString(1, "0000000005");
- stmt.setString(2, "C5");
- stmt.setString(3, "999-999-5555");
- stmt.setString(4, "505 XXX Street");
- stmt.setString(5, "10005");
- stmt.setDate(6, new Date(format.parse("2013-11-27 09:37:50").getTime()));
- stmt.execute();
-
- stmt.setString(1, "0000000006");
- stmt.setString(2, "C6");
- stmt.setString(3, "999-999-6666");
- stmt.setString(4, "606 XXX Street");
- stmt.setString(5, "10001");
- stmt.setDate(6, new Date(format.parse("2013-11-01 10:20:36").getTime()));
- stmt.execute();
-
- // Insert into item table
- stmt = conn.prepareStatement(
- "upsert into " + JOIN_ITEM_TABLE_FULL_NAME +
- " (\"item_id\", " +
- " NAME, " +
- " PRICE, " +
- " DISCOUNT1, " +
- " DISCOUNT2, " +
- " \"supplier_id\", " +
- " DESCRIPTION) " +
- "values (?, ?, ?, ?, ?, ?, ?)");
- stmt.setString(1, "0000000001");
- stmt.setString(2, "T1");
- stmt.setInt(3, 100);
- stmt.setInt(4, 5);
- stmt.setInt(5, 10);
- stmt.setString(6, "0000000001");
- stmt.setString(7, "Item T1");
- stmt.execute();
-
- stmt.setString(1, "0000000002");
- stmt.setString(2, "T2");
- stmt.setInt(3, 200);
- stmt.setInt(4, 5);
- stmt.setInt(5, 8);
- stmt.setString(6, "0000000001");
- stmt.setString(7, "Item T2");
- stmt.execute();
-
- stmt.setString(1, "0000000003");
- stmt.setString(2, "T3");
- stmt.setInt(3, 300);
- stmt.setInt(4, 8);
- stmt.setInt(5, 12);
- stmt.setString(6, "0000000002");
- stmt.setString(7, "Item T3");
- stmt.execute();
-
- stmt.setString(1, "0000000004");
- stmt.setString(2, "T4");
- stmt.setInt(3, 400);
- stmt.setInt(4, 6);
- stmt.setInt(5, 10);
- stmt.setString(6, "0000000002");
- stmt.setString(7, "Item T4");
- stmt.execute();
-
- stmt.setString(1, "0000000005");
- stmt.setString(2, "T5");
- stmt.setInt(3, 500);
- stmt.setInt(4, 8);
- stmt.setInt(5, 15);
- stmt.setString(6, "0000000005");
- stmt.setString(7, "Item T5");
- stmt.execute();
-
- stmt.setString(1, "0000000006");
- stmt.setString(2, "T6");
- stmt.setInt(3, 600);
- stmt.setInt(4, 8);
- stmt.setInt(5, 15);
- stmt.setString(6, "0000000006");
- stmt.setString(7, "Item T6");
- stmt.execute();
-
- stmt.setString(1, "invalid001");
- stmt.setString(2, "INVALID-1");
- stmt.setInt(3, 0);
- stmt.setInt(4, 0);
- stmt.setInt(5, 0);
- stmt.setString(6, "0000000000");
- stmt.setString(7, "Invalid item for join test");
- stmt.execute();
-
- // Insert into supplier table
- stmt = conn.prepareStatement(
- "upsert into " + JOIN_SUPPLIER_TABLE_FULL_NAME +
- " (\"supplier_id\", " +
- " NAME, " +
- " PHONE, " +
- " ADDRESS, " +
- " LOC_ID) " +
- "values (?, ?, ?, ?, ?)");
- stmt.setString(1, "0000000001");
- stmt.setString(2, "S1");
- stmt.setString(3, "888-888-1111");
- stmt.setString(4, "101 YYY Street");
- stmt.setString(5, "10001");
- stmt.execute();
-
- stmt.setString(1, "0000000002");
- stmt.setString(2, "S2");
- stmt.setString(3, "888-888-2222");
- stmt.setString(4, "202 YYY Street");
- stmt.setString(5, "10002");
- stmt.execute();
-
- stmt.setString(1, "0000000003");
- stmt.setString(2, "S3");
- stmt.setString(3, "888-888-3333");
- stmt.setString(4, "303 YYY Street");
- stmt.setString(5, null);
- stmt.execute();
-
- stmt.setString(1, "0000000004");
- stmt.setString(2, "S4");
- stmt.setString(3, "888-888-4444");
- stmt.setString(4, "404 YYY Street");
- stmt.setString(5, null);
- stmt.execute();
-
- stmt.setString(1, "0000000005");
- stmt.setString(2, "S5");
- stmt.setString(3, "888-888-5555");
- stmt.setString(4, "505 YYY Street");
- stmt.setString(5, "10005");
- stmt.execute();
-
- stmt.setString(1, "0000000006");
- stmt.setString(2, "S6");
- stmt.setString(3, "888-888-6666");
- stmt.setString(4, "606 YYY Street");
- stmt.setString(5, "10006");
- stmt.execute();
-
- // Insert into order table
- stmt = conn.prepareStatement(
- "upsert into " + JOIN_ORDER_TABLE_FULL_NAME +
- " (\"order_id\", " +
- " \"customer_id\", " +
- " \"item_id\", " +
- " PRICE, " +
- " QUANTITY," +
- " DATE) " +
- "values (?, ?, ?, ?, ?, ?)");
- stmt.setString(1, "000000000000001");
- stmt.setString(2, "0000000004");
- stmt.setString(3, "0000000001");
- stmt.setInt(4, 100);
- stmt.setInt(5, 1000);
- stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-22 14:22:56").getTime()));
- stmt.execute();
-
- stmt.setString(1, "000000000000002");
- stmt.setString(2, "0000000003");
- stmt.setString(3, "0000000006");
- stmt.setInt(4, 552);
- stmt.setInt(5, 2000);
- stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-25 10:06:29").getTime()));
- stmt.execute();
-
- stmt.setString(1, "000000000000003");
- stmt.setString(2, "0000000002");
- stmt.setString(3, "0000000002");
- stmt.setInt(4, 190);
- stmt.setInt(5, 3000);
- stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-25 16:45:07").getTime()));
- stmt.execute();
-
- stmt.setString(1, "000000000000004");
- stmt.setString(2, "0000000004");
- stmt.setString(3, "0000000006");
- stmt.setInt(4, 510);
- stmt.setInt(5, 4000);
- stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-26 13:26:04").getTime()));
- stmt.execute();
-
- stmt.setString(1, "000000000000005");
- stmt.setString(2, "0000000005");
- stmt.setString(3, "0000000003");
- stmt.setInt(4, 264);
- stmt.setInt(5, 5000);
- stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-27 09:37:50").getTime()));
- stmt.execute();
-
- conn.commit();
- } finally {
- conn.close();
- }
- }
@Test
public void testDefaultJoin() throws Exception {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9ca8816/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java
index 470ba9c..2d02970 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java
@@ -35,13 +35,10 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.sql.Connection;
-import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.sql.Timestamp;
-import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -67,7 +64,6 @@ import com.google.common.collect.Maps;
@RunWith(Parameterized.class)
public class SubqueryIT extends BaseHBaseManagedTimeIT {
- private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private String[] indexDDL;
private String[] plans;
@@ -88,7 +84,8 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT {
@Before
public void initTable() throws Exception {
- initTableValues();
+ initJoinTableValues(getUrl(), null, null);
+ initCoItemTableValues();
if (indexDDL != null && indexDDL.length > 0) {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection conn = DriverManager.getConnection(getUrl(), props);
@@ -319,254 +316,14 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT {
}
- protected void initTableValues() throws Exception {
- ensureTableCreated(getUrl(), JOIN_CUSTOMER_TABLE_FULL_NAME);
- ensureTableCreated(getUrl(), JOIN_ITEM_TABLE_FULL_NAME);
- ensureTableCreated(getUrl(), JOIN_SUPPLIER_TABLE_FULL_NAME);
- ensureTableCreated(getUrl(), JOIN_ORDER_TABLE_FULL_NAME);
+ protected void initCoItemTableValues() throws Exception {
ensureTableCreated(getUrl(), JOIN_COITEM_TABLE_FULL_NAME);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection conn = DriverManager.getConnection(getUrl(), props);
try {
- conn.createStatement().execute("CREATE SEQUENCE my.seq");
- // Insert into customer table
- PreparedStatement stmt = conn.prepareStatement(
- "upsert into " + JOIN_CUSTOMER_TABLE_FULL_NAME +
- " (\"customer_id\", " +
- " NAME, " +
- " PHONE, " +
- " ADDRESS, " +
- " LOC_ID, " +
- " DATE) " +
- "values (?, ?, ?, ?, ?, ?)");
- stmt.setString(1, "0000000001");
- stmt.setString(2, "C1");
- stmt.setString(3, "999-999-1111");
- stmt.setString(4, "101 XXX Street");
- stmt.setString(5, "10001");
- stmt.setDate(6, new Date(format.parse("2013-11-01 10:20:36").getTime()));
- stmt.execute();
-
- stmt.setString(1, "0000000002");
- stmt.setString(2, "C2");
- stmt.setString(3, "999-999-2222");
- stmt.setString(4, "202 XXX Street");
- stmt.setString(5, null);
- stmt.setDate(6, new Date(format.parse("2013-11-25 16:45:07").getTime()));
- stmt.execute();
-
- stmt.setString(1, "0000000003");
- stmt.setString(2, "C3");
- stmt.setString(3, "999-999-3333");
- stmt.setString(4, "303 XXX Street");
- stmt.setString(5, null);
- stmt.setDate(6, new Date(format.parse("2013-11-25 10:06:29").getTime()));
- stmt.execute();
-
- stmt.setString(1, "0000000004");
- stmt.setString(2, "C4");
- stmt.setString(3, "999-999-4444");
- stmt.setString(4, "404 XXX Street");
- stmt.setString(5, "10004");
- stmt.setDate(6, new Date(format.parse("2013-11-22 14:22:56").getTime()));
- stmt.execute();
-
- stmt.setString(1, "0000000005");
- stmt.setString(2, "C5");
- stmt.setString(3, "999-999-5555");
- stmt.setString(4, "505 XXX Street");
- stmt.setString(5, "10005");
- stmt.setDate(6, new Date(format.parse("2013-11-27 09:37:50").getTime()));
- stmt.execute();
-
- stmt.setString(1, "0000000006");
- stmt.setString(2, "C6");
- stmt.setString(3, "999-999-6666");
- stmt.setString(4, "606 XXX Street");
- stmt.setString(5, "10001");
- stmt.setDate(6, new Date(format.parse("2013-11-01 10:20:36").getTime()));
- stmt.execute();
-
- // Insert into item table
- stmt = conn.prepareStatement(
- "upsert into " + JOIN_ITEM_TABLE_FULL_NAME +
- " (\"item_id\", " +
- " NAME, " +
- " PRICE, " +
- " DISCOUNT1, " +
- " DISCOUNT2, " +
- " \"supplier_id\", " +
- " DESCRIPTION) " +
- "values (?, ?, ?, ?, ?, ?, ?)");
- stmt.setString(1, "0000000001");
- stmt.setString(2, "T1");
- stmt.setInt(3, 100);
- stmt.setInt(4, 5);
- stmt.setInt(5, 10);
- stmt.setString(6, "0000000001");
- stmt.setString(7, "Item T1");
- stmt.execute();
-
- stmt.setString(1, "0000000002");
- stmt.setString(2, "T2");
- stmt.setInt(3, 200);
- stmt.setInt(4, 5);
- stmt.setInt(5, 8);
- stmt.setString(6, "0000000001");
- stmt.setString(7, "Item T2");
- stmt.execute();
-
- stmt.setString(1, "0000000003");
- stmt.setString(2, "T3");
- stmt.setInt(3, 300);
- stmt.setInt(4, 8);
- stmt.setInt(5, 12);
- stmt.setString(6, "0000000002");
- stmt.setString(7, "Item T3");
- stmt.execute();
-
- stmt.setString(1, "0000000004");
- stmt.setString(2, "T4");
- stmt.setInt(3, 400);
- stmt.setInt(4, 6);
- stmt.setInt(5, 10);
- stmt.setString(6, "0000000002");
- stmt.setString(7, "Item T4");
- stmt.execute();
-
- stmt.setString(1, "0000000005");
- stmt.setString(2, "T5");
- stmt.setInt(3, 500);
- stmt.setInt(4, 8);
- stmt.setInt(5, 15);
- stmt.setString(6, "0000000005");
- stmt.setString(7, "Item T5");
- stmt.execute();
-
- stmt.setString(1, "0000000006");
- stmt.setString(2, "T6");
- stmt.setInt(3, 600);
- stmt.setInt(4, 8);
- stmt.setInt(5, 15);
- stmt.setString(6, "0000000006");
- stmt.setString(7, "Item T6");
- stmt.execute();
-
- stmt.setString(1, "invalid001");
- stmt.setString(2, "INVALID-1");
- stmt.setInt(3, 0);
- stmt.setInt(4, 0);
- stmt.setInt(5, 0);
- stmt.setString(6, "0000000000");
- stmt.setString(7, "Invalid item for join test");
- stmt.execute();
-
- // Insert into supplier table
- stmt = conn.prepareStatement(
- "upsert into " + JOIN_SUPPLIER_TABLE_FULL_NAME +
- " (\"supplier_id\", " +
- " NAME, " +
- " PHONE, " +
- " ADDRESS, " +
- " LOC_ID) " +
- "values (?, ?, ?, ?, ?)");
- stmt.setString(1, "0000000001");
- stmt.setString(2, "S1");
- stmt.setString(3, "888-888-1111");
- stmt.setString(4, "101 YYY Street");
- stmt.setString(5, "10001");
- stmt.execute();
-
- stmt.setString(1, "0000000002");
- stmt.setString(2, "S2");
- stmt.setString(3, "888-888-2222");
- stmt.setString(4, "202 YYY Street");
- stmt.setString(5, "10002");
- stmt.execute();
-
- stmt.setString(1, "0000000003");
- stmt.setString(2, "S3");
- stmt.setString(3, "888-888-3333");
- stmt.setString(4, "303 YYY Street");
- stmt.setString(5, null);
- stmt.execute();
-
- stmt.setString(1, "0000000004");
- stmt.setString(2, "S4");
- stmt.setString(3, "888-888-4444");
- stmt.setString(4, "404 YYY Street");
- stmt.setString(5, null);
- stmt.execute();
-
- stmt.setString(1, "0000000005");
- stmt.setString(2, "S5");
- stmt.setString(3, "888-888-5555");
- stmt.setString(4, "505 YYY Street");
- stmt.setString(5, "10005");
- stmt.execute();
-
- stmt.setString(1, "0000000006");
- stmt.setString(2, "S6");
- stmt.setString(3, "888-888-6666");
- stmt.setString(4, "606 YYY Street");
- stmt.setString(5, "10006");
- stmt.execute();
-
- // Insert into order table
- stmt = conn.prepareStatement(
- "upsert into " + JOIN_ORDER_TABLE_FULL_NAME +
- " (\"order_id\", " +
- " \"customer_id\", " +
- " \"item_id\", " +
- " PRICE, " +
- " QUANTITY," +
- " DATE) " +
- "values (?, ?, ?, ?, ?, ?)");
- stmt.setString(1, "000000000000001");
- stmt.setString(2, "0000000004");
- stmt.setString(3, "0000000001");
- stmt.setInt(4, 100);
- stmt.setInt(5, 1000);
- stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-22 14:22:56").getTime()));
- stmt.execute();
-
- stmt.setString(1, "000000000000002");
- stmt.setString(2, "0000000003");
- stmt.setString(3, "0000000006");
- stmt.setInt(4, 552);
- stmt.setInt(5, 2000);
- stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-25 10:06:29").getTime()));
- stmt.execute();
-
- stmt.setString(1, "000000000000003");
- stmt.setString(2, "0000000002");
- stmt.setString(3, "0000000002");
- stmt.setInt(4, 190);
- stmt.setInt(5, 3000);
- stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-25 16:45:07").getTime()));
- stmt.execute();
-
- stmt.setString(1, "000000000000004");
- stmt.setString(2, "0000000004");
- stmt.setString(3, "0000000006");
- stmt.setInt(4, 510);
- stmt.setInt(5, 4000);
- stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-26 13:26:04").getTime()));
- stmt.execute();
-
- stmt.setString(1, "000000000000005");
- stmt.setString(2, "0000000005");
- stmt.setString(3, "0000000003");
- stmt.setInt(4, 264);
- stmt.setInt(5, 5000);
- stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-27 09:37:50").getTime()));
- stmt.execute();
-
- conn.commit();
-
// Insert into coitem table
- stmt = conn.prepareStatement(
+ PreparedStatement stmt = conn.prepareStatement(
"upsert into " + JOIN_COITEM_TABLE_FULL_NAME +
" (item_id, " +
" item_name, " +
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9ca8816/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index f90cef8..ecc66dd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -41,6 +41,7 @@ import org.apache.phoenix.expression.CoerceExpression;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.expression.function.CountAggregateFunction;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.AliasedNode;
import org.apache.phoenix.parse.AndParseNode;
@@ -69,26 +70,29 @@ import org.apache.phoenix.parse.WildcardParseNode;
import org.apache.phoenix.schema.AmbiguousColumnException;
import org.apache.phoenix.schema.ColumnNotFoundException;
import org.apache.phoenix.schema.ColumnRef;
-import org.apache.phoenix.schema.types.PDate;
-import org.apache.phoenix.schema.types.PDecimal;
-import org.apache.phoenix.schema.types.PBoolean;
-import org.apache.phoenix.schema.types.PDouble;
-import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.schema.MetaDataEntityNotFoundException;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnImpl;
-import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
-import org.apache.phoenix.schema.types.PSmallint;
import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.types.PBoolean;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PDate;
+import org.apache.phoenix.schema.types.PDecimal;
+import org.apache.phoenix.schema.types.PDouble;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PSmallint;
import org.apache.phoenix.schema.types.PTimestamp;
import org.apache.phoenix.schema.types.PTinyint;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.schema.types.PVarchar;
-import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.SchemaUtil;
import com.google.common.collect.ArrayListMultimap;
@@ -129,9 +133,9 @@ public class JoinCompiler {
joinTable.addFilter(select.getWhere());
}
- ColumnRefParseNodeVisitor generalRefVisitor = new ColumnRefParseNodeVisitor(resolver);
- ColumnRefParseNodeVisitor joinLocalRefVisitor = new ColumnRefParseNodeVisitor(resolver);
- ColumnRefParseNodeVisitor prefilterRefVisitor = new ColumnRefParseNodeVisitor(resolver);
+ ColumnRefParseNodeVisitor generalRefVisitor = new ColumnRefParseNodeVisitor(resolver, statement.getConnection());
+ ColumnRefParseNodeVisitor joinLocalRefVisitor = new ColumnRefParseNodeVisitor(resolver, statement.getConnection());
+ ColumnRefParseNodeVisitor prefilterRefVisitor = new ColumnRefParseNodeVisitor(resolver, statement.getConnection());
joinTable.pushDownColumnRefVisitors(generalRefVisitor, joinLocalRefVisitor, prefilterRefVisitor);
@@ -315,7 +319,7 @@ public class JoinCompiler {
WhereNodeVisitor visitor = new WhereNodeVisitor(origResolver, table,
postFilters, Collections.<TableRef>singletonList(table.getTableRef()),
- isPrefilterAccepted, prefilterAcceptedTables);
+ isPrefilterAccepted, prefilterAcceptedTables, statement.getConnection());
filter.accept(visitor);
}
@@ -438,7 +442,7 @@ public class JoinCompiler {
}
}
- public static class JoinSpec {
+ public class JoinSpec {
private final JoinType type;
private final List<EqualParseNode> onConditions;
private final JoinTable joinTable;
@@ -453,7 +457,7 @@ public class JoinCompiler {
this.joinTable = joinTable;
this.singleValueOnly = singleValueOnly;
this.dependencies = new HashSet<TableRef>();
- this.onNodeVisitor = new OnNodeVisitor(resolver, onConditions, dependencies, joinTable);
+ this.onNodeVisitor = new OnNodeVisitor(resolver, onConditions, dependencies, joinTable, statement.getConnection());
if (onNode != null) {
onNode.accept(this.onNodeVisitor);
}
@@ -711,13 +715,14 @@ public class JoinCompiler {
}
for (ColumnRef columnRef : columnRefs.keySet()) {
if (columnRef.getTableRef().equals(tableRef)
- && !SchemaUtil.isPKColumn(columnRef.getColumn())) {
+ && !SchemaUtil.isPKColumn(columnRef.getColumn())
+ && !(columnRef instanceof LocalIndexColumnRef)) {
scan.addColumn(columnRef.getColumn().getFamilyName().getBytes(), columnRef.getColumn().getName().getBytes());
}
}
}
- public ProjectedPTableWrapper createProjectedTable(boolean retainPKColumns) throws SQLException {
+ public ProjectedPTableWrapper createProjectedTable(boolean retainPKColumns, StatementContext context) throws SQLException {
assert(!isSubselect());
List<PColumn> projectedColumns = new ArrayList<PColumn>();
List<Expression> sourceExpressions = new ArrayList<Expression>();
@@ -727,14 +732,14 @@ public class JoinCompiler {
if (retainPKColumns) {
for (PColumn column : table.getPKColumns()) {
addProjectedColumn(projectedColumns, sourceExpressions, columnNameMap,
- column, column.getFamilyName(), hasSaltingColumn);
+ column, column.getFamilyName(), hasSaltingColumn, false, context);
}
}
if (isWildCardSelect()) {
for (PColumn column : table.getColumns()) {
if (!retainPKColumns || !SchemaUtil.isPKColumn(column)) {
addProjectedColumn(projectedColumns, sourceExpressions, columnNameMap,
- column, PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), hasSaltingColumn);
+ column, PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), hasSaltingColumn, false, context);
}
}
} else {
@@ -745,7 +750,8 @@ public class JoinCompiler {
&& (!retainPKColumns || !SchemaUtil.isPKColumn(columnRef.getColumn()))) {
PColumn column = columnRef.getColumn();
addProjectedColumn(projectedColumns, sourceExpressions, columnNameMap,
- column, PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), hasSaltingColumn);
+ column, PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), hasSaltingColumn,
+ columnRef instanceof LocalIndexColumnRef, context);
}
}
}
@@ -758,7 +764,8 @@ public class JoinCompiler {
}
private void addProjectedColumn(List<PColumn> projectedColumns, List<Expression> sourceExpressions,
- ListMultimap<String, String> columnNameMap, PColumn sourceColumn, PName familyName, boolean hasSaltingColumn)
+ ListMultimap<String, String> columnNameMap, PColumn sourceColumn, PName familyName, boolean hasSaltingColumn,
+ boolean isLocalIndexColumnRef, StatementContext context)
throws SQLException {
if (sourceColumn == SALTING_COLUMN)
return;
@@ -767,7 +774,7 @@ public class JoinCompiler {
PTable table = tableRef.getTable();
String schemaName = table.getSchemaName().getString();
String tableName = table.getTableName().getString();
- String colName = sourceColumn.getName().getString();
+ String colName = isLocalIndexColumnRef ? IndexUtil.getIndexColumnName(sourceColumn) : sourceColumn.getName().getString();
String fullName = getProjectedColumnName(schemaName, tableName, colName);
String aliasedName = tableRef.getTableAlias() == null ? fullName : getProjectedColumnName(null, tableRef.getTableAlias(), colName);
@@ -780,7 +787,9 @@ public class JoinCompiler {
PColumnImpl column = new PColumnImpl(name, familyName, sourceColumn.getDataType(),
sourceColumn.getMaxLength(), sourceColumn.getScale(), sourceColumn.isNullable(),
position, sourceColumn.getSortOrder(), sourceColumn.getArraySize(), sourceColumn.getViewConstant(), sourceColumn.isViewReferenced());
- Expression sourceExpression = new ColumnRef(tableRef, sourceColumn.getPosition()).newColumnExpression();
+ Expression sourceExpression = isLocalIndexColumnRef ?
+ NODE_FACTORY.column(TableName.create(schemaName, tableName), "\"" + colName + "\"", null).accept(new ExpressionCompiler(context))
+ : new ColumnRef(tableRef, sourceColumn.getPosition()).newColumnExpression();
projectedColumns.add(column);
sourceExpressions.add(sourceExpression);
}
@@ -818,13 +827,13 @@ public class JoinCompiler {
public WhereNodeVisitor(ColumnResolver resolver, Table table,
List<ParseNode> postFilters, List<TableRef> selfTableRefs, boolean isPrefilterAccepted,
- List<JoinSpec> prefilterAcceptedTables) {
+ List<JoinSpec> prefilterAcceptedTables, PhoenixConnection connection) {
this.table = table;
this.postFilters = postFilters;
this.selfTableRefs = selfTableRefs;
this.isPrefilterAccepted = isPrefilterAccepted;
this.prefilterAcceptedTables = prefilterAcceptedTables;
- this.columnRefVisitor = new ColumnRefParseNodeVisitor(resolver);
+ this.columnRefVisitor = new ColumnRefParseNodeVisitor(resolver, connection);
}
@Override
@@ -919,11 +928,11 @@ public class JoinCompiler {
private ColumnRefParseNodeVisitor columnRefVisitor;
public OnNodeVisitor(ColumnResolver resolver, List<EqualParseNode> onConditions,
- Set<TableRef> dependencies, JoinTable joinTable) {
+ Set<TableRef> dependencies, JoinTable joinTable, PhoenixConnection connection) {
this.onConditions = onConditions;
this.dependencies = dependencies;
this.joinTable = joinTable;
- this.columnRefVisitor = new ColumnRefParseNodeVisitor(resolver);
+ this.columnRefVisitor = new ColumnRefParseNodeVisitor(resolver, connection);
}
@Override
protected boolean enterBooleanNode(ParseNode node) throws SQLException {
@@ -1005,18 +1014,35 @@ public class JoinCompiler {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.AMBIGUOUS_JOIN_CONDITION).build().buildException();
}
}
+
+ private static class LocalIndexColumnRef extends ColumnRef {
+ private final TableRef indexTableRef;
+
+ public LocalIndexColumnRef(TableRef tableRef, String familyName,
+ String columnName, TableRef indexTableRef) throws MetaDataEntityNotFoundException {
+ super(tableRef, familyName, columnName);
+ this.indexTableRef = indexTableRef;
+ }
+
+ @Override
+ public TableRef getTableRef() {
+ return indexTableRef;
+ }
+ }
private static class ColumnRefParseNodeVisitor extends StatelessTraverseAllParseNodeVisitor {
public enum ColumnRefType {NONE, SELF_ONLY, FOREIGN_ONLY, COMPLEX};
- private ColumnResolver resolver;
+ private final ColumnResolver resolver;
+ private final PhoenixConnection connection;
private final Set<TableRef> tableRefSet;
private final Map<ColumnRef, ColumnParseNode> columnRefMap;
- public ColumnRefParseNodeVisitor(ColumnResolver resolver) {
+ public ColumnRefParseNodeVisitor(ColumnResolver resolver, PhoenixConnection connection) {
this.resolver = resolver;
this.tableRefSet = new HashSet<TableRef>();
this.columnRefMap = new HashMap<ColumnRef, ColumnParseNode>();
+ this.connection = connection;
}
public void reset() {
@@ -1026,7 +1052,27 @@ public class JoinCompiler {
@Override
public Void visit(ColumnParseNode node) throws SQLException {
- ColumnRef columnRef = resolver.resolveColumn(node.getSchemaName(), node.getTableName(), node.getName());
+ ColumnRef columnRef = null;
+ try {
+ columnRef = resolver.resolveColumn(node.getSchemaName(), node.getTableName(), node.getName());
+ } catch (ColumnNotFoundException e) {
+ // This could be a LocalIndexDataColumnRef. If so, the table name must have
+ // been appended by the IndexStatementRewriter, and we can convert it into.
+ TableRef tableRef = resolver.resolveTable(node.getSchemaName(), node.getTableName());
+ if (tableRef.getTable().getIndexType() == IndexType.LOCAL) {
+ TableRef parentTableRef = FromCompiler.getResolver(
+ NODE_FACTORY.namedTable(null, TableName.create(tableRef.getTable()
+ .getSchemaName().getString(), tableRef.getTable()
+ .getParentTableName().getString())), connection).resolveTable(
+ tableRef.getTable().getSchemaName().getString(),
+ tableRef.getTable().getParentTableName().getString());
+ columnRef = new LocalIndexColumnRef(parentTableRef,
+ IndexUtil.getDataColumnFamilyName(node.getName()),
+ IndexUtil.getDataColumnName(node.getName()), tableRef);
+ } else {
+ throw e;
+ }
+ }
columnRefMap.put(columnRef, node);
tableRefSet.add(columnRef.getTableRef());
return null;
@@ -1092,9 +1138,9 @@ public class JoinCompiler {
return NODE_FACTORY.and(nodes);
}
- private static List<AliasedNode> extractFromSelect(List<AliasedNode> select, TableRef tableRef, ColumnResolver resolver) throws SQLException {
+ private List<AliasedNode> extractFromSelect(List<AliasedNode> select, TableRef tableRef, ColumnResolver resolver) throws SQLException {
List<AliasedNode> ret = new ArrayList<AliasedNode>();
- ColumnRefParseNodeVisitor visitor = new ColumnRefParseNodeVisitor(resolver);
+ ColumnRefParseNodeVisitor visitor = new ColumnRefParseNodeVisitor(resolver, statement.getConnection());
for (AliasedNode aliasedNode : select) {
ParseNode node = aliasedNode.getNode();
if (node instanceof TableWildcardParseNode) {
@@ -1145,7 +1191,7 @@ public class JoinCompiler {
TableRef groupByTableRef = null;
TableRef orderByTableRef = null;
if (select.getGroupBy() != null && !select.getGroupBy().isEmpty()) {
- ColumnRefParseNodeVisitor groupByVisitor = new ColumnRefParseNodeVisitor(resolver);
+ ColumnRefParseNodeVisitor groupByVisitor = new ColumnRefParseNodeVisitor(resolver, statement.getConnection());
for (ParseNode node : select.getGroupBy()) {
node.accept(groupByVisitor);
}
@@ -1154,7 +1200,7 @@ public class JoinCompiler {
groupByTableRef = set.iterator().next();
}
} else if (select.getOrderBy() != null && !select.getOrderBy().isEmpty()) {
- ColumnRefParseNodeVisitor orderByVisitor = new ColumnRefParseNodeVisitor(resolver);
+ ColumnRefParseNodeVisitor orderByVisitor = new ColumnRefParseNodeVisitor(resolver, statement.getConnection());
for (OrderByNode node : select.getOrderBy()) {
node.getNode().accept(orderByVisitor);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9ca8816/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 3f98ddc..014e73a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -191,9 +191,9 @@ public class QueryCompiler {
Table table = joinTable.getTable();
SelectStatement subquery = table.getAsSubquery(orderBy);
if (!table.isSubselect()) {
- ProjectedPTableWrapper projectedTable = table.createProjectedTable(!projectPKColumns);
- TupleProjector.serializeProjectorIntoScan(context.getScan(), projectedTable.createTupleProjector());
context.setCurrentTable(table.getTableRef());
+ ProjectedPTableWrapper projectedTable = table.createProjectedTable(!projectPKColumns, context);
+ TupleProjector.serializeProjectorIntoScan(context.getScan(), projectedTable.createTupleProjector());
context.setResolver(projectedTable.createColumnResolver());
table.projectColumns(context.getScan());
return compileSingleQuery(context, subquery, binds, asSubquery, !asSubquery);
@@ -211,7 +211,8 @@ public class QueryCompiler {
TableRef tableRef;
SelectStatement query;
if (!table.isSubselect()) {
- initialProjectedTable = table.createProjectedTable(!projectPKColumns);
+ context.setCurrentTable(table.getTableRef());
+ initialProjectedTable = table.createProjectedTable(!projectPKColumns, context);
tableRef = table.getTableRef();
table.projectColumns(context.getScan());
query = joinTable.getAsSingleSubquery(table.getAsSubquery(orderBy), asSubquery);
@@ -300,7 +301,8 @@ public class QueryCompiler {
TableRef rhsTableRef;
SelectStatement rhs;
if (!rhsTable.isSubselect()) {
- rhsProjTable = rhsTable.createProjectedTable(!projectPKColumns);
+ context.setCurrentTable(rhsTable.getTableRef());
+ rhsProjTable = rhsTable.createProjectedTable(!projectPKColumns, context);
rhsTableRef = rhsTable.getTableRef();
rhsTable.projectColumns(context.getScan());
rhs = rhsJoinTable.getAsSingleSubquery(rhsTable.getAsSubquery(orderBy), asSubquery);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9ca8816/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 68fa3d1..2e179a5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -18,23 +18,43 @@
package org.apache.phoenix.coprocessor;
import java.io.IOException;
+import java.util.List;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.execute.TupleProjector;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.KeyValueColumnExpression;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.KeyValueSchema;
import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
+import org.apache.phoenix.schema.ValueBitSet;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
import org.apache.phoenix.trace.util.Tracing;
+import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;
import org.cloudera.htrace.Span;
+import com.google.common.collect.ImmutableList;
+
abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
@@ -163,4 +183,175 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
return null; // impossible
}
}
+
+ /**
+ * Return wrapped scanner that catches unexpected exceptions (i.e. Phoenix bugs) and
+ * re-throws as DoNotRetryIOException to prevent needless retrying hanging the query
+ * for 30 seconds. Unfortunately, until HBASE-7481 gets fixed, there's no way to do
+ * the same from a custom filter.
+ * @param offset starting position in the rowkey.
+ * @param scan
+ * @param tupleProjector
+ * @param dataRegion
+ * @param indexMaintainer
+ * @param viewConstants
+ */
+ protected RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final RegionScanner s, final int offset, final Scan scan,
+ final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
+ final HRegion dataRegion, final IndexMaintainer indexMaintainer,
+ final byte[][] viewConstants, final ImmutableBytesWritable ptr) {
+ return getWrappedScanner(c, s, null, null, offset, scan, dataColumns, tupleProjector,
+ dataRegion, indexMaintainer, viewConstants, null, null, ptr);
+ }
+
+ /**
+ * Return wrapped scanner that catches unexpected exceptions (i.e. Phoenix bugs) and
+ * re-throws as DoNotRetryIOException to prevent needless retrying hanging the query
+ * for 30 seconds. Unfortunately, until HBASE-7481 gets fixed, there's no way to do
+ * the same from a custom filter.
+ * @param arrayFuncRefs
+ * @param arrayKVRefs
+ * @param offset starting position in the rowkey.
+ * @param scan
+ * @param tupleProjector
+ * @param dataRegion
+ * @param indexMaintainer
+ * @param viewConstants
+ */
+ protected RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final RegionScanner s, final Set<KeyValueColumnExpression> arrayKVRefs,
+ final Expression[] arrayFuncRefs, final int offset, final Scan scan,
+ final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
+ final HRegion dataRegion, final IndexMaintainer indexMaintainer,
+ final byte[][] viewConstants, final KeyValueSchema kvSchema,
+ final ValueBitSet kvSchemaBitSet, final ImmutableBytesWritable ptr) {
+ return new RegionScanner() {
+
+ @Override
+ public boolean next(List<Cell> results) throws IOException {
+ try {
+ return s.next(results);
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+ return false; // impossible
+ }
+ }
+
+ @Override
+ public boolean next(List<Cell> result, int limit) throws IOException {
+ try {
+ return s.next(result, limit);
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+ return false; // impossible
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ s.close();
+ }
+
+ @Override
+ public HRegionInfo getRegionInfo() {
+ return s.getRegionInfo();
+ }
+
+ @Override
+ public boolean isFilterDone() throws IOException {
+ return s.isFilterDone();
+ }
+
+ @Override
+ public boolean reseek(byte[] row) throws IOException {
+ return s.reseek(row);
+ }
+
+ @Override
+ public long getMvccReadPoint() {
+ return s.getMvccReadPoint();
+ }
+
+ @Override
+ public boolean nextRaw(List<Cell> result) throws IOException {
+ try {
+ boolean next = s.nextRaw(result);
+ if (result.size() == 0) {
+ return next;
+ }
+ if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs.size() > 0) {
+ replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result);
+ }
+ if (ScanUtil.isLocalIndex(scan) && !ScanUtil.isAnalyzeTable(scan)) {
+ IndexUtil.wrapResultUsingOffset(result, offset, dataColumns, tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
+ }
+ // There is a scanattribute set to retrieve the specific array element
+ return next;
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+ return false; // impossible
+ }
+ }
+
+ @Override
+ public boolean nextRaw(List<Cell> result, int limit) throws IOException {
+ try {
+ boolean next = s.nextRaw(result, limit);
+ if (result.size() == 0) {
+ return next;
+ }
+ if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs.size() > 0) {
+ replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result);
+ }
+ if ((offset > 0 || ScanUtil.isLocalIndex(scan)) && !ScanUtil.isAnalyzeTable(scan)) {
+ IndexUtil.wrapResultUsingOffset(result, offset, dataColumns, tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
+ }
+ // There is a scanattribute set to retrieve the specific array element
+ return next;
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+ return false; // impossible
+ }
+ }
+
+ private void replaceArrayIndexElement(final Set<KeyValueColumnExpression> arrayKVRefs,
+ final Expression[] arrayFuncRefs, List<Cell> result) {
+ // make a copy of the results array here, as we're modifying it below
+ MultiKeyValueTuple tuple = new MultiKeyValueTuple(ImmutableList.copyOf(result));
+ // The size of both the arrays would be same?
+ // Using KeyValueSchema to set and retrieve the value
+ // collect the first kv to get the row
+ Cell rowKv = result.get(0);
+ for (KeyValueColumnExpression kvExp : arrayKVRefs) {
+ if (kvExp.evaluate(tuple, ptr)) {
+ for (int idx = tuple.size() - 1; idx >= 0; idx--) {
+ Cell kv = tuple.getValue(idx);
+ if (Bytes.equals(kvExp.getColumnFamily(), 0, kvExp.getColumnFamily().length,
+ kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength())
+ && Bytes.equals(kvExp.getColumnName(), 0, kvExp.getColumnName().length,
+ kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength())) {
+ // remove the kv that has the full array values.
+ result.remove(idx);
+ break;
+ }
+ }
+ }
+ }
+ byte[] value = kvSchema.toBytes(tuple, arrayFuncRefs,
+ kvSchemaBitSet, ptr);
+ // Add a dummy kv with the exact value of the array index
+ result.add(new KeyValue(rowKv.getRowArray(), rowKv.getRowOffset(), rowKv.getRowLength(),
+ QueryConstants.ARRAY_VALUE_COLUMN_FAMILY, 0, QueryConstants.ARRAY_VALUE_COLUMN_FAMILY.length,
+ QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER, 0,
+ QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER.length, HConstants.LATEST_TIMESTAMP,
+ Type.codeToType(rowKv.getTypeByte()), value, 0, value.length));
+ }
+
+ @Override
+ public long getMaxResultSize() {
+ return s.getMaxResultSize();
+ }
+ };
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9ca8816/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index ca21742..8b59b85 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -63,9 +63,9 @@ import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.KeyValueUtil;
@@ -123,23 +123,10 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
.getAttribute(BaseScannerRegionObserver.AGGREGATORS), c
.getEnvironment().getConfiguration());
- final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
- final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
- long limit = Long.MAX_VALUE;
- byte[] limitBytes = scan.getAttribute(GROUP_BY_LIMIT);
- if (limitBytes != null) {
- limit = PInteger.INSTANCE.getCodec().decodeInt(limitBytes, 0, SortOrder.getDefault());
- }
-
RegionScanner innerScanner = s;
- if (p != null || j != null) {
- innerScanner =
- new HashJoinRegionScanner(s, p, j, ScanUtil.getTenantId(scan),
- c.getEnvironment());
- }
+
byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes);
- boolean localIndexScan = ScanUtil.isLocalIndex(scan);
TupleProjector tupleProjector = null;
HRegion dataRegion = null;
byte[][] viewConstants = null;
@@ -150,17 +137,30 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
dataRegion = IndexUtil.getDataRegion(c.getEnvironment());
viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
}
+ ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
+ innerScanner =
+ getWrappedScanner(c, innerScanner, offset, scan, dataColumns, tupleProjector,
+ dataRegion, indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, tempPtr);
}
+ final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
+ final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
+ if (p != null || j != null) {
+ innerScanner =
+ new HashJoinRegionScanner(innerScanner, p, j, ScanUtil.getTenantId(scan),
+ c.getEnvironment());
+ }
+
+ long limit = Long.MAX_VALUE;
+ byte[] limitBytes = scan.getAttribute(GROUP_BY_LIMIT);
+ if (limitBytes != null) {
+ limit = PInteger.INSTANCE.getCodec().decodeInt(limitBytes, 0, SortOrder.getDefault());
+ }
if (keyOrdered) { // Optimize by taking advantage that the rows are
// already in the required group by key order
- return scanOrdered(c, scan, innerScanner, expressions, aggregators, limit, offset,
- localIndexScan, dataColumns, tupleProjector, indexMaintainers, dataRegion,
- viewConstants);
+ return scanOrdered(c, scan, innerScanner, expressions, aggregators, limit);
} else { // Otherwse, collect them all up in an in memory map
- return scanUnordered(c, scan, innerScanner, expressions, aggregators, limit, offset,
- localIndexScan, dataColumns, tupleProjector, indexMaintainers, dataRegion,
- viewConstants);
+ return scanUnordered(c, scan, innerScanner, expressions, aggregators, limit);
}
}
@@ -376,10 +376,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
*/
private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
final RegionScanner s, final List<Expression> expressions,
- final ServerAggregators aggregators, long limit, int offset, boolean localIndexScan,
- ColumnReference[] dataColumns, TupleProjector tupleProjector,
- List<IndexMaintainer> indexMaintainers, HRegion dataRegion, byte[][] viewConstants)
- throws IOException {
+ final ServerAggregators aggregators, long limit) throws IOException {
if (logger.isDebugEnabled()) {
logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over unordered rows with scan " + scan
+ ", group by " + expressions + ", aggregators " + aggregators, ScanUtil.getCustomAnnotations(scan)));
@@ -401,7 +398,6 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
GroupByCacheFactory.INSTANCE.newCache(
env, ScanUtil.getTenantId(scan), ScanUtil.getCustomAnnotations(scan),
aggregators, estDistVals);
- ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
boolean success = false;
try {
boolean hasMore;
@@ -423,11 +419,6 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
// ones returned
hasMore = s.nextRaw(results);
if (!results.isEmpty()) {
- if (localIndexScan) {
- IndexUtil.wrapResultUsingOffset(results, offset, dataColumns, tupleProjector,
- dataRegion, indexMaintainers == null ? null : indexMaintainers.get(0),
- viewConstants, tempPtr);
- }
result.setKeyValues(results);
ImmutableBytesWritable key =
TupleUtil.getConcatenatedValue(result, expressions);
@@ -463,16 +454,12 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
*/
private RegionScanner scanOrdered(final ObserverContext<RegionCoprocessorEnvironment> c,
final Scan scan, final RegionScanner s, final List<Expression> expressions,
- final ServerAggregators aggregators, final long limit, final int offset,
- final boolean localIndexScan, final ColumnReference[] dataColumns,
- final TupleProjector tupleProjector, final List<IndexMaintainer> indexMaintainers,
- final HRegion dataRegion, final byte[][] viewConstants) throws IOException {
+ final ServerAggregators aggregators, final long limit) throws IOException {
if (logger.isDebugEnabled()) {
logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over ordered rows with scan " + scan + ", group by "
+ expressions + ", aggregators " + aggregators, ScanUtil.getCustomAnnotations(scan)));
}
- final ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
return new BaseRegionScanner() {
private long rowCount = 0;
private ImmutableBytesWritable currentKey = null;
@@ -510,11 +497,6 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
// ones returned
hasMore = s.nextRaw(kvs);
if (!kvs.isEmpty()) {
- if (localIndexScan) {
- IndexUtil.wrapResultUsingOffset(kvs, offset, dataColumns, tupleProjector,
- dataRegion, indexMaintainers == null ? null : indexMaintainers.get(0),
- viewConstants, tempPtr);
- }
result.setKeyValues(kvs);
key = TupleUtil.getConcatenatedValue(result, expressions);
aggBoundary = currentKey != null && currentKey.compareTo(key) != 0;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9ca8816/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index 1672fd7..f0ae4a2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -28,17 +28,13 @@ import java.util.Set;
import com.google.common.collect.Sets;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.TenantCache;
@@ -54,17 +50,14 @@ import org.apache.phoenix.iterate.RegionScannerResultIterator;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
-import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.KeyValueSchema;
import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
import org.apache.phoenix.schema.ValueBitSet;
-import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -187,14 +180,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
ScanUtil.setRowKeyOffset(scan, offset);
}
- final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
- final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
- final ImmutableBytesWritable tenantId = ScanUtil.getTenantId(scan);
-
RegionScanner innerScanner = s;
- if (p != null || j != null) {
- innerScanner = new HashJoinRegionScanner(s, p, j, tenantId, c.getEnvironment());
- }
Set<KeyValueColumnExpression> arrayKVRefs = Sets.newHashSet();
Expression[] arrayFuncRefs = deserializeArrayPostionalExpressionInfoFromScan(
@@ -214,7 +200,16 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
}
innerScanner =
getWrappedScanner(c, innerScanner, arrayKVRefs, arrayFuncRefs, offset, scan,
- dataColumns, tupleProjector, dataRegion, indexMaintainer, viewConstants);
+ dataColumns, tupleProjector, dataRegion, indexMaintainer, viewConstants,
+ kvSchema, kvSchemaBitSet, ptr);
+
+ final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
+ final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
+ final ImmutableBytesWritable tenantId = ScanUtil.getTenantId(scan);
+ if (p != null || j != null) {
+ innerScanner = new HashJoinRegionScanner(innerScanner, p, j, tenantId, c.getEnvironment());
+ }
+
final OrderedResultIterator iterator = deserializeFromScan(scan,innerScanner);
if (iterator == null) {
return innerScanner;
@@ -296,155 +291,6 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
};
}
- /**
- * Return wrapped scanner that catches unexpected exceptions (i.e. Phoenix bugs) and
- * re-throws as DoNotRetryIOException to prevent needless retrying hanging the query
- * for 30 seconds. Unfortunately, until HBASE-7481 gets fixed, there's no way to do
- * the same from a custom filter.
- * @param arrayFuncRefs
- * @param arrayKVRefs
- * @param offset starting position in the rowkey.
- * @param scan
- * @param tupleProjector
- * @param dataRegion
- * @param indexMaintainer
- * @param viewConstants
- */
- private RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
- final RegionScanner s, final Set<KeyValueColumnExpression> arrayKVRefs,
- final Expression[] arrayFuncRefs, final int offset, final Scan scan,
- final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
- final HRegion dataRegion, final IndexMaintainer indexMaintainer,
- final byte[][] viewConstants) {
- return new RegionScanner() {
-
- @Override
- public boolean next(List<Cell> results) throws IOException {
- try {
- return s.next(results);
- } catch (Throwable t) {
- ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
- return false; // impossible
- }
- }
-
- @Override
- public boolean next(List<Cell> result, int limit) throws IOException {
- try {
- return s.next(result, limit);
- } catch (Throwable t) {
- ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
- return false; // impossible
- }
- }
-
- @Override
- public void close() throws IOException {
- s.close();
- }
-
- @Override
- public HRegionInfo getRegionInfo() {
- return s.getRegionInfo();
- }
-
- @Override
- public boolean isFilterDone() throws IOException {
- return s.isFilterDone();
- }
-
- @Override
- public boolean reseek(byte[] row) throws IOException {
- return s.reseek(row);
- }
-
- @Override
- public long getMvccReadPoint() {
- return s.getMvccReadPoint();
- }
-
- @Override
- public boolean nextRaw(List<Cell> result) throws IOException {
- try {
- boolean next = s.nextRaw(result);
- if (result.size() == 0) {
- return next;
- }
- if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs.size() > 0) {
- replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result);
- }
- if (ScanUtil.isLocalIndex(scan)) {
- IndexUtil.wrapResultUsingOffset(result, offset, dataColumns, tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
- }
- // There is a scanattribute set to retrieve the specific array element
- return next;
- } catch (Throwable t) {
- ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
- return false; // impossible
- }
- }
-
- @Override
- public boolean nextRaw(List<Cell> result, int limit) throws IOException {
- try {
- boolean next = s.nextRaw(result, limit);
- if (result.size() == 0) {
- return next;
- }
- if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs.size() > 0) {
- replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result);
- }
- if (offset > 0 || ScanUtil.isLocalIndex(scan)) {
- IndexUtil.wrapResultUsingOffset(result, offset, dataColumns, tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
- }
- // There is a scanattribute set to retrieve the specific array element
- return next;
- } catch (Throwable t) {
- ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
- return false; // impossible
- }
- }
-
- private void replaceArrayIndexElement(final Set<KeyValueColumnExpression> arrayKVRefs,
- final Expression[] arrayFuncRefs, List<Cell> result) {
- // make a copy of the results array here, as we're modifying it below
- MultiKeyValueTuple tuple = new MultiKeyValueTuple(ImmutableList.copyOf(result));
- // The size of both the arrays would be same?
- // Using KeyValueSchema to set and retrieve the value
- // collect the first kv to get the row
- Cell rowKv = result.get(0);
- for (KeyValueColumnExpression kvExp : arrayKVRefs) {
- if (kvExp.evaluate(tuple, ptr)) {
- for (int idx = tuple.size() - 1; idx >= 0; idx--) {
- Cell kv = tuple.getValue(idx);
- if (Bytes.equals(kvExp.getColumnFamily(), 0, kvExp.getColumnFamily().length,
- kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength())
- && Bytes.equals(kvExp.getColumnName(), 0, kvExp.getColumnName().length,
- kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength())) {
- // remove the kv that has the full array values.
- result.remove(idx);
- break;
- }
- }
- }
- }
- byte[] value = kvSchema.toBytes(tuple, arrayFuncRefs,
- kvSchemaBitSet, ptr);
- // Add a dummy kv with the exact value of the array index
- result.add(new KeyValue(rowKv.getRowArray(), rowKv.getRowOffset(), rowKv.getRowLength(),
- QueryConstants.ARRAY_VALUE_COLUMN_FAMILY, 0, QueryConstants.ARRAY_VALUE_COLUMN_FAMILY.length,
- QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER, 0,
- QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER.length, HConstants.LATEST_TIMESTAMP,
- Type.codeToType(rowKv.getTypeByte()), value, 0, value.length));
- }
-
- @Override
- public long getMaxResultSize() {
- return s.getMaxResultSize();
- }
- };
- }
-
@Override
protected boolean isRegionObserverFor(Scan scan) {
return scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY) != null;