You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2017/03/16 21:12:10 UTC
[48/50] [abbrv] phoenix git commit: Fix merge conflicts;
fix compilation errors; fix test failures
http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteLocalIndexIT.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteLocalIndexIT.java
index 15452b7,0000000..1324717
mode 100644,000000..100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteLocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteLocalIndexIT.java
@@@ -1,280 -1,0 +1,280 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.calcite;
+
+import static org.junit.Assert.fail;
+
+import java.sql.SQLException;
+import java.util.Properties;
+
+import org.junit.Test;
+
+public class CalciteLocalIndexIT extends BaseCalciteIndexIT {
+
+ public CalciteLocalIndexIT() {
+ super(true); // NOTE(review): the flag's meaning is defined by BaseCalciteIndexIT (presumably "use local indexes") — confirm
+ }
+
+ @Test public void testIndex() throws Exception { // pins the exact EXPLAIN text produced for a series of queries on aTable
+ final Start start1000 = start(true, 1000f); // NOTE(review): the two fixtures differ only in this float; presumably a planner row-count hint — confirm in start()
+ final Start start1000000 = start(true, 1000000f);
+
+ start1000.sql("select * from aTable where b_string = 'b'")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixServerProject(ORGANIZATION_ID=[$1], ENTITY_ID=[$2], A_STRING=[$3], B_STRING=[$0], A_INTEGER=[$4], A_DATE=[$5], A_TIME=[$6], A_TIMESTAMP=[$7], X_DECIMAL=[$8], X_LONG=[$9], X_INTEGER=[$10], Y_INTEGER=[$11], A_BYTE=[$12], A_SHORT=[$13], A_FLOAT=[$14], A_DOUBLE=[$15], A_UNSIGNED_FLOAT=[$16], A_UNSIGNED_DOUBLE=[$17])\n" +
+ " PhoenixTableScan(table=[[phoenix, IDX_FULL]], filter=[=($0, 'b')])\n");
+ start1000.sql("select x_integer from aTable")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixServerProject(X_INTEGER=[$10])\n" +
+ " PhoenixTableScan(table=[[phoenix, ATABLE]])\n");
+ /*.explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixServerProject(X_INTEGER=[$4])\n" +
+ " PhoenixTableScan(table=[[phoenix, IDX1]])\n")*/
+ start1000.sql("select a_string from aTable order by a_string")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixServerProject(0:A_STRING=[$0])\n" +
+ " PhoenixTableScan(table=[[phoenix, IDX1]], scanOrder=[FORWARD])\n");
+ start1000000.sql("select a_string from aTable order by organization_id")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixServerProject(A_STRING=[$2], ORGANIZATION_ID=[$0])\n" +
+ " PhoenixTableScan(table=[[phoenix, ATABLE]], scanOrder=[FORWARD])\n");
+ start1000.sql("select a_integer from aTable order by a_string")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixServerSort(sort0=[$1], dir0=[ASC])\n" +
+ " PhoenixServerProject(A_INTEGER=[$4], A_STRING=[$2])\n" +
+ " PhoenixTableScan(table=[[phoenix, ATABLE]])\n");
+ /*.explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixServerSort(sort0=[$1], dir0=[ASC])\n" +
+ " PhoenixServerProject(A_INTEGER=[$4], A_STRING=[$3])\n" +
+ " PhoenixTableScan(table=[[phoenix, IDX_FULL]])\n")*/
+ start1000.sql("select a_string, b_string from aTable where a_string = 'a'")
+ .explainMatches("PhoenixToEnumerableConverter\n" + // the only regex-based check here: the "0:" column prefix is optional
+ " PhoenixServerProject\\((0:)?A_STRING=\\[\\$0\\], (0:)?B_STRING=\\[\\$3\\]\\)\n" +
+ " PhoenixTableScan\\(table=\\[\\[phoenix, IDX1\\]\\], filter=\\[=\\(\\$0, 'a'\\)\\]\\)\n");
+ start1000.sql("select a_string, b_string from aTable where b_string = 'b'")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixServerProject(A_STRING=[$3], B_STRING=[$0])\n" +
+ " PhoenixTableScan(table=[[phoenix, IDX2]], filter=[=($0, 'b')])\n");
+ start1000.sql("select a_string, b_string, x_integer, y_integer from aTable where b_string = 'b'")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixServerProject(A_STRING=[$3], B_STRING=[$0], X_INTEGER=[$10], Y_INTEGER=[$11])\n" +
+ " PhoenixTableScan(table=[[phoenix, IDX_FULL]], filter=[=($0, 'b')])\n");
+ start1000.sql("select a_string, count(*) from aTable group by a_string")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixServerAggregate(group=[{0}], EXPR$1=[COUNT()], isOrdered=[true])\n" +
+ " PhoenixTableScan(table=[[phoenix, IDX1]], scanOrder=[FORWARD])\n");
+
+ start1000.close(); // NOTE(review): neither close() runs if a check above throws (no try/finally)
+ start1000000.close();
+ }
+
+ @Test public void testSaltedIndex() throws Exception { // checks plans (and rows, where asserted) for NOSALT_TABLE_NAME and SALTED_TABLE_NAME queries
+ final Start start1 = start(true, 1f);
+ start1.sql("select count(*) from " + NOSALT_TABLE_NAME + " where col0 > 3")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixServerAggregate(group=[{}], EXPR$0=[COUNT()])\n" +
+ " PhoenixTableScan(table=[[phoenix, IDXSALTED_NOSALT_TEST_TABLE]], filter=[>(CAST($0):INTEGER, 3)])\n")
+ .resultIs(0, new Object[][]{{999L}});
+ start1.sql("select mypk0, mypk1, col0 from " + NOSALT_TABLE_NAME + " where col0 <= 4")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixServerProject(MYPK0=[$1], MYPK1=[$2], COL0=[CAST($0):INTEGER])\n" +
+ " PhoenixTableScan(table=[[phoenix, IDXSALTED_NOSALT_TEST_TABLE]], filter=[<=(CAST($0):INTEGER, 4)])\n")
+ .resultIs(0, new Object[][] {
+ {2, 3, 4},
+ {1, 2, 3}});
+ start1.sql("select * from " + SALTED_TABLE_NAME + " where mypk0 < 3")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixTableScan(table=[[phoenix, SALTED_TEST_TABLE]], filter=[<($0, 3)])\n")
+ .resultIs(0, new Object[][] {
+ {1, 2, 3, 4},
+ {2, 3, 4, 5}});
+ start1.sql("select count(*) from " + SALTED_TABLE_NAME + " where col0 > 3")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixServerAggregate(group=[{}], EXPR$0=[COUNT()])\n" +
+ " PhoenixTableScan(table=[[phoenix, IDX_SALTED_TEST_TABLE]], filter=[>(CAST($0):INTEGER, 3)])\n")
+ .sameResultAsPhoenixStandalone(0) // used in place of the literal row check that is commented out on the next line
+ /*.resultIs(0, new Object[][]{{999L}})*/;
+ start1.sql("select mypk0, mypk1, col0 from " + SALTED_TABLE_NAME + " where col0 <= 4")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixServerProject(MYPK0=[$1], MYPK1=[$2], COL0=[CAST($0):INTEGER])\n" +
+ " PhoenixTableScan(table=[[phoenix, IDX_SALTED_TEST_TABLE]], filter=[<=(CAST($0):INTEGER, 4)])\n")
+ .resultIs(0, new Object[][] {
+ {2, 3, 4},
+ {1, 2, 3}});
+ start1.sql("select count(*) from " + SALTED_TABLE_NAME + " where col1 > 4")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixServerAggregate(group=[{}], EXPR$0=[COUNT()])\n" +
+ " PhoenixTableScan(table=[[phoenix, IDXSALTED_SALTED_TEST_TABLE]], filter=[>(CAST($0):INTEGER, 4)])\n")
+ .sameResultAsPhoenixStandalone(0)
+ /*.resultIs(0, new Object[][]{{999L}})*/;
+ start1.sql("select * from " + SALTED_TABLE_NAME + " where col1 <= 5 order by col1")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixServerProject(MYPK0=[$1], MYPK1=[$2], COL0=[$3], COL1=[CAST($0):INTEGER])\n" +
+ " PhoenixTableScan(table=[[phoenix, IDXSALTED_SALTED_TEST_TABLE]], filter=[<=(CAST($0):INTEGER, 5)], scanOrder=[FORWARD])\n")
+ .resultIs(new Object[][] { // the only resultIs call in this method without the leading 0 argument
+ {1, 2, 3, 4},
+ {2, 3, 4, 5}});
+ start1.sql("select * from " + SALTED_TABLE_NAME + " s1, " + SALTED_TABLE_NAME + " s2 where s1.mypk1 = s2.mypk1 and s1.mypk0 > 500 and s2.col1 < 505")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixServerJoin(condition=[=($1, $5)], joinType=[inner])\n" +
+ " PhoenixTableScan(table=[[phoenix, SALTED_TEST_TABLE]], filter=[>($0, 500)])\n" +
+ " PhoenixServerProject(MYPK0=[$1], MYPK1=[$2], COL0=[$3], COL1=[CAST($0):INTEGER])\n" +
+ " PhoenixTableScan(table=[[phoenix, IDXSALTED_SALTED_TEST_TABLE]], filter=[<(CAST($0):INTEGER, 505)])\n")
+ .resultIs(0, new Object[][] {
+ {501, 502, 503, 504, 501, 502, 503, 504}});
+ start1.close(); // NOTE(review): not reached if a check above throws (no try/finally)
+ }
+
+ @Test public void testMultiTenant() throws Exception { // one connection without TenantId plus connections with TenantId "15", "10" and "20"
+ Properties props = getConnectionProps(true, 1f);
+ final Start start = start(props);
+ props = getConnectionProps(true, 1f); // a fresh Properties is fetched for every connection before TenantId is set
+ props.setProperty("TenantId", "15");
+ final Start startTenant15 = start(props);
+ props = getConnectionProps(true, 1f);
+ props.setProperty("TenantId", "10");
+ final Start startTenant10 = start(props);
+ props = getConnectionProps(true, 1f);
+ props.setProperty("TenantId", "20");
+ final Start startTenant20 = start(props);
+
+ start.sql("select * from " + MULTI_TENANT_TABLE + " where tenant_id = '10' and id <= '0004'")
+ .explainIs("PhoenixToEnumerableConverter\n" +
- " PhoenixTableScan(table=[[phoenix, MULTITENANT_TEST_TABLE]], filter=[AND(=(CAST($0):VARCHAR(2) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL, '10'), <=($1, '0004'))])\n")
++ " PhoenixTableScan(table=[[phoenix, MULTITENANT_TEST_TABLE]], filter=[AND(=($0, CAST('10'):VARCHAR CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL), <=($1, '0004'))])\n")
+ .resultIs(0, new Object[][] {
+ {"10", "0002", 3, 4, 5},
+ {"10", "0003", 4, 5, 6},
+ {"10", "0004", 5, 6, 7}});
+
+ start.sql("select * from " + MULTI_TENANT_TABLE + " where tenant_id = '20' and col1 < 8")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixServerProject(TENANT_ID=[$0], ID=[$2], COL0=[$3], COL1=[CAST($1):INTEGER], COL2=[$4])\n" +
- " PhoenixTableScan(table=[[phoenix, IDX_MULTITENANT_TEST_TABLE]], filter=[AND(=(CAST($0):VARCHAR(2) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL, '20'), <(CAST($1):INTEGER, 8))])\n");
- // .resultIs(0, new Object[][] {
- // {"20", "0004", 5, 6, 7},
- // {"20", "0005", 6, 7, 8}});
++ " PhoenixTableScan(table=[[phoenix, IDX_MULTITENANT_TEST_TABLE]], filter=[AND(=($0, CAST('20'):VARCHAR CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL), <(CAST($1):INTEGER, 8))])\n")
++ /*.resultIs(0, new Object[][] {
++ {"20", "0004", 5, 6, 7},
++ {"20", "0005", 6, 7, 8}})*/;
+
+ try {
+ start.sql("select * from " + MULTI_TENANT_VIEW1)
+ .explainIs("");
+ fail("Should have got SQLException.");
+ } catch (SQLException e) { // expected path: the fail() above only fires when no SQLException was thrown
+ }
+
+ startTenant15.sql("select * from " + MULTI_TENANT_TABLE + " where id = '0284'")
+ .explainIs("PhoenixToEnumerableConverter\n" +
- " PhoenixTableScan(table=[[phoenix, MULTITENANT_TEST_TABLE]], filter=[=(CAST($0):VARCHAR(4) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL, '0284')])\n")
++ " PhoenixTableScan(table=[[phoenix, MULTITENANT_TEST_TABLE]], filter=[=($0, CAST('0284'):VARCHAR CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL)])\n")
+ .resultIs(0, new Object[][] {
+ {"0284", 285, 286, 287}});
+
+ startTenant15.sql("select * from " + MULTI_TENANT_TABLE + " where col1 > 1000")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixServerProject(ID=[$1], COL0=[$2], COL1=[CAST($0):INTEGER], COL2=[$3])\n" +
+ " PhoenixTableScan(table=[[phoenix, IDX_MULTITENANT_TEST_TABLE]], filter=[>(CAST($0):INTEGER, 1000)])\n")
+ .resultIs(0, new Object[][] {
+ {"0999", 1000, 1001, 1002},
+ {"1000", 1001, 1002, 1003},
+ {"1001", 1002, 1003, 1004},
+ {"1002", 1003, 1004, 1005}});
+
+ try {
+ startTenant15.sql("select * from " + MULTI_TENANT_VIEW1)
+ .explainIs("");
+ fail("Should have got SQLException.");
+ } catch (SQLException e) { // expected path: the fail() above only fires when no SQLException was thrown
+ }
+
+ startTenant10.sql("select * from " + MULTI_TENANT_VIEW1 + " where id = '0512'")
+ .explainIs("PhoenixToEnumerableConverter\n" +
- " PhoenixTableScan(table=[[phoenix, MULTITENANT_TEST_TABLE]], filter=[=(CAST($0):VARCHAR(4) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL, '0512')])\n")
++ " PhoenixTableScan(table=[[phoenix, MULTITENANT_TEST_TABLE]], filter=[=($0, CAST('0512'):VARCHAR CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL)])\n")
+ .resultIs(0, new Object[][] {
+ {"0512", 513, 514, 515}});
+
+ startTenant10.sql("select * from " + MULTI_TENANT_TABLE + " where col1 <= 6")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixServerProject(ID=[$1], COL0=[$2], COL1=[CAST($0):INTEGER], COL2=[$3])\n" +
+ " PhoenixTableScan(table=[[phoenix, IDX_MULTITENANT_TEST_TABLE]], filter=[<=(CAST($0):INTEGER, 6)])\n")
+ .sameResultAsPhoenixStandalone(0)
+ /*.resultIs(0, new Object[][] {
+ {"0002", 3, 4, 5},
+ {"0003", 4, 5, 6},
+ {"0004", 5, 6, 7}})*/;
+
+ startTenant10.sql("select id, col0 from " + MULTI_TENANT_VIEW1 + " where col0 >= 1000")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixServerProject(ID=[$1], COL0=[CAST($0):INTEGER])\n" +
+ " PhoenixTableScan(table=[[phoenix, S1, IDX_MULTITENANT_TEST_VIEW1]], filter=[>=(CAST($0):INTEGER, 1000)])\n")
+ .sameResultAsPhoenixStandalone(0)
+ /*.resultIs(0, new Object[][] {
+ {"0999", 1000},
+ {"1000", 1001},
+ {"1001", 1002}})*/;
+
+ startTenant10.sql("select * from " + MULTI_TENANT_VIEW1 + " where col0 = 1000")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixServerProject(ID=[$1], COL0=[CAST($0):INTEGER], COL1=[$2], COL2=[$3])\n" +
+ " PhoenixTableScan(table=[[phoenix, S1, IDX_MULTITENANT_TEST_VIEW1]], filter=[=(CAST($0):INTEGER, 1000)], extendedColumns=[{2, 3}])\n")
+ .sameResultAsPhoenixStandalone(0)
+ /*.resultIs(0, new Object[][] {
+ {"0999", 1000, 1001, 1002}})*/;
+
+ startTenant10.sql("select id, col0, col2 from " + MULTI_TENANT_VIEW1 + " where col0 = 1000")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixServerProject(ID=[$1], COL0=[CAST($0):INTEGER], COL2=[$3])\n" +
+ " PhoenixTableScan(table=[[phoenix, S1, IDX_MULTITENANT_TEST_VIEW1]], filter=[=(CAST($0):INTEGER, 1000)], extendedColumns=[{3}])\n")
+ .sameResultAsPhoenixStandalone(0)
+ /*.resultIs(0, new Object[][] {
+ {"0999", 1000, 1002}})*/;
+
+ startTenant20.sql("select * from " + MULTI_TENANT_VIEW2 + " where id = '0765'")
+ .explainIs("PhoenixToEnumerableConverter\n" +
- " PhoenixTableScan(table=[[phoenix, MULTITENANT_TEST_TABLE]], filter=[AND(>($3, 7), =(CAST($0):VARCHAR(4) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL, '0765'))])\n")
++ " PhoenixTableScan(table=[[phoenix, MULTITENANT_TEST_TABLE]], filter=[AND(>($3, 7), =($0, CAST('0765'):VARCHAR CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL))])\n")
+ .resultIs(0, new Object[][] {
+ {"0765", 766, 767, 768}});
+
+ startTenant20.sql("select id, col0 from " + MULTI_TENANT_VIEW2 + " where col0 between 272 and 275")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixServerProject(ID=[$1], COL0=[CAST($0):INTEGER])\n" +
+ " PhoenixTableScan(table=[[phoenix, S2, IDX_MULTITENANT_TEST_VIEW2]], filter=[AND(>=(CAST($0):INTEGER, 272), <=(CAST($0):INTEGER, 275))])\n")
+ .sameResultAsPhoenixStandalone(0)
+ /*.resultIs(0, new Object[][] {
+ {"0271", 272},
+ {"0272", 273},
+ {"0273", 274},
+ {"0274", 275}})*/;
+
+ startTenant20.sql("select id, col0 from " + MULTI_TENANT_VIEW2 + " order by col0 limit 5")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixLimit(fetch=[5])\n" +
+ " PhoenixServerProject(ID=[$1], COL0=[CAST($0):INTEGER])\n" +
+ " PhoenixTableScan(table=[[phoenix, S2, IDX_MULTITENANT_TEST_VIEW2]], scanOrder=[FORWARD])\n")
+ .sameResultAsPhoenixStandalone()
+ /*.resultIs(new Object[][] {
+ {"0005", 6},
+ {"0006", 7},
+ {"0007", 8},
+ {"0008", 9},
+ {"0009", 10}})*/;
+
+ start.close(); // NOTE(review): none of these four close() calls runs if a check above throws (no try/finally)
+ startTenant15.close();
+ startTenant10.close();
+ startTenant20.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseJoinIT.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseJoinIT.java
index 1b3731c,152bdf0..4a779da
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseJoinIT.java
@@@ -60,14 -60,14 +60,14 @@@ public abstract class BaseJoinIT extend
" \"item_id\" varchar(10), " +
" price integer, " +
" quantity integer, " +
- " \"DATE\" timestamp)");
- " date timestamp) IMMUTABLE_ROWS=true");
++ " \"DATE\" timestamp) IMMUTABLE_ROWS=true");
builder.put(JOIN_CUSTOMER_TABLE_FULL_NAME, "create table " + JOIN_CUSTOMER_TABLE_FULL_NAME +
" (\"customer_id\" varchar(10) not null primary key, " +
" name varchar, " +
" phone varchar(12), " +
" address varchar, " +
" loc_id varchar(5), " +
- " \"DATE\" date)");
- " date date) IMMUTABLE_ROWS=true");
++ " \"DATE\" date) IMMUTABLE_ROWS=true");
builder.put(JOIN_ITEM_TABLE_FULL_NAME, "create table " + JOIN_ITEM_TABLE_FULL_NAME +
" (\"item_id\" varchar(10) not null primary key, " +
" name varchar, " +
http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/calcite/jdbc/PhoenixCalciteFactory.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/calcite/jdbc/PhoenixCalciteFactory.java
index 5e04281,0000000..f1d8048
mode 100644,000000..100644
--- a/phoenix-core/src/main/java/org/apache/calcite/jdbc/PhoenixCalciteFactory.java
+++ b/phoenix-core/src/main/java/org/apache/calcite/jdbc/PhoenixCalciteFactory.java
@@@ -1,738 -1,0 +1,710 @@@
+package org.apache.calcite.jdbc;
+
+import java.io.File;
+import java.io.InputStream;
+import java.io.Reader;
+import java.sql.DatabaseMetaData;
+import java.sql.Date;
+import java.sql.NClob;
+import java.sql.ResultSetMetaData;
+import java.sql.RowId;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.ResultSet;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaDatabaseMetaData;
+import org.apache.calcite.avatica.AvaticaFactory;
+import org.apache.calcite.avatica.AvaticaPreparedStatement;
+import org.apache.calcite.avatica.AvaticaResultSetMetaData;
+import org.apache.calcite.avatica.AvaticaStatement;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.Meta.Signature;
+import org.apache.calcite.avatica.Meta.StatementHandle;
+import org.apache.calcite.avatica.remote.AvaticaHttpClientFactory;
+import org.apache.calcite.avatica.remote.Service.Factory;
+import org.apache.calcite.avatica.remote.TypedValue;
+import org.apache.calcite.avatica.util.Casing;
+import org.apache.calcite.avatica.util.Quoting;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.config.NullCollation;
+import org.apache.calcite.avatica.QueryState;
+import org.apache.calcite.avatica.UnregisteredDriver;
+import org.apache.calcite.jdbc.CalciteConnectionImpl;
+import org.apache.calcite.jdbc.CalciteFactory;
+import org.apache.calcite.jdbc.Driver;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.model.JsonSchema.Type;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.phoenix.calcite.CalciteUtils;
+import org.apache.phoenix.calcite.PhoenixSchema;
+import org.apache.phoenix.calcite.PhoenixSqlConformance;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.RuntimeContext;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+
+public class PhoenixCalciteFactory extends CalciteFactory {
+
+ public PhoenixCalciteFactory() {
+ this(4, 1); // delegates to the (major, minor) constructor with 4 and 1
+ }
+
+ protected PhoenixCalciteFactory(int major, int minor) {
+ super(major, minor); // both values are handed to CalciteFactory unchanged
+ }
+
+ public AvaticaConnection newConnection(UnregisteredDriver driver,
+ AvaticaFactory factory, String url, Properties info,
+ CalciteSchema rootSchema, JavaTypeFactory typeFactory) {
+ return new PhoenixCalciteConnection(
+ (Driver) driver, factory, url, info, // plain downcast: a driver that is not a Calcite jdbc Driver fails with ClassCastException
+ CalciteSchema.createRootSchema(true, false), typeFactory); // NOTE(review): the rootSchema argument is ignored; a new root schema is created for every connection
+ }
+
+ @Override
+ public AvaticaDatabaseMetaData newDatabaseMetaData(
+ AvaticaConnection connection) {
+ return new PhoenixCalciteDatabaseMetaData(
+ (PhoenixCalciteConnection) connection); // plain downcast: any other connection type fails with ClassCastException
+ }
+
+ @Override
+ public AvaticaStatement newStatement(AvaticaConnection connection,
+ StatementHandle h, int resultSetType, int resultSetConcurrency,
+ int resultSetHoldability) throws SQLException {
+ return new PhoenixCalciteStatement((PhoenixCalciteConnection) connection, // the nested statement subclass declared further down in this file
+ h, resultSetType, resultSetConcurrency, resultSetHoldability);
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public AvaticaPreparedStatement newPreparedStatement(
+ AvaticaConnection connection, StatementHandle h,
+ Signature signature, int resultSetType, int resultSetConcurrency,
+ int resultSetHoldability) throws SQLException {
+ return new PhoenixCalcitePreparedStatement(
+ (PhoenixCalciteConnection) connection, h,
+ (CalcitePrepare.CalciteSignature) signature, // raw-typed cast; this is what the rawtypes suppression covers
+ resultSetType, resultSetConcurrency, resultSetHoldability);
+ }
+
+ @Override
+ public CalciteResultSet newResultSet(AvaticaStatement statement, QueryState state,
+ Meta.Signature signature, TimeZone timeZone, Meta.Frame firstFrame) {
+ final ResultSetMetaData metaData =
+ newResultSetMetaData(statement, signature); // metadata comes from this factory's own newResultSetMetaData
+ @SuppressWarnings("rawtypes")
+ final CalcitePrepare.CalciteSignature calciteSignature =
+ (CalcitePrepare.CalciteSignature) signature;
+ return new CalciteResultSet(statement, calciteSignature, metaData, timeZone, // note: the state argument is not used in this method
+ firstFrame);
+ }
+
+ @Override
+ public ResultSetMetaData newResultSetMetaData(AvaticaStatement statement,
+ Meta.Signature signature) {
+ return new AvaticaResultSetMetaData(statement, null, signature); // NOTE(review): null is passed as the middle argument (presumably the query object) — confirm against Avatica
+ }
+
+ private static class PhoenixCalciteConnection extends CalciteConnectionImpl {
+ private final Map<Meta.StatementHandle, ImmutableList<RuntimeContext>> runtimeContextMap =
+ new ConcurrentHashMap<Meta.StatementHandle, ImmutableList<RuntimeContext>>();
+
+ public PhoenixCalciteConnection(Driver driver, AvaticaFactory factory, String url,
+ Properties info, final CalciteSchema rootSchema,
+ JavaTypeFactory typeFactory) {
+ super(driver, factory, url, info, rootSchema, typeFactory); // pure pass-through; the constructor body sets no additional state
+ }
+
+ @Override
+ public CalciteConnectionConfig config() {
+ final CalciteConnectionConfig config = super.config();
+ return new DelegateCalciteConnectionConfig(config) { // wraps the inherited config; a new wrapper is built on every call
+ @Override
+ public SqlConformance conformance() { // the only method overridden in this anonymous subclass
+ return PhoenixSqlConformance.INSTANCE;
+ }
+ };
+ }
+
+ @Override
+ public CalciteStatement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+ try {
+ return super.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
+ } catch (SQLException e) {
+ throw CalciteUtils.unwrapSqlException(e); // rethrows whatever unwrapSqlException returns; presumably the underlying Phoenix exception — confirm in CalciteUtils
+ }
+ }
+
+ @Override
+ public CalcitePreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+ try {
+ return super.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+ } catch (SQLException e) {
+ throw CalciteUtils.unwrapSqlException(e); // same exception translation as createStatement in this class
+ }
+ }
+
+ public <T> Enumerable<T> enumerable(Meta.StatementHandle handle,
+ CalcitePrepare.CalciteSignature<T> signature) throws SQLException {
+ Map<String, Object> map = Maps.newLinkedHashMap(); // bind values, keyed by "?" followed by the parameter ordinal
+ AvaticaStatement statement = lookupStatement(handle);
+ final List<TypedValue> parameterValues =
+ TROJAN.getParameterValues(statement);
+ final Calendar calendar = Calendar.getInstance(); // one Calendar instance is reused for every toJdbc conversion below
+ for (Ord<TypedValue> o : Ord.zip(parameterValues)) {
+ map.put("?" + o.i, o.e.toJdbc(calendar));
+ }
+ ImmutableList<RuntimeContext> ctxList = runtimeContextMap.get(handle);
+ if (ctxList == null) { // first call for this handle: capture the contexts currently held in the thread-local
+ List<RuntimeContext> activeCtx = RuntimeContext.THREAD_LOCAL.get();
+ ctxList = ImmutableList.copyOf(activeCtx);
+ runtimeContextMap.put(handle, ctxList); // NOTE(review): nothing in this class ever removes entries from this map
+ activeCtx.clear(); // the thread-local list is emptied once its contents have been captured
+ }
+ for (RuntimeContext runtimeContext : ctxList) {
+ runtimeContext.setBindParameterValues(map); // every context receives the same map instance
+ }
+ return super.enumerable(handle, signature);
+ }
+
+ @Override
+ public void abort(final Executor executor) throws SQLException {
+ call(new PhoenixConnectionCallable() { // applied to each PhoenixSchema sub-schema's connection; super.abort is not invoked here
+ @Override
+ public void call(PhoenixConnection conn) throws SQLException {
+ conn.abort(executor);
+ }});
+ }
+
+ @Override
+ public void rollback() throws SQLException {
+ call(new PhoenixConnectionCallable() { // applied to each PhoenixSchema sub-schema's connection; super.rollback is not invoked here
+ @Override
+ public void call(PhoenixConnection conn) throws SQLException {
+ conn.rollback();
+ }});
+ }
+
+ @Override
+ public void setReadOnly(final boolean readOnly) throws SQLException {
+ call(new PhoenixConnectionCallable() { // Phoenix connections are updated first
+ @Override
+ public void call(PhoenixConnection conn) throws SQLException {
+ conn.setReadOnly(readOnly);
+ }});
+ super.setReadOnly(readOnly); // then this connection's own flag; skipped if a Phoenix call above throws
+ }
+
+ @Override
+ public void setTransactionIsolation(final int level) throws SQLException {
+ call(new PhoenixConnectionCallable() { // Phoenix connections are updated first
+ @Override
+ public void call(PhoenixConnection conn) throws SQLException {
+ conn.setTransactionIsolation(level);
+ }});
+ super.setTransactionIsolation(level); // then this connection's own setting; skipped if a Phoenix call above throws
+ }
+
+ @Override
+ public void clearWarnings() throws SQLException {
+ call(new PhoenixConnectionCallable() { // Phoenix connections are cleared first
+ @Override
+ public void call(PhoenixConnection conn) throws SQLException {
+ conn.clearWarnings();
+ }});
+ super.clearWarnings(); // then this connection's own warnings; skipped if a Phoenix call above throws
+ }
+
+ @Override
+ public Savepoint setSavepoint() throws SQLException {
+ throw new SQLFeatureNotSupportedException(); // always thrown: this connection rejects every savepoint operation
+ }
+
+ @Override
+ public Savepoint setSavepoint(String name) throws SQLException {
+ throw new SQLFeatureNotSupportedException(); // always thrown; name is never read
+ }
+
+ @Override
+ public void rollback(final Savepoint savepoint) throws SQLException {
+ throw new SQLFeatureNotSupportedException(); // always thrown; only the no-argument rollback() is forwarded to Phoenix
+ }
+
+ @Override
+ public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+ throw new SQLFeatureNotSupportedException(); // always thrown; savepoint is never read
+ }
+
+ public void setAutoCommit(final boolean isAutoCommit) throws SQLException {
+ call(new PhoenixConnectionCallable() { // forwarded to each PhoenixSchema sub-schema's connection; super.setAutoCommit is not invoked here
+ @Override
+ public void call(PhoenixConnection conn) throws SQLException {
+ conn.setAutoCommit(isAutoCommit);
+ }});
+ }
+
+ public void commit() throws SQLException {
+ call(new PhoenixConnectionCallable() { // forwarded to each PhoenixSchema sub-schema's connection; super.commit is not invoked here
+ @Override
+ public void call(PhoenixConnection conn) throws SQLException {
+ conn.commit();
+ }});
+ }
+
+ public void close() throws SQLException {
+ call(new PhoenixConnectionCallable() { // Phoenix connections are closed first
+ @Override
+ public void call(PhoenixConnection conn) throws SQLException {
+ conn.close();
+ }});
+ super.close(); // NOTE(review): not reached if any conn.close() above throws (no try/finally)
+ }
+
+ private void call(PhoenixConnectionCallable callable) throws SQLException { // runs callable once per sub-schema that unwraps to PhoenixSchema
+ for (String subSchemaName : getRootSchema().getSubSchemaNames()) {
+ try {
+ PhoenixSchema phoenixSchema = getRootSchema()
+ .getSubSchema(subSchemaName).unwrap(PhoenixSchema.class);
+ callable.call(phoenixSchema.pc); // an SQLException here propagates and ends the loop; later sub-schemas are not visited
+ } catch (ClassCastException e) { // deliberately ignored so non-Phoenix sub-schemas are skipped; NOTE(review): also hides a ClassCastException raised inside callable
+ }
+ }
+ }
+
+ private static interface PhoenixConnectionCallable { // callback type that call() applies to each PhoenixConnection
+ void call(PhoenixConnection conn) throws SQLException;
+ }
+
+ @Override
+ public DatabaseMetaData getMetaData() throws SQLException {
+ PhoenixConnection pc = getPhoenixConnection(getRootSchema());
+ if(pc != null) {
+ return pc.getMetaData(); // prefer the Phoenix connection's own metadata when one is available
+ }
+ return super.getMetaData(); // no PhoenixSchema sub-schema was found: use the inherited metadata
+ }
+
+ @Override
+ public Properties getClientInfo() throws SQLException {
+ PhoenixConnection pc = getPhoenixConnection(getRootSchema());
+ if(pc != null) {
+ return pc.getClientInfo(); // answered by the Phoenix connection when one is available
+ }
+ return super.getClientInfo(); // no PhoenixSchema sub-schema was found: use the inherited implementation
+ }
+
+ @Override
+ public String getClientInfo(String name) throws SQLException {
+ PhoenixConnection pc = getPhoenixConnection(getRootSchema());
+ if(pc != null) {
+ return pc.getClientInfo(name); // answered by the Phoenix connection when one is available
+ }
+ return super.getClientInfo(name); // no PhoenixSchema sub-schema was found: use the inherited implementation
+ }
+
+ private PhoenixConnection getPhoenixConnection(SchemaPlus rootSchema) { // connection of the first sub-schema that unwraps to PhoenixSchema, or null if none does
+ for (String subSchemaName : rootSchema.getSubSchemaNames()) { // iterate the schema that was passed in, not getRootSchema()
+ try {
+ PhoenixSchema phoenixSchema =
+ rootSchema.getSubSchema(subSchemaName).unwrap(PhoenixSchema.class);
+ return phoenixSchema.pc;
+ } catch (ClassCastException e) { // deliberately ignored: this sub-schema is not a PhoenixSchema, try the next one
+ }
+ }
+ return null;
+ }
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> T unwrap(Class<T> iface) throws SQLException {
+ if (iface.isInstance(this)) { // the connection itself already satisfies the requested type
+ return (T) this;
+ }
+
+ if (iface.isAssignableFrom(PhoenixConnection.class)) { // caller asked for PhoenixConnection or one of its supertypes
+ SchemaPlus schema = getRootSchema().getSubSchema(this.getSchema()); // NOTE(review): schema is dereferenced below without a null check
+ try {
+ return (T) (schema.unwrap(PhoenixSchema.class).pc);
+ } catch (ClassCastException e) { // current schema is not a PhoenixSchema: fall through to the error below
+ }
+ }
+
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.CLASS_NOT_UNWRAPPABLE)
+ .setMessage(this.getClass().getName() + " not unwrappable from " + iface.getName())
+ .build().buildException();
+ }
+ }
+
+ private static class PhoenixCalciteStatement extends CalciteStatement { // CalciteStatement whose execute/executeQuery rethrow through CalciteUtils.unwrapSqlException
+ public PhoenixCalciteStatement(PhoenixCalciteConnection connection,
+ Meta.StatementHandle h, int resultSetType, int resultSetConcurrency,
+ int resultSetHoldability) {
+ super(connection, h, resultSetType, resultSetConcurrency,
+ resultSetHoldability); // pure pass-through to CalciteStatement
+ }
+
+ @Override
+ public boolean execute(String sql) throws SQLException {
+ try {
+ return super.execute(sql);
+ } catch (SQLException e) {
+ throw CalciteUtils.unwrapSqlException(e); // rethrows whatever unwrapSqlException returns for e
+ }
+ }
+
+ @Override
+ public ResultSet executeQuery(String sql) throws SQLException{
+ try {
+ return super.executeQuery(sql);
+ } catch (SQLException e) {
+ throw CalciteUtils.unwrapSqlException(e); // same translation as execute(String) above
+ }
+ }
+ }
+
+ private static class PhoenixCalcitePreparedStatement extends CalcitePreparedStatement {
+ @SuppressWarnings("rawtypes")
+ PhoenixCalcitePreparedStatement(PhoenixCalciteConnection connection,
+ Meta.StatementHandle h, CalcitePrepare.CalciteSignature signature, // raw CalciteSignature: the reason for the rawtypes suppression
+ int resultSetType, int resultSetConcurrency, int resultSetHoldability)
+ throws SQLException {
+ super(connection, h, signature, resultSetType, resultSetConcurrency,
+ resultSetHoldability); // pure pass-through to CalcitePreparedStatement
+ }
+
+ @Override
+ public boolean execute(String sql) throws SQLException { // NOTE(review): only the String overload is wrapped; no override of the no-argument execute() is visible in this part of the class
+ try {
+ return super.execute(sql);
+ } catch (SQLException e) {
+ throw CalciteUtils.unwrapSqlException(e); // rethrows whatever unwrapSqlException returns for e
+ }
+ }
+
+ @Override
+ public ResultSet executeQuery(String sql) throws SQLException{ // NOTE(review): only the String overload is wrapped; no override of the no-argument executeQuery() is visible in this part of the class
+ try {
+ return super.executeQuery(sql);
+ } catch (SQLException e) {
+ throw CalciteUtils.unwrapSqlException(e); // rethrows whatever unwrapSqlException returns for e
+ }
+ }
+
- public void setTimestamp(int parameterIndex, Timestamp x, Calendar calendar)
- throws SQLException {
- if (x != null) {
- x = new Timestamp(getAdjustedTime(x.getTime(), calendar));
- }
- super.setTimestamp(parameterIndex, x, calendar);
- }
-
- public void setDate(int parameterIndex, Date x, Calendar calendar)
- throws SQLException {
- if (x != null) {
- x = new Date(getAdjustedTime(x.getTime(), calendar));
- }
- super.setDate(parameterIndex, x, calendar);
- }
-
- public void setTime(int parameterIndex, Time x, Calendar calendar)
- throws SQLException {
- if (x != null) {
- x = new Time(getAdjustedTime(x.getTime(), calendar));
- }
- super.setTime(parameterIndex, x, calendar);
- }
-
- private long getAdjustedTime(long v, Calendar calendar) {
- return (v - calendar.getTimeZone().getOffset(v));
- }
-
+ public void setRowId(
+ int parameterIndex,
+ RowId x) throws SQLException {
+ getSite(parameterIndex).setRowId(x); // forwarded unchanged to the site returned by getSite for this index
+ }
+
+ public void setNString(
+ int parameterIndex, String value) throws SQLException {
+ getSite(parameterIndex).setNString(value);
+ }
+
+ public void setNCharacterStream(
+ int parameterIndex,
+ Reader value,
+ long length) throws SQLException {
+ getSite(parameterIndex)
+ .setNCharacterStream(value, length);
+ }
+
+ public void setNClob(
+ int parameterIndex,
+ NClob value) throws SQLException {
+ getSite(parameterIndex).setNClob(value);
+ }
+
+ public void setClob(
+ int parameterIndex,
+ Reader reader,
+ long length) throws SQLException {
+ getSite(parameterIndex)
+ .setClob(reader, length);
+ }
+
+ public void setBlob(
+ int parameterIndex,
+ InputStream inputStream,
+ long length) throws SQLException {
+ getSite(parameterIndex)
+ .setBlob(inputStream, length);
+ }
+
+ public void setNClob(
+ int parameterIndex,
+ Reader reader,
+ long length) throws SQLException {
+ getSite(parameterIndex).setNClob(reader, length);
+ }
+
+ public void setSQLXML(
+ int parameterIndex, SQLXML xmlObject) throws SQLException {
+ getSite(parameterIndex).setSQLXML(xmlObject);
+ }
+
+ public void setAsciiStream(
+ int parameterIndex,
+ InputStream x,
+ long length) throws SQLException {
+ getSite(parameterIndex)
+ .setAsciiStream(x, length);
+ }
+
+ public void setBinaryStream(
+ int parameterIndex,
+ InputStream x,
+ long length) throws SQLException {
+ getSite(parameterIndex)
+ .setBinaryStream(x, length);
+ }
+
+ public void setCharacterStream(
+ int parameterIndex,
+ Reader reader,
+ long length) throws SQLException {
+ getSite(parameterIndex)
+ .setCharacterStream(reader, length);
+ }
+
+ public void setAsciiStream(
+ int parameterIndex, InputStream x) throws SQLException {
+ getSite(parameterIndex).setAsciiStream(x);
+ }
+
+ public void setBinaryStream(
+ int parameterIndex, InputStream x) throws SQLException {
+ getSite(parameterIndex).setBinaryStream(x);
+ }
+
+ public void setCharacterStream(
+ int parameterIndex, Reader reader) throws SQLException {
+ getSite(parameterIndex)
+ .setCharacterStream(reader);
+ }
+
+ public void setNCharacterStream(
+ int parameterIndex, Reader value) throws SQLException {
+ getSite(parameterIndex)
+ .setNCharacterStream(value);
+ }
+
+ public void setClob(
+ int parameterIndex,
+ Reader reader) throws SQLException {
+ getSite(parameterIndex).setClob(reader);
+ }
+
+ public void setBlob(
+ int parameterIndex, InputStream inputStream) throws SQLException {
+ getSite(parameterIndex)
+ .setBlob(inputStream);
+ }
+
+ public void setNClob(
+ int parameterIndex, Reader reader) throws SQLException {
+ getSite(parameterIndex)
+ .setNClob(reader);
+ }
+ }
+
+ /**
+ * Implementation of database metadata for JDBC 4.1.
+ * Adds no behavior of its own: the constructor only passes the connection
+ * on to {@code AvaticaDatabaseMetaData}.
+ */
+ private static class PhoenixCalciteDatabaseMetaData
+ extends AvaticaDatabaseMetaData {
+ PhoenixCalciteDatabaseMetaData(PhoenixCalciteConnection connection) {
+ super(connection);
+ }
+ }
+ /**
+ * {@code CalciteConnectionConfig} that wraps another config and forwards
+ * every method, unchanged, to that wrapped instance.
+ * NOTE(review): presumably intended as a base for wrappers that override
+ * individual settings -- confirm against its users elsewhere in this file.
+ */
+ private static class DelegateCalciteConnectionConfig implements CalciteConnectionConfig {
+ private final CalciteConnectionConfig delegate;
+
+ DelegateCalciteConnectionConfig(CalciteConnectionConfig delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public String authentication() {
+ return delegate.authentication();
+ }
+
+ @Override
+ public String avaticaPassword() {
+ return delegate.avaticaPassword();
+ }
+
+ @Override
+ public String avaticaUser() {
+ return delegate.avaticaUser();
+ }
+
+ @Override
+ public Factory factory() {
+ return delegate.factory();
+ }
+
+ @Override
+ public String httpClientClass() {
+ return delegate.httpClientClass();
+ }
+
+ @Override
+ public AvaticaHttpClientFactory httpClientFactory() {
+ return delegate.httpClientFactory();
+ }
+
+ @Override
+ public File kerberosKeytab() {
+ return delegate.kerberosKeytab();
+ }
+
+ @Override
+ public String kerberosPrincipal() {
+ return delegate.kerberosPrincipal();
+ }
+
+ @Override
+ public String schema() {
+ return delegate.schema();
+ }
+
+ @Override
+ public String serialization() {
+ return delegate.serialization();
+ }
+
+ @Override
+ public String timeZone() {
+ return delegate.timeZone();
+ }
+
+ @Override
+ public String url() {
+ return delegate.url();
+ }
+
+ @Override
+ public boolean autoTemp() {
+ return delegate.autoTemp();
+ }
+
+ @Override
+ public boolean materializationsEnabled() {
+ return delegate.materializationsEnabled();
+ }
+
+ @Override
+ public boolean createMaterializations() {
+ return delegate.createMaterializations();
+ }
+
+ @Override
+ public NullCollation defaultNullCollation() {
+ return delegate.defaultNullCollation();
+ }
+
+ @Override
+ public <T> T fun(Class<T> operatorTableClass, T defaultOperatorTable) {
+ return delegate.fun(operatorTableClass, defaultOperatorTable);
+ }
+
+ @Override
+ public String model() {
+ return delegate.model();
+ }
+
+ @Override
+ public Lex lex() {
+ return delegate.lex();
+ }
+
+ @Override
+ public Quoting quoting() {
+ return delegate.quoting();
+ }
+
+ @Override
+ public Casing unquotedCasing() {
+ return delegate.unquotedCasing();
+ }
+
+ @Override
+ public Casing quotedCasing() {
+ return delegate.quotedCasing();
+ }
+
+ @Override
+ public boolean caseSensitive() {
+ return delegate.caseSensitive();
+ }
+
+ @Override
+ public <T> T parserFactory(Class<T> parserFactoryClass, T defaultParserFactory) {
+ return delegate.parserFactory(parserFactoryClass, defaultParserFactory);
+ }
+
+ @Override
+ public <T> T schemaFactory(Class<T> schemaFactoryClass, T defaultSchemaFactory) {
+ return delegate.schemaFactory(schemaFactoryClass, defaultSchemaFactory);
+ }
+
+ @Override
+ public Type schemaType() {
+ return delegate.schemaType();
+ }
+
+ @Override
+ public boolean spark() {
+ return delegate.spark();
+ }
+
+ @Override
+ public boolean forceDecorrelate() {
+ return delegate.forceDecorrelate();
+ }
+
+ @Override
+ public <T> T typeSystem(Class<T> typeSystemClass, T defaultTypeSystem) {
+ return delegate.typeSystem(typeSystemClass, defaultTypeSystem);
+ }
+
+ @Override
+ public SqlConformance conformance() {
+ return delegate.conformance();
+ }
+
+ @Override
+ public boolean approximateDistinctCount() {
+ return delegate.approximateDistinctCount();
+ }
+
+ @Override
+ public boolean approximateTopN() {
+ return delegate.approximateTopN();
+ }
+
+ @Override
+ public boolean approximateDecimal() {
+ return delegate.approximateDecimal();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java
index d88f226,0000000..315556a
mode 100644,000000..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java
@@@ -1,579 -1,0 +1,579 @@@
+package org.apache.phoenix.calcite;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.ArrayListMultimap;
+
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.materialize.MaterializationService;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.*;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.TableFunctionImpl;
+import org.apache.calcite.schema.impl.ViewTable;
+import org.apache.calcite.sql.ListJarsTable;
+import org.apache.phoenix.compile.ColumnResolver;
+import org.apache.phoenix.compile.FromCompiler;
+import org.apache.phoenix.compile.SequenceManager;
+import org.apache.phoenix.expression.function.FunctionExpression;
+import org.apache.phoenix.expression.function.UDFExpression;
+import org.apache.phoenix.expression.function.ByteBasedRegexpSplitFunction;
+import org.apache.phoenix.expression.function.ByteBasedRegexpSubstrFunction;
+import org.apache.phoenix.expression.function.ByteBasedRegexpReplaceFunction;
+import org.apache.phoenix.expression.function.StringBasedRegexpSplitFunction;
+import org.apache.phoenix.expression.function.StringBasedRegexpSubstrFunction;
+import org.apache.phoenix.expression.function.StringBasedRegexpReplaceFunction;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.parse.ColumnDef;
+import org.apache.phoenix.parse.NamedTableNode;
+import org.apache.phoenix.parse.PFunction;
+import org.apache.phoenix.parse.SequenceValueParseNode;
+import org.apache.phoenix.parse.TableName;
+import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
+import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunctionInfo;
+import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunctionArgInfo;
+import org.apache.phoenix.parse.FunctionParseNode.FunctionClassType;
+import org.apache.phoenix.parse.ParseNodeFactory;
+import org.apache.phoenix.parse.PFunction.FunctionArgument;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.schema.PTable.ViewType;
+import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.Sequence;
+import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PDataTypeFactory;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Implementation of Calcite's {@link Schema} SPI for Phoenix.
+ *
+ * TODO
+ * 1) change this to non-caching mode??
+ * 2) how to deal with defining indexes and views, since they require a CalciteSchema
+ * instance??
+ *
+ */
+public class PhoenixSchema implements Schema {
+ public static final Factory FACTORY = new Factory(); // shared SchemaFactory instance
+
+ public final PhoenixConnection pc; // connection used for every table, function and sequence lookup
+
+ protected final String name; // name under which this schema is looked up in parentSchema
+ protected final String schemaName; // Phoenix schema name; null for the schema built by create()
+ protected final SchemaPlus parentSchema;
+ protected final MetaDataClient client; // built from pc in the constructor
+ protected final SequenceManager sequenceManager; // used by resolveSequence(), reset after each lookup
+ protected final Map<String, Schema> subSchemas; // sub-schemas created so far by getSubSchema()
+ protected final Map<String, Table> tables; // resolved tables and sequences, plus index tables added as materializations
+ protected final Map<String, Function> views; // "ListJars" plus one view macro per resolved view
+ protected final Set<TableRef> viewTables; // TableRefs of the views resolved by getFunctions()
+ protected final UDFExpression exp = new UDFExpression();
+ private final static Function listJarsFunction = TableFunctionImpl
+ .create(ListJarsTable.LIST_JARS_TABLE_METHOD);
+ private final Multimap<String, Function> builtinFunctions = ArrayListMultimap.create(); // filled once by registerBuiltinFunctions()
+ private RelDataTypeFactory typeFactory; // null until setTypeFactory() is called; handed to each PhoenixTable
+
+ /**
+  * Builds a schema on top of the given Phoenix connection. The caches start
+  * empty except for the "ListJars" table function, a statement is opened
+  * on the connection to back the sequence manager, and the built-in
+  * function registry is populated.
+  *
+  * @throws RuntimeException wrapping the {@link SQLException} if the
+  *         statement cannot be created
+  */
+ protected PhoenixSchema(String name, String schemaName,
+         SchemaPlus parentSchema, PhoenixConnection pc) {
+     this.pc = pc;
+     this.name = name;
+     this.schemaName = schemaName;
+     this.parentSchema = parentSchema;
+     this.client = new MetaDataClient(pc);
+     this.subSchemas = Maps.newHashMap();
+     this.tables = Maps.newHashMap();
+     this.viewTables = Sets.newHashSet();
+     this.views = Maps.newHashMap();
+     this.views.put("ListJars", listJarsFunction);
+     try {
+         this.sequenceManager = new SequenceManager((PhoenixStatement) pc.createStatement());
+     } catch (SQLException failure){
+         throw new RuntimeException(failure);
+     }
+     registerBuiltinFunctions();
+ }
+
+ /**
+  * Same as the four-argument constructor, then records the type factory
+  * through {@link #setTypeFactory(RelDataTypeFactory)}.
+  */
+ protected PhoenixSchema(String name, String schemaName,
+         SchemaPlus parentSchema, PhoenixConnection pc, RelDataTypeFactory typeFactory) {
+     this(name, schemaName, parentSchema, pc);
+     setTypeFactory(typeFactory);
+ }
+
+ /**
+  * Opens a Phoenix JDBC connection described by {@code operand} and wraps
+  * it in a root schema (one whose Phoenix schema name is null).
+  *
+  * The "url" entry is the JDBC URL. Every operand entry, "url" included,
+  * is also passed to the driver as a connection property, converted with
+  * {@code String.valueOf}.
+  *
+  * @throws RuntimeException wrapping the {@link ClassNotFoundException} or
+  *         {@link SQLException} raised while loading the driver or connecting
+  */
+ private static Schema create(SchemaPlus parentSchema,
+         String name, Map<String, Object> operand) {
+     final String jdbcUrl = (String) operand.get("url");
+     final Properties connectionProps = new Properties();
+     for (Map.Entry<String, Object> setting : operand.entrySet()) {
+         final String value = String.valueOf(setting.getValue());
+         connectionProps.setProperty(setting.getKey(), value);
+     }
+     try {
+         Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
+         final Connection jdbcConnection = DriverManager.getConnection(jdbcUrl, connectionProps);
+         final PhoenixConnection unwrapped = jdbcConnection.unwrap(PhoenixConnection.class);
+         return new PhoenixSchema(name, null, parentSchema, unwrapped);
+     } catch (ClassNotFoundException driverMissing) {
+         throw new RuntimeException(driverMissing);
+     } catch (SQLException connectFailed) {
+         throw new RuntimeException(connectFailed);
+     }
+ }
+
+
+ /**
+ * Registers Phoenix built-in functions for use by Calcite.
+ *
+ * Registration is coordinated from registerBuiltinFunctions(), which builds the builtinFunctions map.
+ * The helper functions convert Phoenix function information into the form Calcite uses.
+ *
+ * The builtinFunctions map is ultimately used by Calcite during validation and planning:
+ * PhoenixSchema.getFunctions() - Matches function signatures during validation
+ * CalciteUtils.EXPRESSION_MAP - Maps a RexNode onto a builtin function
+ *
+ * What is registered: every non-aggregate built-in whose name is not in
+ * CalciteUtils.TRANSLATED_BUILT_IN_FUNCTIONS. Of the byte-based and string-based
+ * regex function families, only the one selected by the
+ * QueryServices.USE_BYTE_BASED_REGEX_ATTRIB property is kept. ALIAS functions are
+ * handled in a second pass and are registered under the alias name with the
+ * signatures of their derived functions. Does nothing if the map is already populated.
+ */
+ private void registerBuiltinFunctions(){
+ if(!builtinFunctions.isEmpty()) {
+ return;
+ }
+ final boolean useByteBasedRegex =
+ pc.getQueryServices().getProps().getBoolean(QueryServices.USE_BYTE_BASED_REGEX_ATTRIB,
+ QueryServicesOptions.DEFAULT_USE_BYTE_BASED_REGEX);
+ final List<String> ignoredRegexFunctions = useByteBasedRegex ? // class names of the regex family that is NOT in use
+ Lists.newArrayList(
+ StringBasedRegexpReplaceFunction.class.getName(),
+ StringBasedRegexpSplitFunction.class.getName(),
+ StringBasedRegexpSubstrFunction.class.getName()) :
+ Lists.newArrayList(
+ ByteBasedRegexpReplaceFunction.class.getName(),
+ ByteBasedRegexpSubstrFunction.class.getName(),
+ ByteBasedRegexpSplitFunction.class.getName());
+
+ Multimap<String, BuiltInFunctionInfo> infoMap = ParseNodeFactory.getBuiltInFunctionMultimap();
+ List<BuiltInFunctionInfo> aliasFunctions = Lists.newArrayList();
+ for (BuiltInFunctionInfo info : infoMap.values()) {
+ //TODO: Support aggregate functions
+ if(!CalciteUtils.TRANSLATED_BUILT_IN_FUNCTIONS.contains(info.getName()) && !info.isAggregate()) {
+ if (info.getClassType() == FunctionClassType.ALIAS) {
+ aliasFunctions.add(info); // deferred to the second pass below
+ continue;
+ }
+ if(ignoredRegexFunctions.contains(info.getFunc().getName())){
+ continue;
+ }
+ builtinFunctions.putAll(info.getName(), convertBuiltinFunction(info));
+ }
+ }
+ // Single depth alias functions only
+ for(BuiltInFunctionInfo info : aliasFunctions) {
+ // Point the alias function to its derived functions
+ for (Class<? extends FunctionExpression> func : info.getDerivedFunctions()) {
+ BuiltInFunction d = func.getAnnotation(BuiltInFunction.class);
+ Collection<Function> targetFunction = builtinFunctions.get(d.name()); // Guava Multimap.get returns a live view of the map
+
+ // Target function not implemented
+ if(targetFunction.isEmpty()) {
+ for(BuiltInFunctionInfo derivedInfo : infoMap.get(d.name())) {
+ targetFunction.addAll(convertBuiltinFunction(derivedInfo)); // NOTE(review): through the live view this also registers under d.name() -- confirm intended
+ }
+ }
+ builtinFunctions.putAll(info.getName(), targetFunction);
+ }
+ }
+ }
+
+ // Converts phoenix function information to a list of Calcite function signatures
+ private static List<PhoenixScalarFunction> convertBuiltinFunction(BuiltInFunctionInfo functionInfo){
+ List<List<FunctionArgument>> overloadedArgs = PhoenixSchema.overloadArguments(functionInfo.getArgs());
+ List<PhoenixScalarFunction> functionList = Lists.newArrayListWithExpectedSize(overloadedArgs.size());
+ Class<? extends FunctionExpression> clazz = functionInfo.getFunc();
+
+ try {
+ for (List<FunctionArgument> argumentList : overloadedArgs) {
+ List<FunctionParameter> parameters = Lists.newArrayListWithExpectedSize(argumentList.size());
+ PDataType returnType = evaluateReturnType(clazz, argumentList);
+
+ for (final FunctionArgument arg : argumentList) {
+ parameters.add(
+ new FunctionParameter() {
+ public int getOrdinal() {
+ return arg.getArgPosition();
+ }
+
+ public String getName() {
+ return "arg" + arg.getArgPosition();
+ }
+
+ @SuppressWarnings("rawtypes")
+ public RelDataType getType(RelDataTypeFactory typeFactory) {
+ PDataType dataType =
+ arg.isArrayType() ? PDataType.fromTypeId(PDataType.sqlArrayType(SchemaUtil
+ .normalizeIdentifier(SchemaUtil.normalizeIdentifier(arg
+ .getArgumentType())))) : PDataType.fromSqlTypeName(SchemaUtil
+ .normalizeIdentifier(arg.getArgumentType()));
+ return typeFactory.createJavaType(dataType.getJavaClass());
+ }
+
+ public boolean isOptional() {
+ return arg.getDefaultValue() != null;
+ }
+ });
+ }
+ functionList.add(new PhoenixScalarFunction(functionInfo, parameters, returnType));
+ }
+ } catch (Exception e){
+ throw new RuntimeException("Builtin function " + functionInfo.getName() + " could not be registered", e);
+ }
+ return functionList;
+ }
+
+ /**
+  * Determines the return type of a built-in function for one argument list.
+  *
+  * First instantiates the function class through its no-argument
+  * constructor and asks the instance for its data type. If that fails for
+  * any reason, falls back to the SQL type of the first argument.
+  *
+  * @param f the function implementation class, annotated with {@code BuiltInFunction}
+  * @param argumentList the arguments of the signature being registered
+  * @return the evaluated return type
+  * @throws RuntimeException if direct evaluation fails for an ALIAS or
+  *         ABSTRACT function class; the original failure is kept as the cause
+  */
+ private static PDataType evaluateReturnType(Class<? extends FunctionExpression> f, List<FunctionArgument> argumentList) {
+     BuiltInFunction d = f.getAnnotation(BuiltInFunction.class);
+     try {
+         // Direct evaluation of the return type
+         FunctionExpression func = f.newInstance();
+         return func.getDataType();
+     } catch (Exception e) {
+         if (d.classType() == FunctionClassType.ALIAS || d.classType() == FunctionClassType.ABSTRACT) {
+             // should never happen; keep the cause so the failure can be diagnosed if it does
+             throw new RuntimeException("Cannot evaluate the return type of "
+                     + d.classType() + " function class " + f.getName(), e);
+         }
+         // Grab the primary argument
+         assert(argumentList.size() != 0);
+         return PDataType.fromSqlTypeName(argumentList.get(0).getArgumentType());
+     }
+ }
+
+ // Using Phoenix argument information, determine all possible function signatures
+ private static List<List<PFunction.FunctionArgument>> overloadArguments(BuiltInFunctionArgInfo[] args){
+ List<List<PFunction.FunctionArgument>> overloadedArgs = Lists.newArrayList();
+ int solutions = 1;
+ for(int i = 0; i < args.length; solutions *= args[i].getAllowedTypes().length, i++);
+ for(int i = 0; i < solutions; i++) {
+ int j = 1;
+ short k = 0;
+ overloadedArgs.add(new ArrayList<PFunction.FunctionArgument>());
+ for(BuiltInFunctionArgInfo arg : args) {
+ Class<? extends PDataType>[] temp = arg.getAllowedTypes();
+ PDataType dataType = PDataTypeFactory.getInstance().instanceFromClass(temp[(i/j)%temp.length]);
+ overloadedArgs.get(i).add( new PFunction.FunctionArgument(
+ dataType.toString(),
+ dataType.isArrayType(),
+ arg.isConstant(),
+ arg.getDefaultValue(),
+ arg.getMinValue(),
+ arg.getMaxValue(),
+ k));
+ k++;
+ j *= temp.length;
+ }
+ }
+ return overloadedArgs;
+ }
+
+ /**
+  * Looks up a table by name, consulting the cache first.
+  *
+  * On a cache miss the name is resolved against Phoenix. Anything that is
+  * not a view (per {@code isView}) becomes a {@code PhoenixTable}. If no
+  * table results, the name is tried as a sequence. Whatever is found is
+  * cached.
+  *
+  * @return the table or sequence, or null if the name resolves to neither
+  * @throws RuntimeException wrapping any {@link SQLException} other than
+  *         {@code TableNotFoundException}
+  */
+ @Override
+ public Table getTable(String name) {
+     final Table cached = tables.get(name);
+     if (cached != null) {
+         return cached;
+     }
+
+     Table resolved = null;
+     try {
+         final ColumnResolver resolver = FromCompiler.getResolver(
+                 NamedTableNode.create(
+                         null,
+                         TableName.create(schemaName, name),
+                         ImmutableList.<ColumnDef>of()), pc);
+         final List<TableRef> tableRefs = resolver.getTables();
+         assert tableRefs.size() == 1;
+         final TableRef onlyRef = tableRefs.get(0);
+         if (!isView(onlyRef.getTable())) {
+             resolved = new PhoenixTable(pc, fixTableMultiTenancy(onlyRef), getTypeFactory());
+         }
+     } catch (TableNotFoundException notATable) {
+         // Ignored: the name is tried as a sequence below.
+     } catch (SQLException failure) {
+         throw new RuntimeException(failure);
+     }
+
+     if (resolved == null) {
+         resolved = resolveSequence(name);
+     }
+     if (resolved == null) {
+         return null;
+     }
+     tables.put(name, resolved);
+     return resolved;
+ }
+
+ /**
+  * Returns the key set of the table cache, i.e. only the names that have
+  * been resolved (or added as index tables) so far.
+  */
+ @Override
+ public Set<String> getTableNames() {
+     final Set<String> cachedNames = tables.keySet();
+     return cachedNames;
+ }
+ /**
+ * Resolves the functions registered under a name, trying in order:
+ * the built-in function registry, the cached view macros, Phoenix
+ * user-defined functions, and finally a Phoenix view of that name,
+ * which is exposed to Calcite as a view macro and cached.
+ *
+ * @return the matching functions, or an empty list if nothing matches
+ * @throws RuntimeException wrapping a SQLException raised by the view
+ * lookup, other than TableNotFoundException
+ */
+ @Override
+ public Collection<Function> getFunctions(String name) {
+ assert(!builtinFunctions.isEmpty());
+ if(!builtinFunctions.get(name).isEmpty()){
+ return builtinFunctions.get(name);
+ }
+ Function func = views.get(name);
+ if (func != null) {
+ return ImmutableList.of(func);
+ }
+ try {
+ List<String> functionNames = new ArrayList<String>(1);
+ functionNames.add(name);
+ ColumnResolver resolver = FromCompiler.getResolverForFunction(pc, functionNames);
+ List<PFunction> pFunctions = resolver.getFunctions();
+ assert !pFunctions.isEmpty();
+ List<Function> funcs = new ArrayList<Function>(pFunctions.size());
+ for (PFunction pFunction : pFunctions) {
+ funcs.add(new PhoenixScalarFunction(pFunction));
+ }
+ return ImmutableList.copyOf(funcs);
+ } catch (SQLException e) { // ignored: the lookup falls through to the view resolution below
+ }
+ try {
+ ColumnResolver x = FromCompiler.getResolver(
+ NamedTableNode.create(
+ null,
+ TableName.create(schemaName, name),
+ ImmutableList.<ColumnDef>of()), pc);
+ final List<TableRef> tables = x.getTables();
+ assert tables.size() == 1;
+ final TableRef tableRef = tables.get(0);
+ final PTable pTable = tableRef.getTable();
+ if (isView(pTable)) {
+ String viewSql = pTable.getViewStatement();
+ if (viewSql == null) { // no stored statement: select everything from the physical table
+ viewSql = "select * from "
+ + SchemaUtil.getEscapedFullTableName(
+ pTable.getPhysicalName().getString());
+ }
+ SchemaPlus schema = parentSchema.getSubSchema(this.name);
+ SchemaPlus viewSqlSchema =
+ this.schemaName == null ? schema : parentSchema;
+ func = ViewTable.viewMacro(schema, viewSql,
+ CalciteSchema.from(viewSqlSchema).path(null),
+ null, pTable.getViewType() == ViewType.UPDATABLE);
+ views.put(name, func);
+ viewTables.add(tableRef); // remembered so defineIndexesAsMaterializations() can visit the view's indexes
+ }
+ } catch (TableNotFoundException e) { // ignored: func stays null and an empty list is returned
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+
+ return func == null ? Collections.<Function>emptyList() : ImmutableList.of(func);
+ }
+
+ /**
+  * Returns the key set of the view-macro cache. Names from the built-in
+  * function registry are not included.
+  */
+ @Override
+ public Set<String> getFunctionNames() {
+     final Set<String> viewNames = views.keySet();
+     return viewNames;
+ }
+
+ /**
+  * Returns the sub-schema with the given name, creating and caching it on
+  * first use.
+  *
+  * @return null when this schema already has a Phoenix schema name (so it
+  *         has no sub-schemas), or when the name already resolves to a
+  *         table or function in this schema
+  */
+ @Override
+ public Schema getSubSchema(String name) {
+     if (schemaName != null) {
+         return null;
+     }
+
+     final Schema cached = subSchemas.get(name);
+     if (cached != null) {
+         return cached;
+     }
+
+     //TODO We should call FromCompiler.getResolverForSchema() here after
+     // all schemas are required to be explicitly created.
+     final boolean nameTaken = getTable(name) != null || !getFunctions(name).isEmpty();
+     if (nameTaken) {
+         return null;
+     }
+     final SchemaPlus self = parentSchema.getSubSchema(this.name);
+     final Schema created = new PhoenixSchema(name, name, self, pc, typeFactory);
+     subSchemas.put(name, created);
+     return created;
+ }
+
+ /**
+  * Returns the names of the sub-schemas created so far by
+  * {@link #getSubSchema(String)}.
+  */
+ @Override
+ public Set<String> getSubSchemaNames() {
+     final Set<String> createdNames = subSchemas.keySet();
+     return createdNames;
+ }
+ /**
+ * Not supported by this schema: always throws UnsupportedOperationException.
+ */
+ @Override
+ public Expression getExpression(SchemaPlus parentSchema, String name) {
+ throw new UnsupportedOperationException();
+ }
+ /**
+ * Always reports this schema as mutable.
+ */
+ @Override
+ public boolean isMutable() {
+ return true;
+ }
+
+ /**
+  * Reports a change whenever the two timestamps differ; the schema keeps
+  * no modification time of its own.
+  */
+ @Override
+ public boolean contentsHaveChangedSince(long lastCheck, long now) {
+     final boolean sameInstant = (lastCheck == now);
+     return !sameInstant;
+ }
+
+ /**
+  * Empties the table, view and view-table caches, then re-registers the
+  * "ListJars" table function. Sub-schemas and built-in functions are left
+  * untouched.
+  */
+ public void clear() {
+     tables.clear();
+     views.clear();
+     views.put("ListJars", listJarsFunction);
+     viewTables.clear();
+ }
+
+ /**
+  * Registers a materialization for every index of each cached
+  * {@code PhoenixTable}, and for every index of each resolved view whose
+  * parent name equals the view's own name.
+  *
+  * @throws RuntimeException wrapping any {@link SQLException} raised while
+  *         adding a materialization
+  */
+ public void defineIndexesAsMaterializations() {
+     final SchemaPlus thisSchema = parentSchema.getSubSchema(this.name);
+     final SchemaPlus sqlSchema = (this.schemaName != null) ? parentSchema : thisSchema;
+     final CalciteSchema calciteSchema = CalciteSchema.from(thisSchema);
+     final List<String> path = CalciteSchema.from(sqlSchema).path(null);
+     try {
+         // Snapshot first: addMaterialization() inserts into the table cache.
+         final List<PhoenixTable> dataTables = Lists.newArrayList();
+         for (Table cachedTable : tables.values()) {
+             if (cachedTable instanceof PhoenixTable) {
+                 dataTables.add((PhoenixTable) cachedTable);
+             }
+         }
+         for (PhoenixTable dataTable : dataTables) {
+             final TableRef dataRef = dataTable.tableMapping.getTableRef();
+             for (PTable index : dataRef.getTable().getIndexes()) {
+                 final TableRef indexRef = new TableRef(null, index,
+                         dataRef.getTimeStamp(), dataRef.getLowerBoundTimeStamp(),
+                         false);
+                 addMaterialization(indexRef, path, calciteSchema);
+             }
+         }
+         for (TableRef viewRef : viewTables) {
+             final PTable view = viewRef.getTable();
+             for (PTable index : view.getIndexes()) {
+                 if (!index.getParentName().equals(view.getName())) {
+                     continue;
+                 }
+                 final TableRef indexRef = new TableRef(null, index,
+                         viewRef.getTimeStamp(), viewRef.getLowerBoundTimeStamp(),
+                         false);
+                 addMaterialization(indexRef, path, calciteSchema);
+             }
+         }
+     } catch (SQLException failure) {
+         throw new RuntimeException(failure);
+     }
+ }
+
+ /**
+  * Caches the index as a {@code PhoenixTable} under the index table name
+  * and defines a Calcite materialization for it.
+  *
+  * The materialization SQL selects each data column aliased to its index
+  * column name, from the index's parent table:
+  * {@code SELECT <dataCol> <indexCol>, ... FROM <parent>}.
+  *
+  * @param indexTableRef reference to the index table
+  * @param path schema path handed to the materialization service
+  * @param calciteSchema schema in which the materialization is defined
+  * @throws SQLException if adjusting the table reference fails
+  */
+ private void addMaterialization(TableRef indexTableRef, List<String> path,
+         CalciteSchema calciteSchema) throws SQLException {
+     indexTableRef = fixTableMultiTenancy(indexTableRef);
+     final PhoenixTable table = new PhoenixTable(pc, indexTableRef, getTypeFactory());
+     final PTable index = indexTableRef.getTable();
+     tables.put(index.getTableName().getString(), table);
+     // Method-local buffer: no need for StringBuffer's synchronization.
+     StringBuilder sb = new StringBuilder();
+     sb.append("SELECT");
+     for (PColumn column : table.getColumns()) {
+         String indexColumnName = column.getName().getString();
+         String dataColumnName = IndexUtil.getDataColumnName(indexColumnName);
+         sb.append(",").append(SchemaUtil.getEscapedFullColumnName(dataColumnName));
+         sb.append(" ").append(SchemaUtil.getEscapedFullColumnName(indexColumnName));
+     }
+     sb.setCharAt(6, ' '); // replace first comma with space.
+     sb.append(" FROM ").append(SchemaUtil.getEscapedFullTableName(index.getParentName().getString()));
+     MaterializationService.instance().defineMaterialization(
+             calciteSchema, null, sb.toString(), path, index.getTableName().getString(), true, true);
+ }
+
+ /**
+  * Tells whether the table is exposed as a view: its type is VIEW and its
+  * view type is anything other than MAPPED.
+  */
+ private boolean isView(PTable table) {
+     if (table.getType() != PTableType.VIEW) {
+         return false;
+     }
+     return table.getViewType() != ViewType.MAPPED;
+ }
+ /**
+ * Returns the reference unchanged when the connection has a tenant id or
+ * the table is not multi-tenant. Otherwise rebuilds the table through
+ * PTableImpl.makePTable, copying every property from the original except
+ * for a literal false passed right after isWALDisabled(), and returns a
+ * new TableRef around the copy.
+ * NOTE(review): that false is presumably the multi-tenant flag -- confirm
+ * against the PTableImpl.makePTable signature.
+ */
+ private TableRef fixTableMultiTenancy(TableRef tableRef) throws SQLException {
+ if (pc.getTenantId() != null || !tableRef.getTable().isMultiTenant()) {
+ return tableRef;
+ }
+ PTable table = tableRef.getTable();
+ table = PTableImpl.makePTable(
+ table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), table.getTimeStamp(),
+ table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), PTableImpl.getColumnsToClone(table), table.getParentSchemaName(), table.getParentTableName(),
+ table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
+ table.isWALDisabled(), false, table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
+ table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getBaseColumnCount(), table.getIndexDisableTimestamp(),
- table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
++ table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
+ return new TableRef(null, table, tableRef.getTimeStamp(),
+ tableRef.getLowerBoundTimeStamp(), tableRef.hasDynamicCols());
+ }
+
+ /**
+  * Checks whether the name refers to a sequence by registering a
+  * NEXT_VALUE reference with the sequence manager and validating it. The
+  * sequence manager is reset afterwards in every case.
+  *
+  * @return a {@code PhoenixSequence} for the name, or null if registration
+  *         or validation raised a {@link SQLException}
+  */
+ private PhoenixSequence resolveSequence(String name) {
+     boolean isSequence = true;
+     try {
+         sequenceManager.newSequenceReference(pc.getTenantId(),
+                 TableName.createNormalized(schemaName, name),
+                 null, SequenceValueParseNode.Op.NEXT_VALUE);
+         sequenceManager.validateSequences(Sequence.ValueOp.VALIDATE_SEQUENCE);
+     } catch (SQLException notASequence){
+         isSequence = false;
+     } finally {
+         sequenceManager.reset();
+     }
+
+     if (!isSequence) {
+         return null;
+     }
+     return new PhoenixSequence(schemaName, name, pc);
+ }
+
+ /**
+  * Returns the type factory handed to every {@code PhoenixTable} this
+  * schema creates; null until one has been set.
+  */
+ public RelDataTypeFactory getTypeFactory() {
+     return this.typeFactory;
+ }
+
+ /**
+  * Sets the type factory used for tables created from now on. Tables
+  * already in the cache are not touched.
+  */
+ public void setTypeFactory(RelDataTypeFactory typeFactory) {
+     this.typeFactory = typeFactory;
+ }
+
+ /** Schema factory that creates a
+ * {@link org.apache.phoenix.calcite.PhoenixSchema}.
+ * This allows you to create a Phoenix schema inside a model.json file.
+ * All operand entries are passed to the Phoenix JDBC driver as connection
+ * properties; "url" is the JDBC URL.
+ *
+ * <pre>{@code
+ * {
+ * version: '1.0',
+ * defaultSchema: 'HR',
+ * schemas: [
+ * {
+ * name: 'HR',
+ * type: 'custom',
+ * factory: 'org.apache.phoenix.calcite.PhoenixSchema.Factory',
+ * operand: {
+ * url: "jdbc:phoenix:localhost",
+ * user: "scott",
+ * password: "tiger"
+ * }
+ * }
+ * ]
+ * }
+ * }</pre>
+ */
+ public static class Factory implements SchemaFactory {
+ /** Delegates to the private {@code PhoenixSchema.create} method. */
+ @Override
+ public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
+ return PhoenixSchema.create(parentSchema, name, operand);
+ }
+ }
+}