You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by an...@apache.org on 2017/09/09 08:02:06 UTC
[2/2] phoenix git commit: PHOENIX-4010 Hash Join cache may not be
sent to all regionservers when we have stale HBase meta cache
PHOENIX-4010 Hash Join cache may not be sent to all regionservers when we have stale HBase meta cache
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7865a59b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7865a59b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7865a59b
Branch: refs/heads/master
Commit: 7865a59b0bbc4ea5e8ab775b920f5f608115707b
Parents: 44c0034
Author: Ankit Singhal <an...@gmail.com>
Authored: Sat Sep 9 13:31:55 2017 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Sat Sep 9 13:31:55 2017 +0530
----------------------------------------------------------------------
.../org/apache/phoenix/end2end/BaseJoinIT.java | 14 +
.../apache/phoenix/end2end/HashJoinCacheIT.java | 452 +++++++++++++++++++
.../org/apache/phoenix/end2end/HashJoinIT.java | 122 +++--
.../end2end/index/PartialIndexRebuilderIT.java | 23 +-
.../DelayedTableResultIteratorFactory.java | 12 +-
.../apache/phoenix/cache/ServerCacheClient.java | 338 +++++++++-----
.../coprocessor/BaseScannerRegionObserver.java | 1 +
.../HashJoinCacheNotFoundException.java | 45 ++
.../coprocessor/HashJoinRegionScanner.java | 11 +-
.../phoenix/exception/SQLExceptionCode.java | 3 +-
.../apache/phoenix/execute/AggregatePlan.java | 10 +-
.../apache/phoenix/execute/BaseQueryPlan.java | 27 +-
.../apache/phoenix/execute/HashJoinPlan.java | 16 +-
.../execute/LiteralResultIterationPlan.java | 9 +-
.../org/apache/phoenix/execute/ScanPlan.java | 11 +-
.../phoenix/iterate/BaseResultIterators.java | 51 ++-
.../DefaultTableResultIteratorFactory.java | 7 +-
.../phoenix/iterate/ParallelIterators.java | 14 +-
.../apache/phoenix/iterate/SerialIterators.java | 15 +-
.../phoenix/iterate/TableResultIterator.java | 56 ++-
.../iterate/TableResultIteratorFactory.java | 6 +-
.../apache/phoenix/join/HashCacheClient.java | 20 +-
.../apache/phoenix/query/QueryConstants.java | 2 +
.../java/org/apache/phoenix/util/ByteUtil.java | 15 +
.../org/apache/phoenix/util/ServerUtil.java | 7 +
.../java/org/apache/phoenix/query/BaseTest.java | 1 +
.../query/ParallelIteratorsSplitTest.java | 2 +-
.../java/org/apache/phoenix/util/TestUtil.java | 28 ++
28 files changed, 1033 insertions(+), 285 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseJoinIT.java
index 152bdf0..a823a72 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseJoinIT.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.end2end;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -28,16 +29,23 @@ import java.sql.SQLException;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Map;
+import java.util.Properties;
import java.util.regex.Pattern;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.StringUtil;
import org.junit.Before;
+import org.junit.BeforeClass;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
public abstract class BaseJoinIT extends ParallelStatsDisabledIT {
+
protected static final String JOIN_SCHEMA = "Join";
protected static final String JOIN_ORDER_TABLE = "OrderTable";
protected static final String JOIN_CUSTOMER_TABLE = "CustomerTable";
@@ -442,6 +450,12 @@ public abstract class BaseJoinIT extends ParallelStatsDisabledIT {
conn.commit();
}
+ protected Connection getConnection() throws SQLException { // opens a new test connection; the caller is responsible for closing it
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); // work on a copy rather than the shared TEST_PROPERTIES
+ props.put(ServerCacheClient.HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER, "true"); // presumably turns on per-server resend of the hash-join cache for this connection -- confirm in ServerCacheClient
+ return DriverManager.getConnection(getUrl(), props);
+ }
+
protected void createIndexes(Connection conn, String virtualName, String realName) throws Exception {
if (indexDDL != null && indexDDL.length > 0) {
for (String ddl : indexDDL) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
new file mode 100644
index 0000000..76f45e2
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Lists;
+
+@RunWith(Parameterized.class)
+public class HashJoinCacheIT extends HashJoinIT {
+
+ public HashJoinCacheIT(String[] indexDDL, String[] plans) throws Exception { // receives one parameter set produced by data()
+ super(indexDDL, plans); // both arrays are handed to HashJoinIT unchanged
+ }
+
+ protected String getTableName(Connection conn, String virtualName) throws Exception { // resolves the table name via the superclass and attaches the cache-invalidating observer to that table
+ String realName = super.getTableName(conn, virtualName);
+ TestUtil.addCoprocessor(conn, SchemaUtil.normalizeFullTableName(realName), InvalidateHashCache.class); // InvalidateHashCache is the nested observer declared at the bottom of this class
+ return realName; // same value the superclass returned
+ }
+
+ @Parameters
+ public static Collection<Object> data() { // returns a single parameter set for the (indexDDL, plans) constructor
+ List<Object> testCases = Lists.newArrayList();
+ testCases.add(new String[][] {
+ {}, { // first array: index DDL (empty here); second array: plan strings, each preceded by the test and query it belongs to
+ /*
+ * testLeftJoinWithAggregation()
+ * SELECT i.name, sum(quantity) FROM joinOrderTable o
+ * LEFT JOIN joinItemTable i ON o.item_id = i.item_id
+ * GROUP BY i.name ORDER BY i.name
+ */
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+ " SERVER AGGREGATE INTO DISTINCT ROWS BY [I.NAME]\n" +
+ "CLIENT MERGE SORT\n" +
+ " PARALLEL LEFT-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME,
+ /*
+ * testLeftJoinWithAggregation()
+ * SELECT i.item_id iid, sum(quantity) q FROM joinOrderTable o
+ * LEFT JOIN joinItemTable i ON o.item_id = i.item_id
+ * GROUP BY i.item_id ORDER BY q DESC
+ */
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+ " SERVER AGGREGATE INTO DISTINCT ROWS BY [\"I.item_id\"]\n" +
+ "CLIENT MERGE SORT\n" +
+ "CLIENT SORTED BY [SUM(O.QUANTITY) DESC]\n" +
+ " PARALLEL LEFT-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+ " SERVER FILTER BY FIRST KEY ONLY",
+ /*
+ * testLeftJoinWithAggregation()
+ * SELECT i.item_id iid, sum(quantity) q FROM joinItemTable i
+ * LEFT JOIN joinOrderTable o ON o.item_id = i.item_id
+ * GROUP BY i.item_id ORDER BY q DESC NULLS LAST, iid
+ */
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+ " SERVER FILTER BY FIRST KEY ONLY\n" +
+ " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"I.item_id\"]\n" +
+ "CLIENT SORTED BY [SUM(O.QUANTITY) DESC NULLS LAST, \"I.item_id\"]\n" +
+ " PARALLEL LEFT-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME,
+ /*
+ * testRightJoinWithAggregation()
+ * SELECT i.name, sum(quantity) FROM joinOrderTable o
+ * RIGHT JOIN joinItemTable i ON o.item_id = i.item_id
+ * GROUP BY i.name ORDER BY i.name
+ */
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+ " SERVER AGGREGATE INTO DISTINCT ROWS BY [I.NAME]\n" +
+ "CLIENT MERGE SORT\n" +
+ " PARALLEL LEFT-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME,
+ /*
+ * testRightJoinWithAggregation()
+ * SELECT i.item_id iid, sum(quantity) q FROM joinOrderTable o
+ * RIGHT JOIN joinItemTable i ON o.item_id = i.item_id
+ * GROUP BY i.item_id ORDER BY q DESC NULLS LAST, iid
+ */
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+ " SERVER FILTER BY FIRST KEY ONLY\n" +
+ " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"I.item_id\"]\n" +
+ "CLIENT SORTED BY [SUM(O.QUANTITY) DESC NULLS LAST, \"I.item_id\"]\n" +
+ " PARALLEL LEFT-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME,
+ /*
+ * testJoinWithWildcard()
+ * SELECT * FROM joinItemTable LEFT JOIN joinSupplierTable supp
+ * ON joinItemTable.supplier_id = supp.supplier_id
+ * ORDER BY item_id
+ */
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+ " PARALLEL LEFT-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME,
+ /*
+ * testJoinPlanWithIndex()
+ * SELECT item.item_id, item.name, supp.supplier_id, supp.name
+ * FROM joinItemTable item LEFT JOIN joinSupplierTable supp
+ * ON substr(item.name, 2, 1) = substr(supp.name, 2, 1)
+ * AND (supp.name BETWEEN 'S1' AND 'S5')
+ * WHERE item.name BETWEEN 'T1' AND 'T5'
+ */
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+ " SERVER FILTER BY (NAME >= 'T1' AND NAME <= 'T5')\n" +
+ " PARALLEL LEFT-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME + "\n" +
+ " SERVER FILTER BY (NAME >= 'S1' AND NAME <= 'S5')",
+ /*
+ * testJoinPlanWithIndex()
+ * SELECT item.item_id, item.name, supp.supplier_id, supp.name
+ * FROM joinItemTable item INNER JOIN joinSupplierTable supp
+ * ON item.supplier_id = supp.supplier_id
+ * WHERE (item.name = 'T1' OR item.name = 'T5')
+ * AND (supp.name = 'S1' OR supp.name = 'S5')
+ */
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+ " SERVER FILTER BY (NAME = 'T1' OR NAME = 'T5')\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME + "\n" +
+ " SERVER FILTER BY (NAME = 'S1' OR NAME = 'S5')",
+ /*
+ * testJoinWithSkipMergeOptimization()
+ * SELECT s.name FROM joinItemTable i
+ * JOIN joinOrderTable o ON o.item_id = i.item_id AND quantity < 5000
+ * JOIN joinSupplierTable s ON i.supplier_id = s.supplier_id
+ */
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+ " PARALLEL INNER-JOIN TABLE 0 (SKIP MERGE)\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+ " SERVER FILTER BY QUANTITY < 5000\n" +
+ " PARALLEL INNER-JOIN TABLE 1\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME + "\n" +
+ " DYNAMIC SERVER FILTER BY \"I.item_id\" IN (\"O.item_id\")",
+ /*
+ * testSelfJoin()
+ * SELECT i2.item_id, i1.name FROM joinItemTable i1
+ * JOIN joinItemTable i2 ON i1.item_id = i2.item_id
+ * ORDER BY i1.item_id
+ */
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+ " SERVER FILTER BY FIRST KEY ONLY\n" +
+ " DYNAMIC SERVER FILTER BY \"I1.item_id\" IN (\"I2.item_id\")",
+ /*
+ * testSelfJoin()
+ * SELECT i1.name, i2.name FROM joinItemTable i1
+ * JOIN joinItemTable i2 ON i1.item_id = i2.supplier_id
+ * ORDER BY i1.name, i2.name
+ */
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+ " SERVER SORTED BY [I1.NAME, I2.NAME]\n" +
+ "CLIENT MERGE SORT\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+ " DYNAMIC SERVER FILTER BY \"I1.item_id\" IN (\"I2.supplier_id\")",
+ /*
+ * testStarJoin()
+ * SELECT order_id, c.name, i.name iname, quantity, o.date
+ * FROM joinOrderTable o
+ * JOIN joinCustomerTable c ON o.customer_id = c.customer_id
+ * JOIN joinItemTable i ON o.item_id = i.item_id
+ * ORDER BY order_id
+ */
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_CUSTOMER_TABLE_FULL_NAME + "\n" +
+ " PARALLEL INNER-JOIN TABLE 1\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME,
+ /*
+ * testStarJoin()
+ * SELECT (*NO_STAR_JOIN*) order_id, c.name, i.name iname, quantity, o.date
+ * FROM joinOrderTable o
+ * JOIN joinCustomerTable c ON o.customer_id = c.customer_id
+ * JOIN joinItemTable i ON o.item_id = i.item_id
+ * ORDER BY order_id
+ */
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+ " SERVER SORTED BY [\"O.order_id\"]\n" +
+ "CLIENT MERGE SORT\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_CUSTOMER_TABLE_FULL_NAME + "\n" +
+ " DYNAMIC SERVER FILTER BY \"I.item_id\" IN (\"O.item_id\")",
+ /*
+ * testSubJoin()
+ * SELECT * FROM joinCustomerTable c
+ * INNER JOIN (joinOrderTable o
+ * INNER JOIN (joinSupplierTable s
+ * RIGHT JOIN joinItemTable i ON i.supplier_id = s.supplier_id)
+ * ON o.item_id = i.item_id)
+ * ON c.customer_id = o.customer_id
+ * WHERE c.customer_id <= '0000000005'
+ * AND order_id != '000000000000003'
+ * AND i.name != 'T3'
+ * ORDER BY c.customer_id, i.name
+ */
+ "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + JOIN_CUSTOMER_TABLE_FULL_NAME + " [*] - ['0000000005']\n" +
+ " SERVER SORTED BY [\"C.customer_id\", I.NAME]\n" +
+ "CLIENT MERGE SORT\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+ " SERVER FILTER BY \"order_id\" != '000000000000003'\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+ " SERVER FILTER BY NAME != 'T3'\n" +
+ " PARALLEL LEFT-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME + "\n" +
+ " DYNAMIC SERVER FILTER BY \"C.customer_id\" IN (\"O.customer_id\")",
+ /*
+ * testJoinWithSubqueryAndAggregation()
+ * SELECT i.name, sum(quantity) FROM joinOrderTable o
+ * LEFT JOIN (SELECT name, item_id iid FROM joinItemTable) AS i
+ * ON o.item_id = i.iid
+ * GROUP BY i.name ORDER BY i.name
+ */
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+ " SERVER AGGREGATE INTO DISTINCT ROWS BY [I.NAME]\n" +
+ "CLIENT MERGE SORT\n" +
+ " PARALLEL LEFT-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME,
+ /*
+ * testJoinWithSubqueryAndAggregation()
+ * SELECT o.iid, sum(o.quantity) q
+ * FROM (SELECT item_id iid, quantity FROM joinOrderTable) AS o
+ * LEFT JOIN (SELECT item_id FROM joinItemTable) AS i
+ * ON o.iid = i.item_id
+ * GROUP BY o.iid ORDER BY q DESC
+ */
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+ " SERVER AGGREGATE INTO DISTINCT ROWS BY [O.IID]\n" +
+ "CLIENT MERGE SORT\n" +
+ "CLIENT SORTED BY [SUM(O.QUANTITY) DESC]\n" +
+ " PARALLEL LEFT-JOIN TABLE 0 (SKIP MERGE)\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+ " SERVER FILTER BY FIRST KEY ONLY",
+ /*
+ * testJoinWithSubqueryAndAggregation()
+ * SELECT i.iid, o.q
+ * FROM (SELECT item_id iid FROM joinItemTable) AS i
+ * LEFT JOIN (SELECT item_id iid, sum(quantity) q FROM joinOrderTable GROUP BY item_id) AS o
+ * ON o.iid = i.iid
+ * ORDER BY o.q DESC NULLS LAST, i.iid
+ */
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+ " SERVER FILTER BY FIRST KEY ONLY\n" +
+ " SERVER SORTED BY [O.Q DESC NULLS LAST, I.IID]\n" +
+ "CLIENT MERGE SORT\n" +
+ " PARALLEL LEFT-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+ " SERVER AGGREGATE INTO DISTINCT ROWS BY [\"item_id\"]\n" +
+ " CLIENT MERGE SORT",
+ /*
+ * testJoinWithSubqueryAndAggregation()
+ * SELECT i.iid, o.q
+ * FROM (SELECT item_id iid, sum(quantity) q FROM joinOrderTable GROUP BY item_id) AS o
+ * JOIN (SELECT item_id iid FROM joinItemTable) AS i
+ * ON o.iid = i.iid
+ * ORDER BY o.q DESC, i.iid
+ */
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+ " SERVER FILTER BY FIRST KEY ONLY\n" +
+ " SERVER SORTED BY [O.Q DESC, I.IID]\n" +
+ "CLIENT MERGE SORT\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+ " SERVER AGGREGATE INTO DISTINCT ROWS BY [\"item_id\"]\n" +
+ " CLIENT MERGE SORT",
+ /*
+ * testNestedSubqueries()
+ * SELECT * FROM (SELECT customer_id cid, name, phone, address, loc_id, date FROM joinCustomerTable) AS c
+ * INNER JOIN (SELECT o.oid ooid, o.cid ocid, o.iid oiid, o.price * o.quantity, o.date odate,
+ * qi.iiid iiid, qi.iname iname, qi.iprice iprice, qi.idiscount1 idiscount1, qi.idiscount2 idiscount2, qi.isid isid, qi.idescription idescription,
+ * qi.ssid ssid, qi.sname sname, qi.sphone sphone, qi.saddress saddress, qi.sloc_id sloc_id
+ * FROM (SELECT item_id iid, customer_id cid, order_id oid, price, quantity, date FROM joinOrderTable) AS o
+ * INNER JOIN (SELECT i.iid iiid, i.name iname, i.price iprice, i.discount1 idiscount1, i.discount2 idiscount2, i.sid isid, i.description idescription,
+ * s.sid ssid, s.name sname, s.phone sphone, s.address saddress, s.loc_id sloc_id
+ * FROM (SELECT supplier_id sid, name, phone, address, loc_id FROM joinSupplierTable) AS s
+ * RIGHT JOIN (SELECT item_id iid, name, price, discount1, discount2, supplier_id sid, description FROM joinItemTable) AS i
+ * ON i.sid = s.sid) as qi
+ * ON o.iid = qi.iiid) as qo
+ * ON c.cid = qo.ocid
+ * WHERE c.cid <= '0000000005'
+ * AND qo.ooid != '000000000000003'
+ * AND qo.iname != 'T3'
+ * ORDER BY c.cid, qo.iname
+ */
+ "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + JOIN_CUSTOMER_TABLE_FULL_NAME + " [*] - ['0000000005']\n" +
+ " SERVER SORTED BY [C.CID, QO.INAME]\n" +
+ "CLIENT MERGE SORT\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+ " SERVER FILTER BY \"order_id\" != '000000000000003'\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+ " SERVER FILTER BY NAME != 'T3'\n" +
+ " PARALLEL LEFT-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME,
+ /*
+ * testJoinWithLimit()
+ * SELECT order_id, i.name, s.name, s.address, quantity
+ * FROM joinSupplierTable s
+ * LEFT JOIN joinItemTable i ON i.supplier_id = s.supplier_id
+ * LEFT JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 4
+ */
+ "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME + "\n" +
+ " SERVER 4 ROW LIMIT\n" +
+ "CLIENT 4 ROW LIMIT\n" +
+ " PARALLEL LEFT-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+ " PARALLEL LEFT-JOIN TABLE 1(DELAYED EVALUATION)\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+ " JOIN-SCANNER 4 ROW LIMIT",
+ /*
+ * testJoinWithLimit()
+ * SELECT order_id, i.name, s.name, s.address, quantity
+ * FROM joinSupplierTable s
+ * JOIN joinItemTable i ON i.supplier_id = s.supplier_id
+ * JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 4
+ */
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME + "\n" +
+ "CLIENT 4 ROW LIMIT\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+ " PARALLEL INNER-JOIN TABLE 1(DELAYED EVALUATION)\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+ " DYNAMIC SERVER FILTER BY \"S.supplier_id\" IN (\"I.supplier_id\")\n" +
+ " JOIN-SCANNER 4 ROW LIMIT",
+ /*
+ * testJoinWithSetMaxRows()
+ * statement.setMaxRows(4);
+ * SELECT order_id, i.name, quantity FROM joinItemTable i
+ * JOIN joinOrderTable o ON o.item_id = i.item_id;
+ * SELECT o.order_id, i.name, o.quantity FROM joinItemTable i
+ * JOIN (SELECT order_id, item_id, quantity FROM joinOrderTable) o
+ * ON o.item_id = i.item_id;
+ */
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+ "CLIENT 4 ROW LIMIT\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+ " DYNAMIC SERVER FILTER BY \"I.item_id\" IN (\"O.item_id\")\n" +
+ " JOIN-SCANNER 4 ROW LIMIT",
+ /*
+ * testJoinWithOffset()
+ * SELECT order_id, i.name, s.name, s.address, quantity
+ * FROM joinSupplierTable s
+ * LEFT JOIN joinItemTable i ON i.supplier_id = s.supplier_id
+ * LEFT JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 1 OFFSET 2
+ */
+ "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME + "\n" +
+ " SERVER OFFSET 2\n" +
+ " SERVER 3 ROW LIMIT\n" +
+ "CLIENT 1 ROW LIMIT\n" +
+ " PARALLEL LEFT-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+ " PARALLEL LEFT-JOIN TABLE 1(DELAYED EVALUATION)\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+ " JOIN-SCANNER 3 ROW LIMIT",
+ /*
+ * testJoinWithOffset()
+ * SELECT order_id, i.name, s.name, s.address, quantity
+ * FROM joinSupplierTable s
+ * JOIN joinItemTable i ON i.supplier_id = s.supplier_id
+ * JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 1 OFFSET 2
+ */
+ "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME + "\n" +
+ " SERVER OFFSET 2\n" +
+ "CLIENT 1 ROW LIMIT\n" +
+ " PARALLEL INNER-JOIN TABLE 0\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ITEM_TABLE_FULL_NAME + "\n" +
+ " PARALLEL INNER-JOIN TABLE 1(DELAYED EVALUATION)\n" +
+ " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_FULL_NAME + "\n" +
+ " DYNAMIC SERVER FILTER BY \"S.supplier_id\" IN (\"I.supplier_id\")\n" +
+ " JOIN-SCANNER 3 ROW LIMIT",
+ }});
+ return testCases;
+ }
+
+ @Test
+ public void testInnerJoin() throws Exception {
+ // Intentionally empty. The HashJoinIT version of this test selects from a
+ // sequence, which may be incremented again when the query is retried after
+ // the hash cache is removed; the result would be flaky, so it is not run here.
+ }
+
+ @Test
+ public void testUpsertWithJoin() throws Exception {
+ // TODO: Left empty for now; enable this test once PHOENIX-3163 is resolved.
+ }
+
+ public static class InvalidateHashCache extends SimpleRegionObserver { // test observer: removes each hash-join cache entry the first time a scan referencing it is opened
+ public static Random rand= new Random(); // NOTE(review): not referenced anywhere in this class
+ public static List<ImmutableBytesPtr> lastRemovedJoinIds=new ArrayList<ImmutableBytesPtr>(); // join ids already removed once; NOTE(review): static, unsynchronized ArrayList -- confirm this hook is never run concurrently
+ @Override
+ public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan,
+ final RegionScanner s) throws IOException {
+ final HashJoinInfo joinInfo = HashJoinInfo.deserializeHashJoinFromScan(scan);
+ if (joinInfo != null) { // scans without hash-join info are left alone
+ TenantCache cache = GlobalCache.getTenantCache(c.getEnvironment(), null); // null tenant id -- presumably selects the global tenant cache; verify against GlobalCache
+ int count = joinInfo.getJoinIds().length;
+ for (int i = 0; i < count; i++) {
+ ImmutableBytesPtr joinId = joinInfo.getJoinIds()[i];
+ if (!ByteUtil.contains(lastRemovedJoinIds,joinId)) { // remove a given join id at most once; later scans with the same id keep their cache
+ lastRemovedJoinIds.add(joinId);
+ cache.removeServerCache(joinId);
+ }
+ }
+ }
+ return s; // the incoming scanner is returned unchanged
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
index 7bdea5f..edab319 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
@@ -34,23 +35,44 @@ import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
-
+import java.util.Random;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.cache.TenantCache;
import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
@RunWith(Parameterized.class)
public class HashJoinIT extends BaseJoinIT {
+
+
public HashJoinIT(String[] indexDDL, String[] plans) {
super(indexDDL, plans);
}
@@ -1166,8 +1188,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testDefaultJoin() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String tableName1 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
String tableName2 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME);
String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + tableName1 + " item JOIN " + tableName2 + " supp ON item.\"supplier_id\" = supp.\"supplier_id\"";
@@ -1213,8 +1234,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testInnerJoin() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String tableName1 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
String tableName2 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME);
String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name, next value for " + seqName + " FROM " + tableName1 + " item INNER JOIN " + tableName2 + " supp ON item.\"supplier_id\" = supp.\"supplier_id\"";
@@ -1266,8 +1286,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testLeftJoin() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String tableName1 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
String tableName2 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME);
String query[] = new String[3];
@@ -1325,8 +1344,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testRightJoin() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String tableName1 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME);
String tableName2 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + tableName1 + " supp RIGHT JOIN " + tableName2 + " item ON item.\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\"";
@@ -1377,8 +1395,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testInnerJoinWithPreFilters() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String tableName1 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
String tableName2 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME);
String query1 = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + tableName1 + " item INNER JOIN " + tableName2 + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" AND supp.\"supplier_id\" BETWEEN '0000000001' AND '0000000005'";
@@ -1441,8 +1458,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testLeftJoinWithPreFilters() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String tableName1 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
String tableName2 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME);
String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + tableName1 + " item LEFT JOIN " + tableName2 + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" AND (supp.\"supplier_id\" = '0000000001' OR supp.\"supplier_id\" = '0000000005') ORDER BY \"item_id\"";
@@ -1493,8 +1509,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testJoinWithPostFilters() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String tableName1 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME);
String tableName2 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
String query1 = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + tableName1 + " supp RIGHT JOIN " + tableName2 + " item ON item.\"supplier_id\" = supp.\"supplier_id\" WHERE supp.\"supplier_id\" BETWEEN '0000000001' AND '0000000005'";
@@ -1557,8 +1572,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testStarJoin() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String tableName1 = getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME);
String tableName2 = getTableName(conn, JOIN_CUSTOMER_TABLE_FULL_NAME);
String tableName3 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
@@ -1631,8 +1645,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testLeftJoinWithAggregation() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String tableName1 = getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME);
String tableName2 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
String query1 = "SELECT i.name, sum(quantity) FROM " + tableName1 + " o LEFT JOIN "
@@ -1717,8 +1730,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testRightJoinWithAggregation() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String tableName1 = getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME);
String tableName2 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
String query1 = "SELECT i.name, sum(quantity) FROM " + tableName1 + " o RIGHT JOIN "
@@ -1790,8 +1802,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testLeftRightJoin() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String tableName1 = getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME);
String tableName2 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
String tableName3 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME);
@@ -1896,8 +1907,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testMultiLeftJoin() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String[] queries = {
"SELECT \"order_id\", i.name, s.name, quantity, \"DATE\" FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o LEFT JOIN "
+ getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i ON o.\"item_id\" = i.\"item_id\" LEFT JOIN "
@@ -1949,8 +1959,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testMultiRightJoin() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String query = "SELECT \"order_id\", i.name, s.name, quantity, \"DATE\" FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o RIGHT JOIN "
+ getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i ON o.\"item_id\" = i.\"item_id\" RIGHT JOIN "
+ getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " s ON i.\"supplier_id\" = s.\"supplier_id\" ORDER BY \"order_id\", s.\"supplier_id\" DESC";
@@ -2025,6 +2034,7 @@ public class HashJoinIT extends BaseJoinIT {
public void testMultiRightJoin_SmallChunkSize() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, "1");
+ props.put(ServerCacheClient.HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER, "true");
Connection conn = DriverManager.getConnection(getUrl(), props);
String query = "SELECT \"order_id\", i.name, s.name, quantity, \"DATE\" FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o RIGHT JOIN "
+ getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i ON o.\"item_id\" = i.\"item_id\" RIGHT JOIN "
@@ -2096,8 +2106,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testJoinWithWildcard() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String query = "SELECT * FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " LEFT JOIN " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " supp ON " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + ".\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\"";
try {
PreparedStatement statement = conn.prepareStatement(query);
@@ -2205,8 +2214,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testJoinWithTableWildcard() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String query = "SELECT s.*, "+ getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + ".*, \"order_id\" FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o RIGHT JOIN "
+ getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i ON o.\"item_id\" = i.\"item_id\" RIGHT JOIN "
+ getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " s ON i.\"supplier_id\" = s.\"supplier_id\" ORDER BY \"order_id\", s.\"supplier_id\" DESC";
@@ -2351,8 +2359,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testJoinMultiJoinKeys() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String query = "SELECT c.name, s.name FROM " + getTableName(conn, JOIN_CUSTOMER_TABLE_FULL_NAME) + " c LEFT JOIN " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " s ON \"customer_id\" = \"supplier_id\" AND c.loc_id = s.loc_id AND substr(s.name, 2, 1) = substr(c.name, 2, 1)";
try {
PreparedStatement statement = conn.prepareStatement(query);
@@ -2384,8 +2391,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testJoinWithDifferentNumericJoinKeyTypes() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String query = "SELECT \"order_id\", i.name, i.price, discount2, quantity FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o INNER JOIN "
+ getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i ON o.\"item_id\" = i.\"item_id\" AND o.price = (i.price * (100 - discount2)) / 100.0 WHERE quantity < 5000";
try {
@@ -2406,8 +2412,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testJoinWithDifferentDateJoinKeyTypes() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String query = "SELECT \"order_id\", c.name, o.\"DATE\" FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o INNER JOIN "
+ getTableName(conn, JOIN_CUSTOMER_TABLE_FULL_NAME) + " c ON o.\"customer_id\" = c.\"customer_id\" AND o.\"DATE\" = c.\"DATE\"";
try {
@@ -2438,8 +2443,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testJoinWithIncomparableJoinKeyTypes() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String query = "SELECT \"order_id\", i.name, i.price, discount2, quantity FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o INNER JOIN "
+ getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i ON o.\"item_id\" = i.\"item_id\" AND o.price / 100 = substr(i.name, 2, 1)";
try {
@@ -2455,8 +2459,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testJoinPlanWithIndex() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String query1 = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " item LEFT JOIN " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " supp ON substr(item.name, 2, 1) = substr(supp.name, 2, 1) AND (supp.name BETWEEN 'S1' AND 'S5') WHERE item.name BETWEEN 'T1' AND 'T5'";
String query2 = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " item INNER JOIN " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" WHERE (item.name = 'T1' OR item.name = 'T5') AND (supp.name = 'S1' OR supp.name = 'S5')";
try {
@@ -2518,8 +2521,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testJoinWithSkipMergeOptimization() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String query = "SELECT s.name FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i JOIN "
+ getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o ON o.\"item_id\" = i.\"item_id\" AND quantity < 5000 JOIN "
+ getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " s ON i.\"supplier_id\" = s.\"supplier_id\"";
@@ -2546,8 +2548,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testSelfJoin() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String query1 = "SELECT i2.\"item_id\", i1.name FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i1 JOIN "
+ getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i2 ON i1.\"item_id\" = i2.\"item_id\" ORDER BY i1.\"item_id\"";
String query2 = "SELECT i1.name, i2.name FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i1 JOIN "
@@ -2615,8 +2616,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testUpsertWithJoin() throws Exception {
String tempTable = generateUniqueName();
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
conn.setAutoCommit(true);
try {
conn.createStatement().execute("CREATE TABLE " + tempTable
@@ -2768,8 +2768,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testSubJoin() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String query1 = "SELECT i.name, count(c.name), min(s.name), max(quantity) FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o LEFT JOIN "
+ "(" + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " s RIGHT JOIN " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i ON i.\"supplier_id\" = s.\"supplier_id\")"
+ " ON o.\"item_id\" = i.\"item_id\" LEFT JOIN "
@@ -2894,8 +2893,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testJoinWithSubquery() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String query1 = "SELECT item.\"item_id\", item.name, supp.sid, supp.name FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " item INNER JOIN (SELECT reverse(loc_id), \"supplier_id\" sid, name FROM " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " WHERE name BETWEEN 'S1' AND 'S5') AS supp ON item.\"supplier_id\" = supp.sid";
String query2 = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " item INNER JOIN (SELECT reverse(loc_id), \"supplier_id\", name FROM " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + ") AS supp ON item.\"supplier_id\" = supp.\"supplier_id\" AND (supp.name = 'S1' OR supp.name = 'S5')";
try {
@@ -2956,8 +2954,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testJoinWithSubqueryPostFilters() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
try {
String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " item INNER JOIN (SELECT reverse(loc_id), \"supplier_id\", name FROM " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " LIMIT 5) AS supp ON item.\"supplier_id\" = supp.\"supplier_id\" AND (supp.name != 'S1')";
PreparedStatement statement = conn.prepareStatement(query);
@@ -3006,8 +3003,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testJoinWithSubqueryAndAggregation() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String query1 = "SELECT i.name, sum(quantity) FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o LEFT JOIN (SELECT name, \"item_id\" iid FROM "
+ getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + ") AS i ON o.\"item_id\" = i.iid GROUP BY i.name ORDER BY i.name";
String query2 = "SELECT o.iid, sum(o.quantity) q FROM (SELECT \"item_id\" iid, quantity FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + ") AS o LEFT JOIN (SELECT \"item_id\" FROM "
@@ -3112,8 +3108,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testNestedSubqueries() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String query1 = "SELECT q.iname, count(c.name), min(q.sname), max(o.quantity) FROM (SELECT \"customer_id\" cid, \"item_id\" iid, quantity FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + ") AS o LEFT JOIN "
+ "(SELECT i.iid iid, s.name sname, i.name iname FROM (SELECT \"supplier_id\" sid, name FROM " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + ") AS s RIGHT JOIN (SELECT \"item_id\" iid, name, \"supplier_id\" sid FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + ") AS i ON i.sid = s.sid) AS q"
+ " ON o.iid = q.iid LEFT JOIN (SELECT \"customer_id\" cid, name FROM "
@@ -3235,8 +3230,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testJoinWithLimit() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String query1 = "SELECT \"order_id\", i.name, s.name, s.address, quantity FROM " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " s LEFT JOIN "
+ getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i ON i.\"supplier_id\" = s.\"supplier_id\" LEFT JOIN "
+ getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o ON o.\"item_id\" = i.\"item_id\" LIMIT 4";
@@ -3314,8 +3308,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testJoinWithOffset() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String query1 = "SELECT \"order_id\", i.name, s.name, s.address, quantity FROM " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME)
+ " s LEFT JOIN " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i ON i.\"supplier_id\" = s.\"supplier_id\" LEFT JOIN "
+ getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o ON o.\"item_id\" = i.\"item_id\" LIMIT 1 OFFSET 2 ";
@@ -3357,8 +3350,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testNonEquiJoin() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
try {
String query = "SELECT item.name, supp.name FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " item, " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " supp WHERE item.\"supplier_id\" > supp.\"supplier_id\"";
PreparedStatement statement = conn.prepareStatement(query);
@@ -3414,8 +3406,7 @@ public class HashJoinIT extends BaseJoinIT {
@Test
public void testJoinWithSetMaxRows() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = getConnection();
String [] queries = new String[2];
queries[0] = "SELECT \"order_id\", i.name, quantity FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i JOIN "
+ getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o ON o.\"item_id\" = i.\"item_id\"";
@@ -3453,4 +3444,5 @@ public class HashJoinIT extends BaseJoinIT {
conn.close();
}
}
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
index 067f50f..12630f4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
@@ -690,7 +690,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
conn.commit();
doneSignal.await(30, TimeUnit.SECONDS);
// Install coprocessor that will simulate an index write failure during index rebuild
- addWriteFailingCoprocessor(conn,fullIndexName);
+ TestUtil.addCoprocessor(conn,fullIndexName,WriteFailingRegionObserver.class);
clock.time += WAIT_AFTER_DISABLED;
doneSignal.await(30, TimeUnit.SECONDS);
WAIT_FOR_REBUILD_TO_START.await(30, TimeUnit.SECONDS);
@@ -843,27 +843,6 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
t.start();
}
- private static void addWriteFailingCoprocessor(Connection conn, String tableName) throws Exception {
- int priority = QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY + 100;
- ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
- HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName));
- descriptor.addCoprocessor(WriteFailingRegionObserver.class.getName(), null, priority, null);
- int numTries = 10;
- try (HBaseAdmin admin = services.getAdmin()) {
- admin.modifyTable(Bytes.toBytes(tableName), descriptor);
- while (!admin.getTableDescriptor(Bytes.toBytes(tableName)).equals(descriptor)
- && numTries > 0) {
- numTries--;
- if (numTries == 0) {
- throw new Exception(
- "Check to detect if delaying co-processor was added failed after "
- + numTries + " retries.");
- }
- Thread.sleep(1000);
- }
- }
- }
-
private static void removeWriteFailingCoprocessor(Connection conn, String tableName) throws Exception {
ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
index 5e13982..23bfebd 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
@@ -18,14 +18,18 @@
package org.apache.phoenix.iterate;
import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.monitoring.ScanMetricsHolder;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;
@@ -41,16 +45,16 @@ public class DelayedTableResultIteratorFactory implements TableResultIteratorFac
@Override
public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef,
Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold,
- QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
+ QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
return new DelayedTableResultIterator(mutationState, tableRef, scan, scanMetricsHolder,
- renewLeaseThreshold, plan, scanGrouper);
+ renewLeaseThreshold, plan, scanGrouper, caches);
}
private class DelayedTableResultIterator extends TableResultIterator {
public DelayedTableResultIterator(MutationState mutationState, TableRef tableRef, Scan scan,
ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold, QueryPlan plan,
- ParallelScanGrouper scanGrouper) throws SQLException {
- super(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper);
+ ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
+ super(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, caches);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index 18e4034..ce46a3e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -21,6 +21,9 @@ import static org.apache.phoenix.monitoring.TaskExecutionMetricsHolder.NO_OP_INS
import static org.apache.phoenix.util.LogUtil.addCustomAnnotations;
import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -56,9 +59,10 @@ import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCac
import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheRequest;
import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheResponse;
import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ServerCachingService;
-import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.job.JobManager.JobCallable;
+import org.apache.phoenix.join.HashCacheFactory;
+import org.apache.phoenix.memory.InsufficientMemoryException;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
import org.apache.phoenix.query.ConnectionQueryServices;
@@ -67,6 +71,7 @@ import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.SQLCloseable;
import org.apache.phoenix.util.SQLCloseables;
@@ -86,6 +91,7 @@ public class ServerCacheClient {
public static final byte[] KEY_IN_FIRST_REGION = new byte[]{0};
private static final Log LOG = LogFactory.getLog(ServerCacheClient.class);
private static final Random RANDOM = new Random();
+ public static final String HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER = "hash.join.server.cache.resend.per.server";
private final PhoenixConnection connection;
private final Map<Integer, TableRef> cacheUsingTableRefMap = new ConcurrentHashMap<Integer, TableRef>();
@@ -115,12 +121,41 @@ public class ServerCacheClient {
public class ServerCache implements SQLCloseable {
private final int size;
private final byte[] id;
- private final ImmutableSet<HRegionLocation> servers;
+ private final Set<HRegionLocation> servers;
+ private ImmutableBytesWritable cachePtr;
+ private MemoryChunk chunk;
+ private File outputFile;
- public ServerCache(byte[] id, Set<HRegionLocation> servers, int size) {
+
+ public ServerCache(byte[] id, Set<HRegionLocation> servers, ImmutableBytesWritable cachePtr,
+ ConnectionQueryServices services, boolean storeCacheOnClient) throws IOException {
this.id = id;
- this.servers = ImmutableSet.copyOf(servers);
- this.size = size;
+ this.servers = new HashSet<HRegionLocation>(servers);
+ this.size = cachePtr.getLength();
+ if (storeCacheOnClient) {
+ try {
+ this.chunk = services.getMemoryManager().allocate(cachePtr.getLength());
+ this.cachePtr = cachePtr;
+ } catch (InsufficientMemoryException e) {
+ this.outputFile = File.createTempFile("HashJoinCacheSpooler", ".bin", new File(services.getProps()
+ .get(QueryServices.SPOOL_DIRECTORY, QueryServicesOptions.DEFAULT_SPOOL_DIRECTORY)));
+ try (FileOutputStream fio = new FileOutputStream(outputFile)) {
+ fio.write(cachePtr.get(), cachePtr.getOffset(), cachePtr.getLength());
+ }
+ }
+ }
+
+ }
+
+ public ImmutableBytesWritable getCachePtr() throws IOException {
+ if(this.outputFile!=null){
+ try (FileInputStream fio = new FileInputStream(outputFile)) {
+ byte[] b = new byte[this.size];
+ fio.read(b);
+ cachePtr = new ImmutableBytesWritable(b);
+ }
+ }
+ return cachePtr;
}
/**
@@ -136,22 +171,41 @@ public class ServerCacheClient {
public byte[] getId() {
return id;
}
-
+
+ public boolean addServer(HRegionLocation loc) {
+ return this.servers.add(loc);
+ }
+
/**
* Call to free up cache on region servers when no longer needed
*/
@Override
public void close() throws SQLException {
- removeServerCache(id, servers);
+ try{
+ removeServerCache(this, servers);
+ }finally{
+ cachePtr = null;
+ if (chunk != null) {
+ chunk.close();
+ }
+ if (outputFile != null) {
+ outputFile.delete();
+ }
+ }
}
-
+
+ }
+
+ public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState,
+ final ServerCacheFactory cacheFactory, final TableRef cacheUsingTableRef) throws SQLException {
+ return addServerCache(keyRanges, cachePtr, txState, cacheFactory, cacheUsingTableRef, false);
}
- public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState, final ServerCacheFactory cacheFactory, final TableRef cacheUsingTableRef) throws SQLException {
+ public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState,
+ final ServerCacheFactory cacheFactory, final TableRef cacheUsingTableRef, boolean storeCacheOnClient)
+ throws SQLException {
ConnectionQueryServices services = connection.getQueryServices();
- MemoryChunk chunk = services.getMemoryManager().allocate(cachePtr.getLength());
List<Closeable> closeables = new ArrayList<Closeable>();
- closeables.add(chunk);
ServerCache hashCacheSpec = null;
SQLException firstException = null;
final byte[] cacheId = generateId();
@@ -187,54 +241,7 @@ public class ServerCacheClient {
@Override
public Boolean call() throws Exception {
- final Map<byte[], AddServerCacheResponse> results;
- try {
- results = htable.coprocessorService(ServerCachingService.class, key, key,
- new Batch.Call<ServerCachingService, AddServerCacheResponse>() {
- @Override
- public AddServerCacheResponse call(ServerCachingService instance) throws IOException {
- ServerRpcController controller = new ServerRpcController();
- BlockingRpcCallback<AddServerCacheResponse> rpcCallback =
- new BlockingRpcCallback<AddServerCacheResponse>();
- AddServerCacheRequest.Builder builder = AddServerCacheRequest.newBuilder();
- final byte[] tenantIdBytes;
- if(cacheUsingTable.isMultiTenant()) {
- try {
- tenantIdBytes = connection.getTenantId() == null ? null :
- ScanUtil.getTenantIdBytes(
- cacheUsingTable.getRowKeySchema(),
- cacheUsingTable.getBucketNum() != null,
- connection.getTenantId(), cacheUsingTable.getViewIndexId() != null);
- } catch (SQLException e) {
- throw new IOException(e);
- }
- } else {
- tenantIdBytes = connection.getTenantId() == null ? null : connection.getTenantId().getBytes();
- }
- if (tenantIdBytes != null) {
- builder.setTenantId(ByteStringer.wrap(tenantIdBytes));
- }
- builder.setCacheId(ByteStringer.wrap(cacheId));
- builder.setCachePtr(org.apache.phoenix.protobuf.ProtobufUtil.toProto(cachePtr));
- builder.setHasProtoBufIndexMaintainer(true);
- ServerCacheFactoryProtos.ServerCacheFactory.Builder svrCacheFactoryBuider = ServerCacheFactoryProtos.ServerCacheFactory.newBuilder();
- svrCacheFactoryBuider.setClassName(cacheFactory.getClass().getName());
- builder.setCacheFactory(svrCacheFactoryBuider.build());
- builder.setTxState(ByteStringer.wrap(txState));
- instance.addServerCache(controller, builder.build(), rpcCallback);
- if(controller.getFailedOn() != null) {
- throw controller.getFailedOn();
- }
- return rpcCallback.get();
- }
- });
- } catch (Throwable t) {
- throw new Exception(t);
- }
- if(results != null && results.size() == 1){
- return results.values().iterator().next().getReturn();
- }
- return false;
+ return addServerCache(htable, key, cacheUsingTable, cacheId, cachePtr, cacheFactory, txState);
}
/**
@@ -257,7 +264,7 @@ public class ServerCacheClient {
}
}
- hashCacheSpec = new ServerCache(cacheId,servers,cachePtr.getLength());
+ hashCacheSpec = new ServerCache(cacheId,servers,cachePtr, services, storeCacheOnClient);
// Execute in parallel
int timeoutMs = services.getProps().getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
for (Future<Boolean> future : futures) {
@@ -303,73 +310,81 @@ public class ServerCacheClient {
* @throws SQLException
* @throws IllegalStateException if hashed table cannot be removed on any region server on which it was added
*/
- private void removeServerCache(final byte[] cacheId, Set<HRegionLocation> servers) throws SQLException {
- ConnectionQueryServices services = connection.getQueryServices();
- Throwable lastThrowable = null;
- TableRef cacheUsingTableRef = cacheUsingTableRefMap.get(Bytes.mapKey(cacheId));
- final PTable cacheUsingTable = cacheUsingTableRef.getTable();
- byte[] tableName = cacheUsingTableRef.getTable().getPhysicalName().getBytes();
- HTableInterface iterateOverTable = services.getTable(tableName);
- try {
- List<HRegionLocation> locations = services.getAllTableRegions(tableName);
- Set<HRegionLocation> remainingOnServers = new HashSet<HRegionLocation>(servers);
- /**
- * Allow for the possibility that the region we based where to send our cache has split and been
- * relocated to another region server *after* we sent it, but before we removed it. To accommodate
- * this, we iterate through the current metadata boundaries and remove the cache once for each
- * server that we originally sent to.
- */
- if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("Removing Cache " + cacheId + " from servers.", connection));}
- for (HRegionLocation entry : locations) {
- if (remainingOnServers.contains(entry)) { // Call once per server
- try {
+ private void removeServerCache(final ServerCache cache, Set<HRegionLocation> remainingOnServers) throws SQLException { // best-effort: per-server failures are logged, not rethrown
+ HTableInterface iterateOverTable = null; // declared outside the try so the finally block can close it
+ final byte[] cacheId = cache.getId();
+ try {
+ ConnectionQueryServices services = connection.getQueryServices();
+ Throwable lastThrowable = null;
+ TableRef cacheUsingTableRef = cacheUsingTableRefMap.get(Bytes.mapKey(cacheId)); // NOTE(review): a missing entry causes an NPE on the next line - confirm callers always register the cache id first
+ final PTable cacheUsingTable = cacheUsingTableRef.getTable();
+ byte[] tableName = cacheUsingTableRef.getTable().getPhysicalName().getBytes();
+ iterateOverTable = services.getTable(tableName);
+
+ List<HRegionLocation> locations = services.getAllTableRegions(tableName);
+ /**
+ * Allow for the possibility that the region we based where to send our cache has split and been relocated
+ * to another region server *after* we sent it, but before we removed it. To accommodate this, we iterate
+ * through the current metadata boundaries and remove the cache once for each server that we originally sent
+ * to.
+ */
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(addCustomAnnotations("Removing Cache " + Bytes.toStringBinary(cacheId) + " from servers.", connection)); // render the id bytes; concatenating the array itself only prints its identity
+ }
+ for (HRegionLocation entry : locations) {
+ // Call once per server
+ if (remainingOnServers.contains(entry)) {
+ try {
byte[] key = getKeyInRegion(entry.getRegionInfo().getStartKey());
- iterateOverTable.coprocessorService(ServerCachingService.class, key, key,
- new Batch.Call<ServerCachingService, RemoveServerCacheResponse>() {
- @Override
- public RemoveServerCacheResponse call(ServerCachingService instance) throws IOException {
- ServerRpcController controller = new ServerRpcController();
- BlockingRpcCallback<RemoveServerCacheResponse> rpcCallback =
- new BlockingRpcCallback<RemoveServerCacheResponse>();
- RemoveServerCacheRequest.Builder builder = RemoveServerCacheRequest.newBuilder();
- final byte[] tenantIdBytes;
- if(cacheUsingTable.isMultiTenant()) {
- try {
- tenantIdBytes = connection.getTenantId() == null ? null :
- ScanUtil.getTenantIdBytes(
- cacheUsingTable.getRowKeySchema(),
- cacheUsingTable.getBucketNum() != null,
- connection.getTenantId(), cacheUsingTable.getViewIndexId() != null);
- } catch (SQLException e) {
- throw new IOException(e);
+ iterateOverTable.coprocessorService(ServerCachingService.class, key, key,
+ new Batch.Call<ServerCachingService, RemoveServerCacheResponse>() {
+ @Override
+ public RemoveServerCacheResponse call(ServerCachingService instance)
+ throws IOException {
+ ServerRpcController controller = new ServerRpcController();
+ BlockingRpcCallback<RemoveServerCacheResponse> rpcCallback = new BlockingRpcCallback<RemoveServerCacheResponse>();
+ RemoveServerCacheRequest.Builder builder = RemoveServerCacheRequest
+ .newBuilder();
+ final byte[] tenantIdBytes;
+ if (cacheUsingTable.isMultiTenant()) {
+ try {
+ tenantIdBytes = connection.getTenantId() == null ? null
+ : ScanUtil.getTenantIdBytes(cacheUsingTable.getRowKeySchema(),
+ cacheUsingTable.getBucketNum() != null,
+ connection.getTenantId(),
+ cacheUsingTable.getViewIndexId() != null);
+ } catch (SQLException e) {
+ throw new IOException(e); // call() is declared to throw IOException only
+ }
+ } else {
+ tenantIdBytes = connection.getTenantId() == null ? null
+ : connection.getTenantId().getBytes();
+ }
+ if (tenantIdBytes != null) {
+ builder.setTenantId(ByteStringer.wrap(tenantIdBytes));
+ }
+ builder.setCacheId(ByteStringer.wrap(cacheId));
+ instance.removeServerCache(controller, builder.build(), rpcCallback);
+ if (controller.getFailedOn() != null) { throw controller.getFailedOn(); }
+ return rpcCallback.get();
}
- } else {
- tenantIdBytes = connection.getTenantId() == null ? null : connection.getTenantId().getBytes();
- }
- if (tenantIdBytes != null) {
- builder.setTenantId(ByteStringer.wrap(tenantIdBytes));
- }
- builder.setCacheId(ByteStringer.wrap(cacheId));
- instance.removeServerCache(controller, builder.build(), rpcCallback);
- if(controller.getFailedOn() != null) {
- throw controller.getFailedOn();
- }
- return rpcCallback.get();
- }
- });
- remainingOnServers.remove(entry);
- } catch (Throwable t) {
- lastThrowable = t;
- LOG.error(addCustomAnnotations("Error trying to remove hash cache for " + entry, connection), t);
- }
- }
- }
- if (!remainingOnServers.isEmpty()) {
- LOG.warn(addCustomAnnotations("Unable to remove hash cache for " + remainingOnServers, connection), lastThrowable);
- }
- } finally {
- Closeables.closeQuietly(iterateOverTable);
- }
+ });
+ remainingOnServers.remove(entry); // RPC succeeded; this mutates the caller-supplied set
+ } catch (Throwable t) {
+ lastThrowable = t; // keep iterating; the last failure is attached to the summary warning below
+ LOG.error(addCustomAnnotations("Error trying to remove hash cache for " + entry, connection),
+ t);
+ }
+ }
+ }
+ if (!remainingOnServers.isEmpty()) {
+ LOG.warn(addCustomAnnotations("Unable to remove hash cache for " + remainingOnServers, connection),
+ lastThrowable);
+ }
+ } finally {
+ cacheUsingTableRefMap.remove(Bytes.mapKey(cacheId)); // must use the same Bytes.mapKey(cacheId) key as the lookup above; a raw byte[] never matches
+ Closeables.closeQuietly(iterateOverTable);
+ }
+ }
/**
@@ -394,4 +409,77 @@ public class ServerCacheClient {
}
return regionStartKey;
}
+
+ public boolean addServerCache(byte[] startkeyOfRegion, ServerCache cache, HashCacheFactory cacheFactory, // sends the cache to the region located for startkeyOfRegion, unless the send is skipped below
+ byte[] txState, PTable pTable) throws Exception {
+ HTableInterface table = null; // declared outside the try so the finally block can close it
+ boolean success = true; // stays true when the send below is skipped
+ byte[] cacheId = cache.getId();
+ try {
+ ConnectionQueryServices services = connection.getQueryServices();
+
+ byte[] tableName = pTable.getPhysicalName().getBytes();
+ table = services.getTable(tableName);
+ HRegionLocation tableRegionLocation = services.getTableRegionLocation(tableName, startkeyOfRegion);
+ if (cache.addServer(tableRegionLocation) || services.getProps().getBoolean(HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER,false)) { // NOTE(review): presumably addServer returns true only for a location not yet recorded - confirm in ServerCache; the property (default false) forces the send regardless
+ success = addServerCache(table, startkeyOfRegion, pTable, cacheId, cache.getCachePtr(), cacheFactory,
+ txState);
+ }
+ return success;
+ } finally {
+ Closeables.closeQuietly(table); // runs on both the normal and the exceptional path
+ }
+ }
+
+ public boolean addServerCache(HTableInterface htable, byte[] key, final PTable cacheUsingTable, final byte[] cacheId, // invokes ServerCachingService.addServerCache through htable for the given key
+ final ImmutableBytesWritable cachePtr, final ServerCacheFactory cacheFactory, final byte[] txState)
+ throws Exception {
+ byte[] keyInRegion = getKeyInRegion(key);
+ final Map<byte[], AddServerCacheResponse> results;
+ try {
+ results = htable.coprocessorService(ServerCachingService.class, keyInRegion, keyInRegion, // start and end key are the same value
+ new Batch.Call<ServerCachingService, AddServerCacheResponse>() {
+ @Override
+ public AddServerCacheResponse call(ServerCachingService instance) throws IOException {
+ ServerRpcController controller = new ServerRpcController();
+ BlockingRpcCallback<AddServerCacheResponse> rpcCallback = new BlockingRpcCallback<AddServerCacheResponse>();
+ AddServerCacheRequest.Builder builder = AddServerCacheRequest.newBuilder();
+ final byte[] tenantIdBytes;
+ if (cacheUsingTable.isMultiTenant()) { // multi-tenant tables get the tenant id encoded by ScanUtil.getTenantIdBytes
+ try {
+ tenantIdBytes = connection.getTenantId() == null ? null
+ : ScanUtil.getTenantIdBytes(cacheUsingTable.getRowKeySchema(),
+ cacheUsingTable.getBucketNum() != null, connection.getTenantId(),
+ cacheUsingTable.getViewIndexId() != null);
+ } catch (SQLException e) {
+ throw new IOException(e); // call() is declared to throw IOException only
+ }
+ } else {
+ tenantIdBytes = connection.getTenantId() == null ? null
+ : connection.getTenantId().getBytes();
+ }
+ if (tenantIdBytes != null) { // the tenant id is left unset on the request when the connection has none
+ builder.setTenantId(ByteStringer.wrap(tenantIdBytes));
+ }
+ builder.setCacheId(ByteStringer.wrap(cacheId));
+ builder.setCachePtr(org.apache.phoenix.protobuf.ProtobufUtil.toProto(cachePtr));
+ builder.setHasProtoBufIndexMaintainer(true);
+ ServerCacheFactoryProtos.ServerCacheFactory.Builder svrCacheFactoryBuider = ServerCacheFactoryProtos.ServerCacheFactory
+ .newBuilder();
+ svrCacheFactoryBuider.setClassName(cacheFactory.getClass().getName()); // only the factory's class name is put on the request
+ builder.setCacheFactory(svrCacheFactoryBuider.build());
+ builder.setTxState(ByteStringer.wrap(txState));
+ instance.addServerCache(controller, builder.build(), rpcCallback);
+ if (controller.getFailedOn() != null) { throw controller.getFailedOn(); } // rethrow any failure recorded on the controller
+ return rpcCallback.get();
+ }
+ });
+ } catch (Throwable t) {
+ throw new Exception(t); // every failure, Errors included, reaches the caller wrapped in a plain Exception
+ }
+ if (results != null && results.size() == 1) { return results.values().iterator().next().getReturn(); } // exactly one response: its return flag is the result
+ return false; // a null result map, or any size other than one, is reported as failure
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 34361ac..83cc24e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.coprocessor;
import java.io.IOException;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;