You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/03/22 09:23:53 UTC
[ignite] branch sql-calcite updated: IGNITE-14336 Calcite: fix
add/drop columns schema changes
This is an automated email from the ASF dual-hosted git repository.
tledkov pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/sql-calcite by this push:
new 905eaa8 IGNITE-14336 Calcite: fix add/drop columns schema changes
905eaa8 is described below
commit 905eaa8d24c38a23404d1deb71b120dfa66556ee
Author: korlov42 <ko...@gridgain.com>
AuthorDate: Mon Mar 22 12:23:40 2021 +0300
IGNITE-14336 Calcite: fix add/drop columns schema changes
---
.../query/calcite/prepare/QueryPlanCacheImpl.java | 18 ++--
.../query/calcite/schema/SchemaHolderImpl.java | 21 ++--
.../query/calcite/CalciteQueryProcessorTest.java | 109 ++++++++++++++++++++-
.../query/schema/SchemaChangeListener.java | 26 +++--
.../processors/query/h2/SchemaManager.java | 75 +++++++++-----
5 files changed, 201 insertions(+), 48 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java
index 53dec31..7a960d3 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java
@@ -96,37 +96,43 @@ public class QueryPlanCacheImpl extends AbstractService implements QueryPlanCach
}
/** {@inheritDoc} */
- @Override public void onSchemaDrop(String schemaName) {
+ @Override public void onSchemaDropped(String schemaName) {
clear();
}
/** {@inheritDoc} */
- @Override public void onSqlTypeDrop(String schemaName, GridQueryTypeDescriptor typeDescriptor) {
+ @Override public void onSqlTypeDropped(String schemaName, GridQueryTypeDescriptor typeDescriptor) {
clear();
}
/** {@inheritDoc} */
- @Override public void onIndexCreate(String schemaName, String tblName, String idxName,
+ @Override public void onIndexCreated(String schemaName, String tblName, String idxName,
GridQueryIndexDescriptor idxDesc, GridIndex<?> idx) {
clear();
}
/** {@inheritDoc} */
- @Override public void onIndexDrop(String schemaName, String tblName, String idxName) {
+ @Override public void onIndexDropped(String schemaName, String tblName, String idxName) {
clear();
}
/** {@inheritDoc} */
- @Override public void onSchemaCreate(String schemaName) {
+ @Override public void onSchemaCreated(String schemaName) {
// No-op
}
/** {@inheritDoc} */
- @Override public void onSqlTypeCreate(
+ @Override public void onSqlTypeCreated(
String schemaName,
GridQueryTypeDescriptor typeDesc,
GridCacheContextInfo<?, ?> cacheInfo
) {
// No-op
}
+
+ /** {@inheritDoc} */
+ @Override public void onSqlTypeUpdated(String schemaName, GridQueryTypeDescriptor typeDesc,
+ GridCacheContextInfo<?, ?> cacheInfo) {
+ clear();
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
index e8caf29..71d7164 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
@@ -158,19 +158,19 @@ public class SchemaHolderImpl extends AbstractService implements SchemaHolder, S
}
/** {@inheritDoc} */
- @Override public synchronized void onSchemaCreate(String schemaName) {
+ @Override public synchronized void onSchemaCreated(String schemaName) {
igniteSchemas.putIfAbsent(schemaName, new IgniteSchema(schemaName));
rebuild();
}
/** {@inheritDoc} */
- @Override public synchronized void onSchemaDrop(String schemaName) {
+ @Override public synchronized void onSchemaDropped(String schemaName) {
igniteSchemas.remove(schemaName);
rebuild();
}
/** {@inheritDoc} */
- @Override public synchronized void onSqlTypeCreate(
+ @Override public synchronized void onSqlTypeCreated(
String schemaName,
GridQueryTypeDescriptor typeDesc,
GridCacheContextInfo<?, ?> cacheInfo
@@ -187,6 +187,15 @@ public class SchemaHolderImpl extends AbstractService implements SchemaHolder, S
rebuild();
}
+ /** {@inheritDoc} */
+ @Override public void onSqlTypeUpdated(
+ String schemaName,
+ GridQueryTypeDescriptor typeDesc,
+ GridCacheContextInfo<?, ?> cacheInfo
+ ) {
+ onSqlTypeCreated(schemaName, typeDesc, cacheInfo);
+ }
+
/** */
private static Object affinityIdentity(CacheConfiguration<?,?> ccfg) {
if (ccfg.getCacheMode() == CacheMode.PARTITIONED)
@@ -195,7 +204,7 @@ public class SchemaHolderImpl extends AbstractService implements SchemaHolder, S
}
/** {@inheritDoc} */
- @Override public synchronized void onSqlTypeDrop(
+ @Override public synchronized void onSqlTypeDropped(
String schemaName,
GridQueryTypeDescriptor typeDesc
) {
@@ -207,7 +216,7 @@ public class SchemaHolderImpl extends AbstractService implements SchemaHolder, S
}
/** {@inheritDoc} */
- @Override public synchronized void onIndexCreate(String schemaName, String tblName, String idxName,
+ @Override public synchronized void onIndexCreated(String schemaName, String tblName, String idxName,
GridQueryIndexDescriptor idxDesc, @Nullable GridIndex<?> gridIdx) {
IgniteSchema schema = igniteSchemas.get(schemaName);
assert schema != null;
@@ -249,7 +258,7 @@ public class SchemaHolderImpl extends AbstractService implements SchemaHolder, S
}
/** {@inheritDoc} */
- @Override public synchronized void onIndexDrop(String schemaName, String tblName, String idxName) {
+ @Override public synchronized void onIndexDropped(String schemaName, String tblName, String idxName) {
IgniteSchema schema = igniteSchemas.get(schemaName);
assert schema != null;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index c1a1062..e5a94a9 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.calcite;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -42,8 +43,6 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.query.QueryEngine;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
-import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
@@ -52,6 +51,9 @@ import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
/**
*
*/
@@ -849,6 +851,109 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
assertEquals(ImmutableIntList.of(3), tblMap.get("MY_TBL_2").descriptor().distribution().getKeys());
}
+ /**
+ * Verifies that table modification events are passed to a calcite schema modification listener.
+ */
+ @Test
+ public void testIgniteSchemaAwaresAlterTableCommand() {
+ String selectAllQry = "select * from test_tbl";
+
+ execute(client, "drop table if exists test_tbl");
+ execute(client, "create table test_tbl(id int primary key, val varchar)");
+
+ CalciteQueryProcessor qryProc = Commons.lookupComponent(client.context(), CalciteQueryProcessor.class);
+
+ {
+ List<String> names = deriveColumnNamesFromCursor(
+ qryProc.query(null, "PUBLIC", selectAllQry).get(0)
+ );
+
+ assertThat(names, equalTo(F.asList("ID", "VAL")));
+ }
+
+ {
+ execute(client, "alter table test_tbl add column new_col int");
+
+ List<String> names = deriveColumnNamesFromCursor(
+ qryProc.query(null, "PUBLIC", selectAllQry).get(0)
+ );
+
+ assertThat(names, equalTo(F.asList("ID", "VAL", "NEW_COL")));
+ }
+
+ {
+ try {
+ execute(client, "alter table test_tbl add column new_col int");
+ }
+ catch (Exception ignored) {
+ // it's expected because column with such name already exists
+ }
+
+ List<String> names = deriveColumnNamesFromCursor(
+ qryProc.query(null, "PUBLIC", selectAllQry).get(0)
+ );
+
+ assertThat(names, equalTo(F.asList("ID", "VAL", "NEW_COL")));
+ }
+
+ {
+ execute(client, "alter table test_tbl add column if not exists new_col int");
+
+ List<String> names = deriveColumnNamesFromCursor(
+ qryProc.query(null, "PUBLIC", selectAllQry).get(0)
+ );
+
+ assertThat(names, equalTo(F.asList("ID", "VAL", "NEW_COL")));
+ }
+
+ {
+ execute(client, "alter table test_tbl drop column new_col");
+
+ List<String> names = deriveColumnNamesFromCursor(
+ qryProc.query(null, "PUBLIC", selectAllQry).get(0)
+ );
+
+ assertThat(names, equalTo(F.asList("ID", "VAL")));
+ }
+
+ {
+ try {
+ execute(client, "alter table test_tbl drop column new_col");
+ }
+ catch (Exception ignored) {
+ // it's expected since the column was already removed
+ }
+
+ List<String> names = deriveColumnNamesFromCursor(
+ qryProc.query(null, "PUBLIC", selectAllQry).get(0)
+ );
+
+ assertThat(names, equalTo(F.asList("ID", "VAL")));
+ }
+
+ {
+ execute(client, "alter table test_tbl drop column if exists new_col");
+
+ List<String> names = deriveColumnNamesFromCursor(
+ qryProc.query(null, "PUBLIC", selectAllQry).get(0)
+ );
+
+ assertThat(names, equalTo(F.asList("ID", "VAL")));
+ }
+ }
+
+ /** */
+ private static List<String> deriveColumnNamesFromCursor(FieldsQueryCursor cursor) {
+ List<String> names = new ArrayList<>(cursor.getColumnsCount());
+
+ assertNotNull(cursor.getAll());
+
+ for (int i = 0; i < cursor.getColumnsCount(); i++)
+ names.add(cursor.getFieldName(i));
+
+ return names;
+ }
+
/** for test purpose only. */
public void testThroughput() {
IgniteCache<Integer, Developer> developer = client.getOrCreateCache(new CacheConfiguration<Integer, Developer>()
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaChangeListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaChangeListener.java
index 78f03aa..52c29bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaChangeListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaChangeListener.java
@@ -32,30 +32,40 @@ public interface SchemaChangeListener {
*
* @param schemaName Schema name.
*/
- void onSchemaCreate(String schemaName);
+ void onSchemaCreated(String schemaName);
/**
* Callback method.
*
* @param schemaName Schema name.
*/
- void onSchemaDrop(String schemaName);
+ void onSchemaDropped(String schemaName);
/**
* Callback method.
+ *
+ * @param schemaName Schema name.
+ * @param typeDesc Type descriptor.
+ * @param cacheInfo Cache info.
+ */
+ void onSqlTypeCreated(String schemaName, GridQueryTypeDescriptor typeDesc, GridCacheContextInfo<?, ?> cacheInfo);
+
+ /**
+ * Callback method.
+ *
* @param schemaName Schema name.
- * @param typeDesc type descriptor.
+ * @param typeDesc Type descriptor.
* @param cacheInfo Cache info.
*/
- void onSqlTypeCreate(String schemaName, GridQueryTypeDescriptor typeDesc, GridCacheContextInfo<?, ?> cacheInfo);
+ void onSqlTypeUpdated(String schemaName, GridQueryTypeDescriptor typeDesc, GridCacheContextInfo<?, ?> cacheInfo);
/**
* Callback method.
*
* @param schemaName Schema name.
- * @param typeDesc type descriptor.
+ * @param typeDesc Type descriptor.
*/
- void onSqlTypeDrop(String schemaName, GridQueryTypeDescriptor typeDesc);
+ void onSqlTypeDropped(String schemaName, GridQueryTypeDescriptor typeDesc);
/**
* Callback on index creation.
@@ -66,7 +76,7 @@ public interface SchemaChangeListener {
* @param idxDesc Index descriptor.
* @param idx Index.
*/
- void onIndexCreate(String schemaName, String tblName, String idxName, GridQueryIndexDescriptor idxDesc, @Nullable GridIndex<?> idx);
+ void onIndexCreated(String schemaName, String tblName, String idxName, GridQueryIndexDescriptor idxDesc, @Nullable GridIndex<?> idx);
/**
* Callback on index drop.
@@ -75,5 +85,5 @@ public interface SchemaChangeListener {
* @param tblName Table name.
* @param idxName Index name.
*/
- void onIndexDrop(String schemaName, String tblName, String idxName);
+ void onIndexDropped(String schemaName, String tblName, String idxName);
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/SchemaManager.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/SchemaManager.java
index 4c1f432..8320526 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/SchemaManager.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/SchemaManager.java
@@ -378,7 +378,7 @@ public class SchemaManager {
tbl.table().setRemoveIndexOnDestroy(rmvIdx);
dropTable(tbl);
- lsnr.onSqlTypeDrop(schemaName, tbl.type());
+ lsnr.onSqlTypeDropped(schemaName, tbl.type());
}
catch (Exception e) {
U.error(log, "Failed to drop table on cache stop (will ignore): " + tbl.fullTableName(), e);
@@ -444,7 +444,7 @@ public class SchemaManager {
*/
private void createSchema0(String schema) throws IgniteCheckedException {
connMgr.executeSystemStatement("CREATE SCHEMA IF NOT EXISTS " + H2Utils.withQuotes(schema));
- lsnr.onSchemaCreate(schema);
+ lsnr.onSchemaCreated(schema);
if (log.isDebugEnabled())
log.debug("Created H2 schema for index database: " + schema);
@@ -560,7 +560,7 @@ public class SchemaManager {
GridH2Table h2Tbl = H2TableEngine.createTable(conn.connection(), sql, rowDesc, tbl);
- lsnr.onSqlTypeCreate(schemaName, tbl.type(), tbl.cacheInfo());
+ lsnr.onSqlTypeCreated(schemaName, tbl.type(), tbl.cacheInfo());
registerSystemIndexes(h2Tbl, schemaName, tbl);
@@ -595,7 +595,7 @@ public class SchemaManager {
QuerySysIndexDescriptorImpl desc = new QuerySysIndexDescriptorImpl(idxName, idxCols);
- lsnr.onIndexCreate(schemaName, tbl.tableName(), idxName, desc, (GridIndex<?>)idx);
+ lsnr.onIndexCreated(schemaName, tbl.tableName(), idxName, desc, (GridIndex<?>)idx);
}
}
@@ -640,7 +640,7 @@ public class SchemaManager {
*/
private void dropSchema(String schema) throws IgniteCheckedException {
connMgr.executeSystemStatement("DROP SCHEMA IF EXISTS " + H2Utils.withQuotes(schema));
- lsnr.onSchemaDrop(schema);
+ lsnr.onSchemaDropped(schema);
if (log.isDebugEnabled())
log.debug("Dropped H2 schema for index database: " + schema);
@@ -674,7 +674,7 @@ public class SchemaManager {
GridQueryIndexDescriptor idxDesc = desc.type().indexes().get(h2Idx.getName());
- lsnr.onIndexCreate(schemaName, desc.tableName(), h2Idx.getName(), idxDesc, h2Idx);
+ lsnr.onIndexCreated(schemaName, desc.tableName(), h2Idx.getName(), idxDesc, h2Idx);
}
/**
@@ -726,7 +726,7 @@ public class SchemaManager {
throw e;
}
- lsnr.onIndexCreate(schemaName, desc.tableName(), h2Idx.getName(), idxDesc, h2Idx);
+ lsnr.onIndexCreated(schemaName, desc.tableName(), h2Idx.getName(), idxDesc, h2Idx);
}
/**
@@ -749,7 +749,7 @@ public class SchemaManager {
connMgr.executeStatement(schemaName, sql);
- lsnr.onIndexDrop(schemaName, tbl.getName(), idxName);
+ lsnr.onIndexDropped(schemaName, tbl.getName(), idxName);
}
/**
@@ -778,6 +778,8 @@ public class SchemaManager {
}
desc.table().addColumns(cols, ifColNotExists);
+
+ lsnr.onSqlTypeUpdated(schemaName, desc.type(), desc.table().cacheInfo());
}
/**
@@ -806,6 +808,8 @@ public class SchemaManager {
}
desc.table().dropColumns(cols, ifColExists);
+
+ lsnr.onSqlTypeUpdated(schemaName, desc.type(), desc.table().cacheInfo());
}
/**
@@ -905,27 +909,34 @@ public class SchemaManager {
/** */
private static final class NoOpSchemaChangeListener implements SchemaChangeListener {
/** {@inheritDoc} */
- @Override public void onSchemaCreate(String schemaName) {}
+ @Override public void onSchemaCreated(String schemaName) {}
/** {@inheritDoc} */
- @Override public void onSchemaDrop(String schemaName) {}
+ @Override public void onSchemaDropped(String schemaName) {}
/** {@inheritDoc} */
- @Override public void onIndexCreate(String schemaName, String tblName, String idxName,
+ @Override public void onIndexCreated(String schemaName, String tblName, String idxName,
GridQueryIndexDescriptor idxDesc, GridIndex<?> idx) {}
/** {@inheritDoc} */
- @Override public void onIndexDrop(String schemaName, String tblName, String idxName) {}
+ @Override public void onIndexDropped(String schemaName, String tblName, String idxName) {}
+
+ /** {@inheritDoc} */
+ @Override public void onSqlTypeCreated(
+ String schemaName,
+ GridQueryTypeDescriptor typeDesc,
+ GridCacheContextInfo<?, ?> cacheInfo
+ ) {}
/** {@inheritDoc} */
- @Override public void onSqlTypeCreate(
+ @Override public void onSqlTypeUpdated(
String schemaName,
GridQueryTypeDescriptor typeDesc,
GridCacheContextInfo<?, ?> cacheInfo
) {}
/** {@inheritDoc} */
- @Override public void onSqlTypeDrop(String schemaName, GridQueryTypeDescriptor typeDescriptor) {}
+ @Override public void onSqlTypeDropped(String schemaName, GridQueryTypeDescriptor typeDescriptor) {}
}
/** */
@@ -933,42 +944,54 @@ public class SchemaManager {
/** */
private final List<SchemaChangeListener> lsnrs;
+ /**
+ * @param lsnrs Lsnrs.
+ */
private CompoundSchemaChangeListener(List<SchemaChangeListener> lsnrs) {
this.lsnrs = lsnrs;
}
/** {@inheritDoc} */
- @Override public void onSchemaCreate(String schemaName) {
- lsnrs.forEach(lsnr -> lsnr.onSchemaCreate(schemaName));
+ @Override public void onSchemaCreated(String schemaName) {
+ lsnrs.forEach(lsnr -> lsnr.onSchemaCreated(schemaName));
}
/** {@inheritDoc} */
- @Override public void onSchemaDrop(String schemaName) {
- lsnrs.forEach(lsnr -> lsnr.onSchemaCreate(schemaName));
+ @Override public void onSchemaDropped(String schemaName) {
+ lsnrs.forEach(lsnr -> lsnr.onSchemaCreated(schemaName));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSqlTypeCreated(
+ String schemaName,
+ GridQueryTypeDescriptor typeDesc,
+ GridCacheContextInfo<?, ?> cacheInfo
+ ) {
+ lsnrs.forEach(lsnr -> lsnr.onSqlTypeCreated(schemaName, typeDesc, cacheInfo));
}
/** {@inheritDoc} */
- @Override public void onSqlTypeCreate(
+ @Override public void onSqlTypeUpdated(
String schemaName,
GridQueryTypeDescriptor typeDesc,
GridCacheContextInfo<?, ?> cacheInfo
) {
- lsnrs.forEach(lsnr -> lsnr.onSqlTypeCreate(schemaName, typeDesc, cacheInfo));
+ lsnrs.forEach(lsnr -> lsnr.onSqlTypeUpdated(schemaName, typeDesc, cacheInfo));
}
/** {@inheritDoc} */
- @Override public void onSqlTypeDrop(String schemaName, GridQueryTypeDescriptor typeDescriptor) {
- lsnrs.forEach(lsnr -> lsnr.onSqlTypeDrop(schemaName, typeDescriptor));
+ @Override public void onSqlTypeDropped(String schemaName, GridQueryTypeDescriptor typeDescriptor) {
+ lsnrs.forEach(lsnr -> lsnr.onSqlTypeDropped(schemaName, typeDescriptor));
}
/** {@inheritDoc} */
- @Override public void onIndexCreate(String schemaName, String tblName, String idxName, GridQueryIndexDescriptor idxDesc, GridIndex<?> idx) {
- lsnrs.forEach(lsnr -> lsnr.onIndexCreate(schemaName, tblName, idxName, idxDesc, idx));
+ @Override public void onIndexCreated(String schemaName, String tblName, String idxName, GridQueryIndexDescriptor idxDesc, GridIndex<?> idx) {
+ lsnrs.forEach(lsnr -> lsnr.onIndexCreated(schemaName, tblName, idxName, idxDesc, idx));
}
/** {@inheritDoc} */
- @Override public void onIndexDrop(String schemaName, String tblName, String idxName) {
- lsnrs.forEach(lsnr -> lsnr.onIndexDrop(schemaName, tblName, idxName));
+ @Override public void onIndexDropped(String schemaName, String tblName, String idxName) {
+ lsnrs.forEach(lsnr -> lsnr.onIndexDropped(schemaName, tblName, idxName));
}
}
}