Posted to dev@phoenix.apache.org by "Maryann Xue (JIRA)" <ji...@apache.org> on 2017/08/29 05:54:00 UTC
[jira] [Comment Edited] (PHOENIX-3999) Optimize inner joins as
SKIP-SCAN-JOIN when possible
[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144780#comment-16144780 ]
Maryann Xue edited comment on PHOENIX-3999 at 8/29/17 5:53 AM:
---------------------------------------------------------------
The reason the hash cache is used in this query is that "b.BATCH_SEQUENCE_NUM" appears in the SELECT clause, so we have to perform the actual join operation to fetch the referenced fields (in this case, only one field) from the RHS. In some sense the join is still driven "by the client side" through the skip-scan filter; we just cannot omit the join operation as we would for semi-joins. So my point is that, for this query, the best has already been done.
was (Author: maryannxue):
The reason why hash cache is used in this query is that there is "b.BATCH_SEQUENCE_NUM" in the SELECT clause, so we have to do the actual join operation for referenced fields (in this case, only 1 field) from RHS. Although, in some sense the join is driven "by the client side" through skip-scan filter, only that we cannot omit the join operation as we would for semi-joins.
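
To make the distinction in the comment concrete, here is a minimal in-memory sketch. It is not Phoenix code: the class and method names (SemiVsInnerJoinSketch, semiJoin, innerJoin) and the long[] row layout are assumptions made only for illustration. The semi join needs nothing from the RHS beyond the set of matching BATCH_ID values, while the inner join that projects b.BATCH_SEQUENCE_NUM has to keep the RHS rows around (the role the hash cache plays) so it can read that column for every surviving LHS row.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative stand-in only; row layouts are assumptions:
//   batch row = {BATCH_SEQUENCE_NUM, BATCH_ID}
//   item row  = {BATCH_ID, ITEM_ID, ITEM_TYPE}
public class SemiVsInnerJoinSketch {

    // Semi join: the RHS collapses to a set of keys, which acts as a filter.
    // No RHS column is projected, so no join step is needed afterwards.
    public static List<long[]> semiJoin(List<long[]> items, List<long[]> batches) {
        Set<Long> batchIds = new HashSet<>();
        for (long[] b : batches) {
            batchIds.add(b[1]);
        }
        List<long[]> out = new ArrayList<>();
        for (long[] i : items) {
            if (batchIds.contains(i[0])) {
                out.add(new long[] {i[2], i[1]}); // ITEM_TYPE, ITEM_ID
            }
        }
        return out;
    }

    // Inner join projecting BATCH_SEQUENCE_NUM: the same keys still prune the
    // LHS, but each surviving row must look up the RHS value(s), so the RHS
    // rows have to be retained in a hash table keyed by BATCH_ID.
    public static List<long[]> innerJoin(List<long[]> items, List<long[]> batches) {
        Map<Long, List<Long>> seqNumsByBatchId = new HashMap<>();
        for (long[] b : batches) {
            seqNumsByBatchId.computeIfAbsent(b[1], k -> new ArrayList<>()).add(b[0]);
        }
        List<long[]> out = new ArrayList<>();
        for (long[] i : items) {
            List<Long> seqNums = seqNumsByBatchId.get(i[0]);
            if (seqNums == null) {
                continue; // no matching batch, row is dropped
            }
            for (long seqNum : seqNums) {
                out.add(new long[] {i[2], seqNum, i[1]}); // ITEM_TYPE, BATCH_SEQUENCE_NUM, ITEM_ID
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<long[]> batches = Arrays.asList(new long[] {1, 2}, new long[] {1, 4});
        List<long[]> items = Arrays.asList(
                new long[] {1, 100, 10}, new long[] {2, 200, 20}, new long[] {3, 300, 10},
                new long[] {4, 400, 20}, new long[] {5, 500, 10});
        for (long[] row : semiJoin(items, batches)) {
            System.out.println("semi:  " + Arrays.toString(row));
        }
        for (long[] row : innerJoin(items, batches)) {
            System.out.println("inner: " + Arrays.toString(row));
        }
    }
}
```

Note that BATCH_ID alone is not unique in COMPLETED_BATCHES (the primary key is BATCH_SEQUENCE_NUM, BATCH_ID), so the inner join can emit one output row per matching batch row, whereas the semi join emits each ITEMS row at most once.
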
> Optimize inner joins as SKIP-SCAN-JOIN when possible
> ----------------------------------------------------
>
> Key: PHOENIX-3999
> URL: https://issues.apache.org/jira/browse/PHOENIX-3999
> Project: Phoenix
> Issue Type: Bug
> Reporter: James Taylor
>
> Semi joins on the leading part of the primary key end up doing batches of point queries (as opposed to a broadcast hash join); inner joins, however, do not.
> Here's a set of example schemas for which a skip scan is executed on the inner query:
> {code}
> CREATE TABLE COMPLETED_BATCHES (
>     BATCH_SEQUENCE_NUM BIGINT NOT NULL,
>     BATCH_ID BIGINT NOT NULL,
>     CONSTRAINT PK PRIMARY KEY
>     (
>         BATCH_SEQUENCE_NUM,
>         BATCH_ID
>     )
> );
> CREATE TABLE ITEMS (
>     BATCH_ID BIGINT NOT NULL,
>     ITEM_ID BIGINT NOT NULL,
>     ITEM_TYPE BIGINT,
>     ITEM_VALUE VARCHAR,
>     CONSTRAINT PK PRIMARY KEY
>     (
>         BATCH_ID,
>         ITEM_ID
>     )
> );
> CREATE TABLE COMPLETED_ITEMS (
>     ITEM_TYPE BIGINT NOT NULL,
>     BATCH_SEQUENCE_NUM BIGINT NOT NULL,
>     ITEM_ID BIGINT NOT NULL,
>     ITEM_VALUE VARCHAR,
>     CONSTRAINT PK PRIMARY KEY
>     (
>         ITEM_TYPE,
>         BATCH_SEQUENCE_NUM,
>         ITEM_ID
>     )
> );
> {code}
> The explain plan for a semi join over these tables indicates that a dynamic filter will be applied, like this:
> {code}
> UPSERT SELECT
> CLIENT PARALLEL 1-WAY FULL SCAN OVER ITEMS
> SKIP-SCAN-JOIN TABLE 0
> CLIENT PARALLEL 1-WAY RANGE SCAN OVER COMPLETED_BATCHES [1] - [2]
> SERVER FILTER BY FIRST KEY ONLY
> SERVER AGGREGATE INTO DISTINCT ROWS BY [BATCH_ID]
> CLIENT MERGE SORT
> DYNAMIC SERVER FILTER BY I.BATCH_ID IN ($8.$9)
> {code}
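
The last line of the plan above (DYNAMIC SERVER FILTER BY I.BATCH_ID IN ...) is what turns the full scan over ITEMS into a series of seeks. The following is only a rough in-memory analogy, assuming a TreeMap as a stand-in for a table sorted by its primary key (BATCH_ID, ITEM_ID); SkipScanSketch, PK_ORDER, and skipScan are hypothetical names, and none of this reflects Phoenix's actual implementation. For each BATCH_ID produced by the inner query, the sketch seeks directly to that key prefix instead of visiting every row.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.NavigableMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

// In-memory analogy only: a TreeMap ordered by (BATCH_ID, ITEM_ID) plays the
// role of the ITEMS table, and subMap() plays the role of a seek.
public class SkipScanSketch {

    // Orders composite keys {BATCH_ID, ITEM_ID} the way the primary key would.
    public static final Comparator<long[]> PK_ORDER =
            Comparator.<long[]>comparingLong(k -> k[0]).thenComparingLong(k -> k[1]);

    // Returns the ITEM_IDs of all rows whose BATCH_ID is in batchIds, touching
    // only the key ranges for those BATCH_IDs rather than the whole map.
    public static List<Long> skipScan(NavigableMap<long[], String> items, SortedSet<Long> batchIds) {
        List<Long> itemIds = new ArrayList<>();
        for (long batchId : batchIds) {
            long[] from = {batchId, Long.MIN_VALUE};
            long[] to = {batchId, Long.MAX_VALUE};
            for (long[] key : items.subMap(from, true, to, true).keySet()) {
                itemIds.add(key[1]);
            }
        }
        return itemIds;
    }

    public static void main(String[] args) {
        NavigableMap<long[], String> items = new TreeMap<>(PK_ORDER);
        items.put(new long[] {1, 100}, "a");
        items.put(new long[] {2, 200}, "a");
        items.put(new long[] {3, 300}, "a");
        items.put(new long[] {4, 400}, "a");
        items.put(new long[] {5, 500}, "a");

        // BATCH_IDs as they would come back from the scan over COMPLETED_BATCHES.
        SortedSet<Long> batchIds = new TreeSet<>(Arrays.asList(2L, 4L));
        System.out.println(skipScan(items, batchIds)); // prints [200, 400]
    }
}
```

The point of the analogy is that the work done is proportional to the number of distinct keys from the inner query plus the rows they match, not to the size of ITEMS.
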
> We should also be able to leverage this optimization when an inner join is used such as this:
> {code}
> UPSERT INTO COMPLETED_ITEMS (ITEM_TYPE, BATCH_SEQUENCE_NUM, ITEM_ID, ITEM_VALUE)
> SELECT i.ITEM_TYPE, b.BATCH_SEQUENCE_NUM, i.ITEM_ID, i.ITEM_VALUE
> FROM ITEMS i, COMPLETED_BATCHES b
> WHERE b.BATCH_ID = i.BATCH_ID AND
> b.BATCH_SEQUENCE_NUM > 1000 AND b.BATCH_SEQUENCE_NUM < 2000;
> {code}
> A complete unit test looks like this:
> {code}
> @Test
> public void testNestedLoopJoin() throws Exception {
>     try (Connection conn = DriverManager.getConnection(getUrl())) {
>         // Schema setup: the three tables from the description above.
>         String t1 = "COMPLETED_BATCHES";
>         String ddl1 = "CREATE TABLE " + t1 + " (\n" +
>                 " BATCH_SEQUENCE_NUM BIGINT NOT NULL,\n" +
>                 " BATCH_ID BIGINT NOT NULL,\n" +
>                 " CONSTRAINT PK PRIMARY KEY\n" +
>                 " (\n" +
>                 " BATCH_SEQUENCE_NUM,\n" +
>                 " BATCH_ID\n" +
>                 " )\n" +
>                 ")";
>         conn.createStatement().execute(ddl1);
>
>         String t2 = "ITEMS";
>         String ddl2 = "CREATE TABLE " + t2 + " (\n" +
>                 " BATCH_ID BIGINT NOT NULL,\n" +
>                 " ITEM_ID BIGINT NOT NULL,\n" +
>                 " ITEM_TYPE BIGINT,\n" +
>                 " ITEM_VALUE VARCHAR,\n" +
>                 " CONSTRAINT PK PRIMARY KEY\n" +
>                 " (\n" +
>                 " BATCH_ID,\n" +
>                 " ITEM_ID\n" +
>                 " )\n" +
>                 ")";
>         conn.createStatement().execute(ddl2);
>         String t3 = "COMPLETED_ITEMS";
>         String ddl3 = "CREATE TABLE " + t3 + "(\n" +
>                 " ITEM_TYPE BIGINT NOT NULL,\n" +
>                 " BATCH_SEQUENCE_NUM BIGINT NOT NULL,\n" +
>                 " ITEM_ID BIGINT NOT NULL,\n" +
>                 " ITEM_VALUE VARCHAR,\n" +
>                 " CONSTRAINT PK PRIMARY KEY\n" +
>                 " (\n" +
>                 " ITEM_TYPE,\n" +
>                 " BATCH_SEQUENCE_NUM, \n" +
>                 " ITEM_ID\n" +
>                 " )\n" +
>                 ")";
>         conn.createStatement().execute(ddl3);
>         // Two completed batches (BATCH_ID 2 and 4) and five items, one per batch 1..5.
>         conn.createStatement().execute("UPSERT INTO "+t1+"(BATCH_SEQUENCE_NUM, batch_id) VALUES (1,2)");
>         conn.createStatement().execute("UPSERT INTO "+t1+"(BATCH_SEQUENCE_NUM, batch_id) VALUES (1,4)");
>         conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id, item_id, item_type, item_value) VALUES (1,100, 10, 'a')");
>         conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id, item_id, item_type, item_value) VALUES (2,200, 20, 'a')");
>         conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id, item_id, item_type, item_value) VALUES (3,300, 10, 'a')");
>         conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id, item_id, item_type, item_value) VALUES (4,400, 20, 'a')");
>         conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id, item_id, item_type, item_value) VALUES (5,500, 10, 'a')");
>         conn.commit();
>
>         conn.setAutoCommit(true);
>         // Semi join (EXISTS) form of the query.
>         String dml = "UPSERT INTO " + t3 + " (ITEM_TYPE, BATCH_SEQUENCE_NUM, ITEM_ID, ITEM_VALUE)\n" +
>                 "SELECT ITEM_TYPE, 1, ITEM_ID, ITEM_VALUE \n" +
>                 "FROM " + t2 + " i\n" +
>                 "WHERE EXISTS (" +
>                 " SELECT 1 FROM " + t1 + " b WHERE b.BATCH_ID = i.BATCH_ID AND " +
>                 " b.BATCH_SEQUENCE_NUM > 0 AND b.BATCH_SEQUENCE_NUM < 2)";
>         conn.createStatement().execute(dml);
>         // Only the items belonging to completed batches 2 and 4 should be copied.
>         ResultSet rs = conn.createStatement().executeQuery("SELECT ITEM_ID FROM " + t3);
>         assertTrue(rs.next());
>         assertEquals(200L, rs.getLong(1));
>         assertTrue(rs.next());
>         assertEquals(400L, rs.getLong(1));
>         assertFalse(rs.next());
>     }
> }
> {code}