You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gj...@apache.org on 2021/11/10 13:17:01 UTC
[phoenix] branch 4.x updated: PHOENIX-6227 - Option for DDL changes
to export to external schema repository (#1341)
This is an automated email from the ASF dual-hosted git repository.
gjacoby pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new bfbe8d9 PHOENIX-6227 - Option for DDL changes to export to external schema repository (#1341)
bfbe8d9 is described below
commit bfbe8d9a3cffca518d73462b15356a4e353eb686
Author: Geoffrey Jacoby <gj...@apache.org>
AuthorDate: Wed Nov 10 08:16:53 2021 -0500
PHOENIX-6227 - Option for DDL changes to export to external schema repository (#1341)
---
.../end2end/AlterMultiTenantTableWithViewsIT.java | 63 +++-
.../org/apache/phoenix/end2end/AlterTableIT.java | 96 +++++
.../org/apache/phoenix/end2end/CreateTableIT.java | 21 ++
.../apache/phoenix/end2end/LogicalTableNameIT.java | 32 +-
.../phoenix/end2end/SchemaRegistryFailureIT.java | 135 +++++++
.../it/java/org/apache/phoenix/end2end/ViewIT.java | 94 ++++-
.../apache/phoenix/end2end/WALAnnotationIT.java | 85 ++---
.../coprocessor/IndexToolVerificationResult.java | 1 -
.../phoenix/coprocessor/MetaDataEndpointImpl.java | 416 ++++++++++++++++-----
.../phoenix/coprocessor/MetaDataProtocol.java | 7 +-
.../UngroupedAggregateRegionScanner.java | 15 +-
.../apache/phoenix/exception/SQLExceptionCode.java | 4 +-
.../org/apache/phoenix/execute/MutationState.java | 13 +-
.../phoenix/jdbc/PhoenixDatabaseMetaData.java | 3 +
.../phoenix/query/ConnectionQueryServicesImpl.java | 7 +-
.../org/apache/phoenix/query/QueryConstants.java | 2 +
.../org/apache/phoenix/schema/DelegateTable.java | 5 +
.../org/apache/phoenix/schema/MetaDataClient.java | 11 +-
.../java/org/apache/phoenix/schema/PTable.java | 6 +
.../java/org/apache/phoenix/schema/PTableImpl.java | 58 ++-
.../export/DefaultSchemaRegistryRepository.java | 91 +++++
.../phoenix/schema/export/DefaultSchemaWriter.java | 42 +++
.../phoenix/schema/export/SchemaImporter.java | 39 ++
.../schema/export/SchemaRegistryRepository.java | 70 ++++
.../export/SchemaRegistryRepositoryFactory.java | 67 ++++
.../apache/phoenix/schema/export/SchemaWriter.java | 51 +++
.../phoenix/schema/export/SchemaWriterFactory.java | 41 ++
.../java/org/apache/phoenix/util/MetaDataUtil.java | 146 +++++++-
.../java/org/apache/phoenix/util/ScanUtil.java | 14 +-
.../java/org/apache/phoenix/util/ViewUtil.java | 65 +++-
.../org/apache/phoenix/util/WALAnnotationUtil.java | 13 +-
.../src/main/protobuf/MetaDataService.proto | 1 +
phoenix-core/src/main/protobuf/PTable.proto | 1 +
33 files changed, 1477 insertions(+), 238 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterMultiTenantTableWithViewsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterMultiTenantTableWithViewsIT.java
index 92dbaaf..7abec5a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterMultiTenantTableWithViewsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterMultiTenantTableWithViewsIT.java
@@ -45,11 +45,13 @@ import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.schema.ColumnNotFoundException;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ViewUtil;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.phoenix.thirdparty.com.google.common.base.Objects;
@@ -82,7 +84,66 @@ public class AlterMultiTenantTableWithViewsIT extends SplitSystemCatalogIT {
assertFalse(rs.next());
assertEquals(values.length, i - 1);
}
-
+
+ @Test
+ public void testCreateAndAlterViewsWithChangeDetectionEnabled() throws Exception {
+ String tenantId = "TE_" + generateUniqueName();
+ String schemaName = "S_" + generateUniqueName();
+ String tableName = "T_" + generateUniqueName();
+ String globalViewName = "GV_" + generateUniqueName();
+ String tenantViewName = "TV_" + generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+ String fullGlobalViewName = SchemaUtil.getTableName(schemaName, globalViewName);
+ String fullTenantViewName = SchemaUtil.getTableName(schemaName, tenantViewName);
+
+ PTable globalView = null;
+ PTable alteredGlobalView = null;
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String ddl = "CREATE TABLE " + fullTableName +
+ " (id char(1) NOT NULL," + " col1 integer NOT NULL," + " col2 bigint NOT NULL," +
+ " CONSTRAINT NAME_PK PRIMARY KEY (id, col1, col2)) " +
+ "MULTI_TENANT=true, CHANGE_DETECTION_ENABLED=true";
+ conn.createStatement().execute(ddl);
+ PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
+ assertTrue(table.isChangeDetectionEnabled());
+ AlterTableIT.verifySchemaExport(table, getUtility().getConfiguration());
+
+ String globalViewDdl = "CREATE VIEW " + fullGlobalViewName +
+ " (id2 CHAR(12) NOT NULL PRIMARY KEY, col3 BIGINT NULL) " +
+ " AS SELECT * FROM " + fullTableName + " CHANGE_DETECTION_ENABLED=true";
+
+ conn.createStatement().execute(globalViewDdl);
+ globalView = PhoenixRuntime.getTableNoCache(conn, fullGlobalViewName);
+ assertTrue(globalView.isChangeDetectionEnabled());
+ // base column count doesn't get set properly
+ PTableImpl.Builder builder = PTableImpl.builderFromExisting(globalView);
+ builder.setBaseColumnCount(table.getColumns().size());
+ globalView = builder.setColumns(globalView.getColumns()).build();
+ AlterTableIT.verifySchemaExport(globalView, getUtility().getConfiguration());
+
+ String alterViewDdl = "ALTER VIEW " + fullGlobalViewName + " ADD id3 VARCHAR(12) NULL "
+ + "PRIMARY KEY, " + " col4 BIGINT NULL";
+ conn.createStatement().execute(alterViewDdl);
+ alteredGlobalView = PhoenixRuntime.getTableNoCache(conn, fullGlobalViewName);
+
+ assertTrue(alteredGlobalView.isChangeDetectionEnabled());
+ AlterTableIT.verifySchemaExport(alteredGlobalView, getUtility().getConfiguration());
+ }
+
+ Properties props = new Properties();
+ props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+ try (Connection tenantConn = DriverManager.getConnection(getUrl(), props)) {
+ String tenantViewDdl = "CREATE VIEW " + fullTenantViewName +
+ " (col5 VARCHAR NULL) " +
+ " AS SELECT * FROM " + fullGlobalViewName + " CHANGE_DETECTION_ENABLED=true";
+ tenantConn.createStatement().execute(tenantViewDdl);
+ PTable tenantView = PhoenixRuntime.getTableNoCache(tenantConn, fullTenantViewName);
+ assertTrue(tenantView.isChangeDetectionEnabled());
+ PTable tenantViewWithParents = ViewUtil.addDerivedColumnsFromParent(tenantView, alteredGlobalView);
+ AlterTableIT.verifySchemaExport(tenantViewWithParents, getUtility().getConfiguration());
+ }
+ }
+
@Test
public void testAddDropColumnToBaseTablePropagatesToEntireViewHierarchy() throws Exception {
String baseTable = SchemaUtil.getTableName(SCHEMA1, generateUniqueName());
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
index 3c63e6d..85d92cb 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
@@ -36,6 +36,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
@@ -46,6 +47,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
@@ -64,6 +66,9 @@ import org.apache.phoenix.schema.PTable.EncodedCQCounter;
import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.schema.export.DefaultSchemaRegistryRepository;
+import org.apache.phoenix.schema.export.DefaultSchemaWriter;
+import org.apache.phoenix.schema.export.SchemaRegistryRepositoryFactory;
import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
@@ -71,6 +76,7 @@ import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -230,7 +236,97 @@ public class AlterTableIT extends ParallelStatsDisabledIT {
}
}
+ @Test
+ public void testAlterTableUpdatesSchemaRegistry() throws Exception {
+ String schemaName = generateUniqueName();
+ String tableName = generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String createDdl = "CREATE TABLE " + fullTableName +
+ " (id char(1) NOT NULL," + " col1 integer NOT NULL," + " col2 bigint NOT NULL," +
+ " CONSTRAINT NAME_PK PRIMARY KEY (id, col1, col2)) " +
+ "CHANGE_DETECTION_ENABLED=true, SCHEMA_VERSION='OLD'";
+ conn.createStatement().execute(createDdl);
+ PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
+ assertEquals("OLD", table.getSchemaVersion());
+ String expectedSchemaId = String.format("global*%s*%s*OLD", schemaName, tableName);
+ assertEquals(expectedSchemaId, table.getExternalSchemaId());
+
+ String alterVersionDdl = "ALTER TABLE " + fullTableName + " SET SCHEMA_VERSION='NEW'";
+ conn.createStatement().execute(alterVersionDdl);
+
+ String alterDdl = "ALTER TABLE " + fullTableName +
+ " ADD col3 VARCHAR NULL";
+
+ conn.createStatement().execute(alterDdl);
+ PTable newTable = PhoenixRuntime.getTableNoCache(conn, fullTableName);
+ verifySchemaExport(newTable, getUtility().getConfiguration());
+ }
+ }
+
+ @Test
+ public void testAlterChangeDetectionActivatesSchemaRegistryExport() throws Exception {
+ String schemaName = generateUniqueName();
+ String tableName = generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String createDdl = "CREATE TABLE " + fullTableName + " (id char(1) NOT NULL," + " col1 integer NOT NULL," + " col2 bigint NOT NULL,"
+ + " CONSTRAINT NAME_PK PRIMARY KEY (id, col1, col2)) "
+ + " SCHEMA_VERSION='OLD'";
+ conn.createStatement().execute(createDdl);
+ PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
+ Assert.assertNull(table.getExternalSchemaId());
+ String alterDdl = "ALTER TABLE " + fullTableName + " SET CHANGE_DETECTION_ENABLED=true";
+ conn.createStatement().execute(alterDdl);
+ PTable alteredTable = PhoenixRuntime.getTableNoCache(conn, fullTableName);
+ assertTrue(alteredTable.isChangeDetectionEnabled());
+ verifySchemaExport(alteredTable, getUtility().getConfiguration());
+ }
+ }
+ @Test
+ public void testChangeDetectionFalseDoesntExportToSchemaRegistry() throws Exception {
+ String schemaName = generateUniqueName();
+ String tableName = generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String createDdl = "CREATE TABLE " + fullTableName + " (id char(1) NOT NULL," + " col1 integer NOT NULL," + " col2 bigint NOT NULL,"
+ + " CONSTRAINT NAME_PK PRIMARY KEY (id, col1, col2)) "
+ + "CHANGE_DETECTION_ENABLED=false, SCHEMA_VERSION='OLD'";
+ conn.createStatement().execute(createDdl);
+ PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
+ assertFalse(table.isChangeDetectionEnabled());
+ assertNull(table.getExternalSchemaId());
+ }
+ }
+
+ public static void verifySchemaExport(PTable newTable, Configuration conf) throws IOException {
+ assertEquals(DefaultSchemaRegistryRepository.getSchemaId(newTable),
+ newTable.getExternalSchemaId());
+ String expectedSchemaText = new DefaultSchemaWriter().exportSchema(newTable);
+ String actualSchemaText = SchemaRegistryRepositoryFactory.getSchemaRegistryRepository(
+ conf).getSchemaById(newTable.getExternalSchemaId());
+
+ //filter out table and column timestamp fields, which can vary by a few ms because
+ //HBase assigns the real server timestamp after we update the schema registry
+ String pattern = "(?i)\\s*timestamp:\\s\\d*";
+ expectedSchemaText = expectedSchemaText.replaceAll(pattern, "");
+ actualSchemaText = actualSchemaText.replaceAll(pattern, "");
+
+ //external schema id can be different because it's assigned at the registry AFTER
+ //we save it
+ String externalSchemaPattern = "(?i)\\s*externalSchemaId:\\s\".*\"";
+ expectedSchemaText = expectedSchemaText.replaceAll(externalSchemaPattern, "");
+ actualSchemaText = actualSchemaText.replaceAll(externalSchemaPattern, "");
+
+ //reconstructing the complete view sometimes messes up the base column count. It's not
+ //needed in an external schema registry. TODO: fix the base column count anyway
+ String baseColumnCountPattern = "(?i)\\s*baseColumnCount:\\s\".*\"";
+ expectedSchemaText = expectedSchemaText.replaceAll(baseColumnCountPattern, "");
+ actualSchemaText = expectedSchemaText.replaceAll(baseColumnCountPattern, "");
+
+ assertEquals(expectedSchemaText, actualSchemaText);
+ }
@Test
public void testSetPropertyAndAddColumnForNewColumnFamily() throws Exception {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
index 8d6a372..e41b6ae 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
@@ -61,6 +61,7 @@ import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.SchemaNotFoundException;
import org.apache.phoenix.schema.TableAlreadyExistsException;
import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.schema.export.DefaultSchemaRegistryRepository;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
@@ -758,6 +759,26 @@ public class CreateTableIT extends ParallelStatsDisabledIT {
}
@Test
+ public void testCreateChangeDetectionEnabledTable() throws Exception {
+ //create a table with CHANGE_DETECTION_ENABLED and verify both that it's set properly
+ //on the PTable, and that it gets persisted to the external schema registry
+
+ String schemaName = generateUniqueName();
+ String tableName = generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String ddl = "CREATE TABLE " + fullTableName +
+ " (id char(1) NOT NULL," + " col1 integer NOT NULL," + " col2 bigint NOT NULL," +
+ " CONSTRAINT NAME_PK PRIMARY KEY (id, col1, col2)) " +
+ "CHANGE_DETECTION_ENABLED=true";
+ conn.createStatement().execute(ddl);
+ PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
+ assertTrue(table.isChangeDetectionEnabled());
+ AlterTableIT.verifySchemaExport(table, getUtility().getConfiguration());
+ }
+ }
+
+ @Test
public void testCreateIndexWithDifferentStorageAndEncoding() throws Exception {
verifyIndexSchemeChange(false, false);
verifyIndexSchemeChange(false, true);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameIT.java
index c4f3c2e..729a8d7 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameIT.java
@@ -26,6 +26,8 @@ import org.apache.phoenix.end2end.join.HashJoinGlobalIndexIT;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -51,9 +53,7 @@ import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.INVA
import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.VALID_ROW_COUNT;
import static org.apache.phoenix.util.MetaDataUtil.VIEW_INDEX_TABLE_PREFIX;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
+import static org.junit.Assert.*;
@RunWith(Parameterized.class)
@Category(NeedsOwnMiniClusterTest.class)
@@ -430,6 +430,32 @@ public class LogicalTableNameIT extends LogicalTableNameBaseIT {
}
@Test
+ public void testChangeDetectionAfterTableNameChange() throws Exception {
+ try(Connection conn = getConnection(props)) {
+ String schemaName = "S_" + generateUniqueName();
+ String tableName = "T_" + generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+ createTable(conn, fullTableName);
+ String alterDdl = "ALTER TABLE " + fullTableName + " SET CHANGE_DETECTION_ENABLED=true";
+ conn.createStatement().execute(alterDdl);
+
+ PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
+ assertTrue(table.isChangeDetectionEnabled());
+ assertNotNull(table.getExternalSchemaId());
+ AlterTableIT.verifySchemaExport(table, getUtility().getConfiguration());
+
+ String newTableName = "T_" + generateUniqueName();
+ String fullNewTableName = SchemaUtil.getTableName(schemaName, newTableName);
+ LogicalTableNameIT.createAndPointToNewPhysicalTable(conn, fullTableName, fullNewTableName, false);
+
+ //logical table name should still be the same for PTable lookup
+ PTable newTable = PhoenixRuntime.getTableNoCache(conn, fullTableName);
+ //but the schema in the registry should match the new PTable
+ AlterTableIT.verifySchemaExport(newTable, getUtility().getConfiguration());
+ }
+ }
+
+ @Test
public void testHashJoin() throws Exception {
if (immutable || createChildAfterRename) {
return;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SchemaRegistryFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SchemaRegistryFailureIT.java
new file mode 100644
index 0000000..980def2
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SchemaRegistryFailureIT.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
+import org.apache.phoenix.coprocessor.MetaDataEndpointImpl;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.schema.export.SchemaRegistryRepository;
+import org.apache.phoenix.schema.export.SchemaWriter;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Map;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class SchemaRegistryFailureIT extends ParallelStatsDisabledIT{
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+ props.put(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(60*60)); // An hour
+ props.put(SchemaRegistryRepository.SCHEMA_REGISTRY_IMPL_KEY,
+ ExplodingSchemaRegistryRepository.class.getName());
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Test
+ public void testFailedCreateRollback() throws Exception {
+ String schemaName = "S_" + generateUniqueName();
+ String tableName = "T_" + generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String ddl = "CREATE TABLE " + fullTableName + " (id char(1) NOT NULL," +
+ " col1 integer NOT NULL," + " col2 bigint NOT NULL,"
+ + " CONSTRAINT NAME_PK PRIMARY KEY (id, col1, col2)) "
+ + "MULTI_TENANT=true, CHANGE_DETECTION_ENABLED=true";
+ try {
+ conn.createStatement().execute(ddl);
+ Assert.fail("Should have thrown SQLException");
+ } catch (SQLException e) {
+ Assert.assertEquals(SQLExceptionCode.ERROR_WRITING_TO_SCHEMA_REGISTRY.getErrorCode(),
+ e.getErrorCode());
+ }
+
+ try {
+ PTable table = PhoenixRuntime.getTable(conn, fullTableName);
+ Assert.fail("Shouldn't have found the table because it shouldn't have been created");
+ } catch (TableNotFoundException tnfe) {
+ //eat the exception, which is what we expect
+ }
+ }
+ }
+
+ @Test
+ public void testFailedAlterRollback() throws Exception {
+ String schemaName = "S_" + generateUniqueName();
+ String tableName = "T_" + generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String ddl = "CREATE TABLE " + fullTableName + " (id char(1) NOT NULL,"
+ + " col1 integer NOT NULL," + " col2 bigint NOT NULL,"
+ + " CONSTRAINT NAME_PK PRIMARY KEY (id, col1, col2)) " + "MULTI_TENANT=true";
+
+ conn.createStatement().execute(ddl);
+
+ String alterDdl = "ALTER TABLE " + fullTableName + " SET CHANGE_DETECTION_ENABLED=true";
+ try {
+ conn.createStatement().execute(alterDdl);
+ Assert.fail("Should have failed because of schema registry exception");
+ } catch (SQLException se) {
+ Assert.assertEquals(SQLExceptionCode.ERROR_WRITING_TO_SCHEMA_REGISTRY.getErrorCode(),
+ se.getErrorCode());
+ }
+ PTable table = PhoenixRuntime.getTable(conn, fullTableName);
+ Assert.assertFalse(table.isChangeDetectionEnabled());
+ }
+ }
+
+ public static class ExplodingSchemaRegistryRepository implements SchemaRegistryRepository {
+
+ //need a real default constructor for reflection
+ public ExplodingSchemaRegistryRepository() {
+
+ }
+
+ @Override public void init(Configuration conf) throws IOException {
+
+ }
+
+ @Override public String exportSchema(SchemaWriter writer, PTable table) throws IOException {
+ throw new IOException("I always explode!");
+ }
+
+ @Override public String getSchemaById(String schemaId) {
+ return null;
+ }
+
+ @Override public String getSchemaByTable(PTable table) {
+ return null;
+ }
+
+ @Override public void close() throws IOException {
+
+ }
+ }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
index 594aa99..8b1a93b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
@@ -44,6 +44,7 @@ import java.util.Map;
import java.util.Properties;
import org.apache.curator.shaded.com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.ExplainPlan;
@@ -55,15 +56,15 @@ import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.ReadOnlyTableException;
+import org.apache.phoenix.schema.export.DefaultSchemaRegistryRepository;
+import org.apache.phoenix.schema.export.DefaultSchemaWriter;
+import org.apache.phoenix.schema.export.SchemaRegistryRepository;
+import org.apache.phoenix.schema.export.SchemaRegistryRepositoryFactory;
import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
import org.apache.phoenix.transaction.TransactionFactory;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.apache.phoenix.util.MetaDataUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.TestUtil;
+import org.apache.phoenix.util.*;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -345,6 +346,56 @@ public class ViewIT extends SplitSystemCatalogIT {
}
@Test
+ public void testCreateViewsWithChangeDetectionEnabled() throws Exception {
+ String tenantId = "T_" + generateUniqueName();
+ String schemaName = "S_" + generateUniqueName();
+ String tableName = "T_" + generateUniqueName();
+ String globalViewName = "GV_" + generateUniqueName();
+ String tenantViewName = "TV_" + generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+ String fullGlobalViewName = SchemaUtil.getTableName(schemaName, globalViewName);
+ String fullTenantViewName = SchemaUtil.getTableName(schemaName, tenantViewName);
+
+ PTable globalView = null;
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String ddl = "CREATE TABLE " + fullTableName +
+ " (id char(1) NOT NULL," + " col1 integer NOT NULL," + " col2 bigint NOT NULL," +
+ " CONSTRAINT NAME_PK PRIMARY KEY (id, col1, col2)) " +
+ "MULTI_TENANT=true, CHANGE_DETECTION_ENABLED=true";
+ conn.createStatement().execute(ddl);
+ PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
+ assertTrue(table.isChangeDetectionEnabled());
+ AlterTableIT.verifySchemaExport(table, getUtility().getConfiguration());
+
+ String globalViewDdl = "CREATE VIEW " + fullGlobalViewName +
+ " (id2 CHAR(12) PRIMARY KEY, col3 BIGINT NULL) " +
+ " AS SELECT * FROM " + fullTableName + " CHANGE_DETECTION_ENABLED=true";
+
+ conn.createStatement().execute(globalViewDdl);
+ globalView = PhoenixRuntime.getTableNoCache(conn, fullGlobalViewName);
+ assertTrue(globalView.isChangeDetectionEnabled());
+ PTable globalViewWithParents = ViewUtil.addDerivedColumnsFromParent(globalView, table);
+ // base column count doesn't get set properly
+ PTableImpl.Builder builder = PTableImpl.builderFromExisting(globalViewWithParents);
+ builder.setBaseColumnCount(table.getColumns().size());
+ globalViewWithParents = builder.setColumns(globalViewWithParents.getColumns()).build();
+ AlterTableIT.verifySchemaExport(globalViewWithParents, getUtility().getConfiguration());
+ }
+ Properties props = new Properties();
+ props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+ try (Connection tenantConn = DriverManager.getConnection(getUrl(), props)) {
+ String tenantViewDdl = "CREATE VIEW " + fullTenantViewName +
+ " (id3 VARCHAR PRIMARY KEY, col4 VARCHAR NULL) " +
+ " AS SELECT * FROM " + fullGlobalViewName + " CHANGE_DETECTION_ENABLED=true";
+ tenantConn.createStatement().execute(tenantViewDdl);
+ PTable tenantView = PhoenixRuntime.getTableNoCache(tenantConn, fullTenantViewName);
+ assertTrue(tenantView.isChangeDetectionEnabled());
+ PTable tenantViewWithParents = ViewUtil.addDerivedColumnsFromParent(tenantView, globalView);
+ AlterTableIT.verifySchemaExport(tenantViewWithParents, getUtility().getConfiguration());
+ }
+ }
+
+ @Test
public void testCreateViewTimestamp() throws Exception {
String tenantId = null;
createViewTimestampHelper(tenantId);
@@ -384,6 +435,37 @@ public class ViewIT extends SplitSystemCatalogIT {
}
}
+ @Test
+ public void testCreateChangeDetectionEnabledTable() throws Exception {
+ //create a view with CHANGE_DETECTION_ENABLED and verify both that it's set properly
+ //on the PTable, and that it gets persisted to the external schema registry
+
+ String schemaName = generateUniqueName();
+ String tableName = generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+ String viewName = generateUniqueName();
+ String fullViewName = SchemaUtil.getTableName(schemaName, viewName);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String ddl = "CREATE TABLE " + fullTableName +
+ " (id char(1) NOT NULL," + " col1 integer NOT NULL," + " col2 bigint NOT NULL," +
+ " CONSTRAINT NAME_PK PRIMARY KEY (id, col1, col2)) " +
+ "CHANGE_DETECTION_ENABLED=true";
+ conn.createStatement().execute(ddl);
+
+ String viewDdl = "CREATE VIEW " + fullViewName + " AS SELECT * FROM " + fullTableName
+ + " CHANGE_DETECTION_ENABLED=true";
+ conn.createStatement().execute(viewDdl);
+ PTable view = PhoenixRuntime.getTableNoCache(conn, fullViewName);
+ assertTrue(view.isChangeDetectionEnabled());
+ assertEquals(DefaultSchemaRegistryRepository.getSchemaId(view),
+ view.getExternalSchemaId());
+ String schemaText = new DefaultSchemaWriter().exportSchema(view);
+ SchemaRegistryRepository repo = SchemaRegistryRepositoryFactory.getSchemaRegistryRepository(
+ getUtility().getConfiguration());
+ //assertEquals(schemaText, repo.getSchemaById(view.getExternalSchemaId()));
+ }
+ }
+
private void testViewUsesTableIndex(boolean localIndex) throws Exception {
ResultSet rs;
// Use unique name for table with local index as otherwise we run
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/WALAnnotationIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/WALAnnotationIT.java
index d27bc71..f30a357 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/WALAnnotationIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/WALAnnotationIT.java
@@ -17,7 +17,6 @@
*/
package org.apache.phoenix.end2end;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.BaseWALObserver;
@@ -39,13 +38,10 @@ import org.apache.phoenix.query.PhoenixTestBuilder;
import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
-import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -106,10 +102,8 @@ public class WALAnnotationIT extends BaseTest {
Assume.assumeTrue(HbaseCompatCapabilities.hasPreWALAppend());
SchemaBuilder builder = new SchemaBuilder(getUrl());
boolean createGlobalIndex = false;
- long ddlTimestamp = upsertAndDeleteHelper(builder, createGlobalIndex);
- assertAnnotation(2, builder.getPhysicalTableName(false), null,
- builder.getTableOptions().getSchemaName(),
- builder.getDataOptions().getTableName(), PTableType.TABLE, ddlTimestamp);
+ String externalSchemaId = upsertAndDeleteHelper(builder, createGlobalIndex);
+ assertAnnotation(2, builder.getPhysicalTableName(false), externalSchemaId);
}
@Test
@@ -180,18 +174,16 @@ public class WALAnnotationIT extends BaseTest {
Assume.assumeTrue(HbaseCompatCapabilities.hasPreWALAppend());
SchemaBuilder builder = new SchemaBuilder(getUrl());
boolean createGlobalIndex = true;
- long ddlTimestamp = upsertAndDeleteHelper(builder, createGlobalIndex);
- assertAnnotation(2, builder.getPhysicalTableName(false), null,
- builder.getTableOptions().getSchemaName(),
- builder.getDataOptions().getTableName(), PTableType.TABLE, ddlTimestamp);
+ String externalSchemaId = upsertAndDeleteHelper(builder, createGlobalIndex);
+ assertAnnotation(2, builder.getPhysicalTableName(false), externalSchemaId);
assertAnnotation(0, builder.getPhysicalTableIndexName(false),
- null, null, null, null, ddlTimestamp);
+ externalSchemaId);
}
// Note that local secondary indexes aren't supported because they go in the same WALEdit as the
// "base" table data they index.
- private long upsertAndDeleteHelper(SchemaBuilder builder, boolean createGlobalIndex) throws Exception {
+ private String upsertAndDeleteHelper(SchemaBuilder builder, boolean createGlobalIndex) throws Exception {
try (Connection conn = getConnection()) {
SchemaBuilder.TableOptions tableOptions = getTableOptions();
@@ -206,7 +198,7 @@ public class WALAnnotationIT extends BaseTest {
conn.createStatement().execute(upsertSql);
conn.commit();
PTable table = PhoenixRuntime.getTableNoCache(conn, builder.getEntityTableName());
- assertEquals("Change Detection Enabled is false!", true, table.isChangeDetectionEnabled());
+ assertTrue("Change Detection Enabled is false!", table.isChangeDetectionEnabled());
// Deleting by entire PK gets executed as more like an UPSERT VALUES than an UPSERT
// SELECT (i.e, it generates the Mutations and then pushes them to server, rather than
// running a select query and deleting the mutations returned)
@@ -218,7 +210,7 @@ public class WALAnnotationIT extends BaseTest {
// last had columns added or removed. It is NOT the timestamp of a particular mutation
// We need it in the annotation to match up with schema object in an external schema
// repo.
- return table.getLastDDLTimestamp();
+ return table.getExternalSchemaId();
}
}
@@ -258,16 +250,10 @@ public class WALAnnotationIT extends BaseTest {
int expectedAnnotations) throws SQLException, IOException {
PTable baseTable = PhoenixRuntime.getTableNoCache(conn,
baseBuilder.getEntityTableName());
- assertAnnotation(expectedAnnotations, baseBuilder.getPhysicalTableName(false), null,
- baseBuilder.getTableOptions().getSchemaName(),
- baseBuilder.getDataOptions().getTableName(),
- PTableType.TABLE,
- baseTable.getLastDDLTimestamp());
+ assertAnnotation(expectedAnnotations, baseBuilder.getPhysicalTableName(false), baseTable.getExternalSchemaId());
PTable targetTable = PhoenixRuntime.getTableNoCache(conn,
targetBuilder.getEntityTableName());
- assertAnnotation(expectedAnnotations, targetBuilder.getPhysicalTableName(false), null,
- targetBuilder.getTableOptions().getSchemaName(), targetBuilder.getDataOptions().getTableName(),
- PTableType.TABLE, targetTable.getLastDDLTimestamp());
+ assertAnnotation(expectedAnnotations, targetBuilder.getPhysicalTableName(false), targetTable.getExternalSchemaId());
}
@Test
@@ -287,10 +273,8 @@ public class WALAnnotationIT extends BaseTest {
" (OID, KP, COL1, COL2, COL3) SELECT * FROM " + targetBuilder.getEntityTableName();
conn.createStatement().execute(sql);
PTable table = PhoenixRuntime.getTableNoCache(conn, targetBuilder.getEntityTableName());
- assertAnnotation(1, targetBuilder.getPhysicalTableName(false), null,
- targetBuilder.getTableOptions().getSchemaName(),
- targetBuilder.getDataOptions().getTableName(),
- PTableType.TABLE, table.getLastDDLTimestamp());
+ assertAnnotation(1, targetBuilder.getPhysicalTableName(false),
+ table.getExternalSchemaId());
}
}
@@ -344,9 +328,7 @@ public class WALAnnotationIT extends BaseTest {
conn.createStatement().execute(sql);
conn.commit();
PTable table = PhoenixRuntime.getTableNoCache(conn, builder.getEntityTableName());
- assertAnnotation(2, table.getPhysicalName().getString(), null,
- table.getSchemaName().getString(),
- table.getTableName().getString(), PTableType.TABLE, table.getLastDDLTimestamp());
+ assertAnnotation(2, table.getPhysicalName().getString(), table.getExternalSchemaId());
}
}
@@ -373,9 +355,7 @@ public class WALAnnotationIT extends BaseTest {
conn.createStatement().execute(deleteSql);
conn.commit();
PTable view = PhoenixRuntime.getTableNoCache(conn, builder.getEntityGlobalViewName());
- assertAnnotation(2, view.getPhysicalName().getString(), null,
- view.getSchemaName().getString(),
- view.getTableName().getString(), PTableType.VIEW, view.getLastDDLTimestamp());
+ assertAnnotation(2, view.getPhysicalName().getString(), view.getExternalSchemaId());
}
}
@@ -432,13 +412,11 @@ public class WALAnnotationIT extends BaseTest {
conn.createStatement().execute(deleteSql);
conn.commit();
PTable view = PhoenixRuntime.getTableNoCache(conn, builder.getEntityTenantViewName());
- assertAnnotation(2, view.getPhysicalName().getString(), tenant,
- view.getSchemaName().getString(),
- view.getTableName().getString(), PTableType.VIEW, view.getLastDDLTimestamp());
+ assertAnnotation(2, view.getPhysicalName().getString(), view.getExternalSchemaId());
if (createIndex) {
assertAnnotation(0,
MetaDataUtil.getViewIndexPhysicalName(builder.getEntityTableName()),
- tenant, null, null, null, view.getLastDDLTimestamp());
+ view.getExternalSchemaId());
}
}
@@ -466,7 +444,7 @@ public class WALAnnotationIT extends BaseTest {
SchemaBuilder.TableOptions tableOptions = getTableOptions();
builder.withTableOptions(tableOptions).withTableIndexDefaults().build();
PTable table = PhoenixRuntime.getTableNoCache(conn, builder.getEntityTableName());
- assertEquals("Change Detection Enabled is false!", true, table.isChangeDetectionEnabled());
+ assertTrue("Change Detection Enabled is false!", table.isChangeDetectionEnabled());
Long ddlTimestamp = table.getLastDDLTimestamp();
String upsertSql = "UPSERT INTO " + builder.getEntityTableName() + " VALUES" +
" ('a', 'b', 'c', 'd')";
@@ -483,11 +461,9 @@ public class WALAnnotationIT extends BaseTest {
" ('a', 'b', 'c', 'd') ON DUPLICATE KEY UPDATE " + onDupClause;
conn.createStatement().execute(upsertSql);
conn.commit();
- assertAnnotation(2, builder.getPhysicalTableName(false), null,
- builder.getTableOptions().getSchemaName(),
- builder.getDataOptions().getTableName(), PTableType.TABLE, ddlTimestamp);
+ assertAnnotation(2, builder.getPhysicalTableName(false), table.getExternalSchemaId());
assertAnnotation(0, builder.getPhysicalTableIndexName(false),
- null, null, null, null, ddlTimestamp);
+ table.getExternalSchemaId());
}
}
@@ -509,29 +485,16 @@ public class WALAnnotationIT extends BaseTest {
observer.clearAnnotations();
}
- private void assertAnnotation(int numOccurrences, String physicalTableName, String tenant,
- String schemaName,
- String logicalTableName,
- PTableType tableType, long ddlTimestamp) throws IOException {
+ private void assertAnnotation(int numOccurrences, String physicalTableName,
+ String externalSchemaId) throws IOException {
int foundCount = 0;
int notFoundCount = 0;
List<Map<String, byte[]>> entries =
getEntriesForTable(TableName.valueOf(physicalTableName));
for (Map<String, byte[]> m : entries) {
- byte[] tenantBytes = m.get(MutationState.MutationMetadataType.TENANT_ID.toString());
- byte[] schemaBytes = m.get(MutationState.MutationMetadataType.SCHEMA_NAME.toString());
- byte[] logicalTableBytes =
- m.get(MutationState.MutationMetadataType.LOGICAL_TABLE_NAME.toString());
- byte[] tableTypeBytes = m.get(MutationState.MutationMetadataType.TABLE_TYPE.toString());
- byte[] timestampBytes = m.get(MutationState.MutationMetadataType.TIMESTAMP.toString());
- assertNotNull(timestampBytes);
- long timestamp = Bytes.toLong(timestampBytes);
- if (Objects.equals(tenant, Bytes.toString(tenantBytes)) &&
- Objects.equals(schemaName, Bytes.toString(schemaBytes)) &&
- Objects.equals(logicalTableName, Bytes.toString(logicalTableBytes)) &&
- Objects.equals(tableType.toString(), Bytes.toString(tableTypeBytes)) &&
- Objects.equals(ddlTimestamp, timestamp)
- && timestamp < HConstants.LATEST_TIMESTAMP) {
+ byte[] externalSchemaIdBytes = m.get(MutationState.MutationMetadataType.EXTERNAL_SCHEMA_ID.toString());
+ assertNotNull(externalSchemaIdBytes);
+ if (Objects.equals(externalSchemaId, Bytes.toString(externalSchemaIdBytes))) {
foundCount++;
} else {
notFoundCount++;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
index acdedab..8c8fb3b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
@@ -17,7 +17,6 @@
*/
package org.apache.phoenix.coprocessor;
-import com.sun.org.apache.xpath.internal.operations.Bool;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Scan;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 49d5fbb..dae26ff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -36,6 +36,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAM
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODING_SCHEME_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXTERNAL_SCHEMA_ID_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE_BYTES;
@@ -82,6 +83,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT_BYT
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE_BYTES;
import static org.apache.phoenix.query.QueryConstants.VIEW_MODIFIED_PROPERTY_TAG_TYPE;
import static org.apache.phoenix.schema.PTableType.INDEX;
+import static org.apache.phoenix.schema.PTableType.VIEW;
import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
import static org.apache.phoenix.util.SchemaUtil.getVarChars;
@@ -91,6 +93,7 @@ import static org.apache.phoenix.util.ViewUtil.getSystemTableForChildLinks;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
+import java.sql.Connection;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
@@ -127,9 +130,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
-import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.RpcServer.Call;
import org.apache.hadoop.hbase.ipc.RpcUtil;
@@ -182,6 +183,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.metrics.Metrics;
import org.apache.phoenix.parse.LiteralParseNode;
import org.apache.phoenix.parse.PFunction;
@@ -217,6 +219,10 @@ import org.apache.phoenix.schema.SequenceKey;
import org.apache.phoenix.schema.SequenceNotFoundException;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.schema.export.SchemaRegistryRepository;
+import org.apache.phoenix.schema.export.SchemaRegistryRepositoryFactory;
+import org.apache.phoenix.schema.export.SchemaWriter;
+import org.apache.phoenix.schema.export.SchemaWriterFactory;
import org.apache.phoenix.schema.task.SystemTaskParams;
import org.apache.phoenix.schema.task.Task;
import org.apache.phoenix.schema.types.PBinary;
@@ -344,6 +350,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
CHANGE_DETECTION_ENABLED_BYTES);
private static final Cell SCHEMA_VERSION_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
TABLE_FAMILY_BYTES, SCHEMA_VERSION_BYTES);
+ private static final Cell EXTERNAL_SCHEMA_ID_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
+ TABLE_FAMILY_BYTES, EXTERNAL_SCHEMA_ID_BYTES);
private static final List<Cell> TABLE_KV_COLUMNS = Lists.newArrayList(
EMPTY_KEYVALUE_KV,
@@ -381,7 +389,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
PHOENIX_TTL_HWM_KV,
LAST_DDL_TIMESTAMP_KV,
CHANGE_DETECTION_ENABLED_KV,
- SCHEMA_VERSION_KV
+ SCHEMA_VERSION_KV,
+ EXTERNAL_SCHEMA_ID_KV
);
static {
@@ -425,6 +434,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
private static final int CHANGE_DETECTION_ENABLED_INDEX =
TABLE_KV_COLUMNS.indexOf(CHANGE_DETECTION_ENABLED_KV);
private static final int SCHEMA_VERSION_INDEX = TABLE_KV_COLUMNS.indexOf(SCHEMA_VERSION_KV);
+ private static final int EXTERNAL_SCHEMA_ID_INDEX =
+ TABLE_KV_COLUMNS.indexOf(EXTERNAL_SCHEMA_ID_KV);
// KeyValues for Column
private static final KeyValue DECIMAL_DIGITS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES);
private static final KeyValue COLUMN_SIZE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_SIZE_BYTES);
@@ -600,7 +611,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
- // nothing to do
+ SchemaRegistryRepositoryFactory.close();
}
@Override
@@ -806,7 +817,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
private void addColumnToTable(List<Cell> results, PName colName, PName famName,
Cell[] colKeyValues, List<PColumn> columns, boolean isSalted, int baseColumnCount,
- boolean isRegularView) {
+ boolean isRegularView, long timestamp) {
int i = 0;
int j = 0;
while (i < results.size() && j < COLUMN_KV_COLUMNS.size()) {
@@ -911,7 +922,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable,
position - 1, sortOrder, arraySize, viewConstant, isViewReferenced,
expressionStr, isRowTimestamp, false, columnQualifierBytes,
- results.get(0).getTimestamp());
+ timestamp);
columns.add(column);
}
@@ -1008,11 +1019,51 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
if (results.isEmpty()) {
return null;
}
+ List<Cell> tableCellList = results;
+ results = Lists.newArrayList();
+ List<List<Cell>> allColumnCellList = Lists.newArrayList();
+
+ do {
+ if (results.size() > 0) {
+ allColumnCellList.add(results);
+ results = Lists.newArrayList();
+ }
+ } while (scanner.next(results));
+ if (results != null && results.size() > 0) {
+ allColumnCellList.add(results);
+ }
+
+ return getTableFromCells(tableCellList, allColumnCellList, clientTimeStamp, clientVersion);
+ }
+
+ private PTable getTableFromCells(List<Cell> tableCellList, List<List<Cell>> allColumnCellList,
+ long clientTimeStamp, int clientVersion)
+ throws IOException, SQLException {
+ return getTableFromCells(tableCellList, allColumnCellList, clientTimeStamp, clientVersion, null);
+ }
+
+ /**
+ * Utility method to get a PTable from the HBase Cells either read from SYSTEM.CATALOG or
+ * generated by a DDL statement. Optionally, an existing PTable can be provided so that its
+ * properties can be merged with the "new" PTable created from the Cell. This is useful when
+ * generating an updated PTable following an ALTER DDL statement
+ * @param tableCellList Cells from the header row containing table level properties
+ * @param allColumnCellList Cells from column or link rows
+ * @param clientTimeStamp client-provided timestamp
+ * @param clientVersion client-provided version
+ * @param oldTable Optional parameters containing properties for an existing PTable
+ * @return
+ * @throws IOException
+ * @throws SQLException
+ */
+ private PTable getTableFromCells(List<Cell> tableCellList, List<List<Cell>> allColumnCellList,
+ long clientTimeStamp, int clientVersion, PTable oldTable)
+ throws IOException, SQLException {
Cell[] tableKeyValues = new Cell[TABLE_KV_COLUMNS.size()];
Cell[] colKeyValues = new Cell[COLUMN_KV_COLUMNS.size()];
// Create PTable based on KeyValues from scan
- Cell keyValue = results.get(0);
+ Cell keyValue = tableCellList.get(0);
byte[] keyBuffer = keyValue.getRowArray();
int keyLength = keyValue.getRowLength();
int keyOffset = keyValue.getRowOffset();
@@ -1033,20 +1084,24 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
// This will prevent the client from continually looking for the current
// table when we know that there will never be one since we disallow updates
// unless the table is the latest
- // If we already have a table newer than the one we just found and
- // the client timestamp is less that the existing table time stamp,
- // bump up the timeStamp to right before the client time stamp, since
- // we know it can't possibly change.
+
long timeStamp = keyValue.getTimestamp();
- // long timeStamp = tableTimeStamp > keyValue.getTimestamp() &&
- // clientTimeStamp < tableTimeStamp
- // ? clientTimeStamp-1
- // : keyValue.getTimestamp();
+
+ PTableImpl.Builder builder = null;
+ if (oldTable != null) {
+ builder = PTableImpl.builderFromExisting(oldTable);
+ builder.setColumns(oldTable.getColumns());
+ } else {
+ builder = new PTableImpl.Builder();
+ }
+ builder.setTenantId(tenantId);
+ builder.setSchemaName(schemaName);
+ builder.setTableName(tableName);
int i = 0;
int j = 0;
- while (i < results.size() && j < TABLE_KV_COLUMNS.size()) {
- Cell kv = results.get(i);
+ while (i < tableCellList.size() && j < TABLE_KV_COLUMNS.size()) {
+ Cell kv = tableCellList.get(i);
Cell searchKv = TABLE_KV_COLUMNS.get(j);
int cmp =
Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(),
@@ -1069,7 +1124,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
|| tableKeyValues[COLUMN_COUNT_INDEX] == null) {
// since we allow SYSTEM.CATALOG to split in certain cases there might be child links or
// other metadata rows that are invalid and should be ignored
- Cell cell = results.get(0);
+ Cell cell = tableCellList.get(0);
LOGGER.error("Found invalid metadata rows for rowkey " +
Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
return null;
@@ -1079,18 +1134,25 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
PTableType tableType =
PTableType
.fromSerializedValue(tableTypeKv.getValueArray()[tableTypeKv.getValueOffset()]);
+ builder.setType(tableType);
+
Cell tableSeqNumKv = tableKeyValues[TABLE_SEQ_NUM_INDEX];
long tableSeqNum =
PLong.INSTANCE.getCodec().decodeLong(tableSeqNumKv.getValueArray(),
tableSeqNumKv.getValueOffset(), SortOrder.getDefault());
+ builder.setSequenceNumber(tableSeqNum);
+
Cell columnCountKv = tableKeyValues[COLUMN_COUNT_INDEX];
int columnCount =
PInteger.INSTANCE.getCodec().decodeInt(columnCountKv.getValueArray(),
columnCountKv.getValueOffset(), SortOrder.getDefault());
+
Cell pkNameKv = tableKeyValues[PK_NAME_INDEX];
PName pkName =
pkNameKv != null ? newPName(pkNameKv.getValueArray(), pkNameKv.getValueOffset(),
pkNameKv.getValueLength()) : null;
+ builder.setPkName(pkName != null ? pkName : oldTable != null ? oldTable.getPKName() : null);
+
Cell saltBucketNumKv = tableKeyValues[SALT_BUCKETS_INDEX];
Integer saltBucketNum =
saltBucketNumKv != null ? (Integer) PInteger.INSTANCE.getCodec().decodeInt(
@@ -1098,37 +1160,62 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
if (saltBucketNum != null && saltBucketNum.intValue() == 0) {
saltBucketNum = null; // Zero salt buckets means not salted
}
+ builder.setBucketNum(saltBucketNum != null ? saltBucketNum : oldTable != null ? oldTable.getBucketNum() : null);
+
+ //data table name is used to find the parent table for indexes later
Cell dataTableNameKv = tableKeyValues[DATA_TABLE_NAME_INDEX];
PName dataTableName =
dataTableNameKv != null ? newPName(dataTableNameKv.getValueArray(),
dataTableNameKv.getValueOffset(), dataTableNameKv.getValueLength()) : null;
+
Cell physicalTableNameKv = tableKeyValues[PHYSICAL_TABLE_NAME_INDEX];
PName physicalTableName =
physicalTableNameKv != null ? newPName(physicalTableNameKv.getValueArray(),
physicalTableNameKv.getValueOffset(), physicalTableNameKv.getValueLength()) : null;
+ builder.setPhysicalTableName(physicalTableName != null ? physicalTableName : oldTable != null ? oldTable.getPhysicalName(true) : null);
Cell indexStateKv = tableKeyValues[INDEX_STATE_INDEX];
PIndexState indexState =
indexStateKv == null ? null : PIndexState.fromSerializedValue(indexStateKv
.getValueArray()[indexStateKv.getValueOffset()]);
+ builder.setState(indexState != null ? indexState : oldTable != null ? oldTable.getIndexState() : null);
Cell immutableRowsKv = tableKeyValues[IMMUTABLE_ROWS_INDEX];
- boolean isImmutableRows =
- immutableRowsKv == null ? false : (Boolean) PBoolean.INSTANCE.toObject(
- immutableRowsKv.getValueArray(), immutableRowsKv.getValueOffset(),
- immutableRowsKv.getValueLength());
+ boolean isImmutableRows = immutableRowsKv != null && (Boolean) PBoolean.INSTANCE.toObject(
+ immutableRowsKv.getValueArray(), immutableRowsKv.getValueOffset(),
+ immutableRowsKv.getValueLength());
+ builder.setImmutableRows(immutableRowsKv != null ? isImmutableRows :
+ oldTable != null && oldTable.isImmutableRows());
+
Cell defaultFamilyNameKv = tableKeyValues[DEFAULT_COLUMN_FAMILY_INDEX];
PName defaultFamilyName = defaultFamilyNameKv != null ? newPName(defaultFamilyNameKv.getValueArray(), defaultFamilyNameKv.getValueOffset(), defaultFamilyNameKv.getValueLength()) : null;
+ builder.setDefaultFamilyName(defaultFamilyName != null ? defaultFamilyName : oldTable != null ? oldTable.getDefaultFamilyName() : null);
+
Cell viewStatementKv = tableKeyValues[VIEW_STATEMENT_INDEX];
String viewStatement = viewStatementKv != null ? (String) PVarchar.INSTANCE.toObject(viewStatementKv.getValueArray(), viewStatementKv.getValueOffset(),
viewStatementKv.getValueLength()) : null;
+ builder.setViewStatement(viewStatement != null ? viewStatement : oldTable != null ? oldTable.getViewStatement() : null);
+
Cell disableWALKv = tableKeyValues[DISABLE_WAL_INDEX];
boolean disableWAL = disableWALKv == null ? PTable.DEFAULT_DISABLE_WAL : Boolean.TRUE.equals(
PBoolean.INSTANCE.toObject(disableWALKv.getValueArray(), disableWALKv.getValueOffset(), disableWALKv.getValueLength()));
+ builder.setDisableWAL(disableWALKv != null ? disableWAL :
+ oldTable != null && oldTable.isWALDisabled());
+
Cell multiTenantKv = tableKeyValues[MULTI_TENANT_INDEX];
- boolean multiTenant = multiTenantKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(multiTenantKv.getValueArray(), multiTenantKv.getValueOffset(), multiTenantKv.getValueLength()));
+ boolean multiTenant = multiTenantKv != null && Boolean.TRUE.equals(
+ PBoolean.INSTANCE.toObject(multiTenantKv.getValueArray(),
+ multiTenantKv.getValueOffset(), multiTenantKv.getValueLength()));
+ builder.setMultiTenant(multiTenantKv != null ? multiTenant :
+ oldTable != null && oldTable.isMultiTenant());
+
Cell storeNullsKv = tableKeyValues[STORE_NULLS_INDEX];
- boolean storeNulls = storeNullsKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(storeNullsKv.getValueArray(), storeNullsKv.getValueOffset(), storeNullsKv.getValueLength()));
+ boolean storeNulls = storeNullsKv != null && Boolean.TRUE.equals(
+ PBoolean.INSTANCE.toObject(storeNullsKv.getValueArray(), storeNullsKv.getValueOffset(),
+ storeNullsKv.getValueLength()));
+ builder.setStoreNulls(storeNullsKv != null ? storeNulls :
+ oldTable != null && oldTable.getStoreNulls());
+
Cell transactionalKv = tableKeyValues[TRANSACTIONAL_INDEX];
Cell transactionProviderKv = tableKeyValues[TRANSACTION_PROVIDER_INDEX];
TransactionFactory.Provider transactionProvider = null;
@@ -1148,115 +1235,199 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
transactionProviderKv.getValueOffset(),
SortOrder.getDefault()));
}
+ builder.setTransactionProvider(transactionProviderKv != null || transactionalKv != null
+ ? transactionProvider : oldTable != null ? oldTable.getTransactionProvider() : null);
+
Cell viewTypeKv = tableKeyValues[VIEW_TYPE_INDEX];
ViewType viewType = viewTypeKv == null ? null : ViewType.fromSerializedValue(viewTypeKv.getValueArray()[viewTypeKv.getValueOffset()]);
- PDataType viewIndexIdType = getViewIndexIdType(tableKeyValues);
+ builder.setViewType(viewType != null ? viewType : oldTable != null ? oldTable.getViewType() : null);
+
+ PDataType viewIndexIdType = oldTable != null ? oldTable.getviewIndexIdType() :
+ getViewIndexIdType(tableKeyValues);
+ builder.setViewIndexIdType(viewIndexIdType);
+
Long viewIndexId = getViewIndexId(tableKeyValues, viewIndexIdType);
+ builder.setViewIndexId(viewIndexId != null ? viewIndexId : oldTable != null ? oldTable.getViewIndexId() : null);
+
Cell indexTypeKv = tableKeyValues[INDEX_TYPE_INDEX];
IndexType indexType = indexTypeKv == null ? null : IndexType.fromSerializedValue(indexTypeKv.getValueArray()[indexTypeKv.getValueOffset()]);
+ builder.setIndexType(indexType != null ? indexType : oldTable != null ? oldTable.getIndexType() : null);
+
Cell baseColumnCountKv = tableKeyValues[BASE_COLUMN_COUNT_INDEX];
int baseColumnCount = baseColumnCountKv == null ? 0 : PInteger.INSTANCE.getCodec().decodeInt(baseColumnCountKv.getValueArray(),
baseColumnCountKv.getValueOffset(), SortOrder.getDefault());
+ builder.setBaseColumnCount(baseColumnCountKv != null ? baseColumnCount : oldTable != null ? oldTable.getBaseColumnCount() : 0);
+
Cell rowKeyOrderOptimizableKv = tableKeyValues[ROW_KEY_ORDER_OPTIMIZABLE_INDEX];
- boolean rowKeyOrderOptimizable = rowKeyOrderOptimizableKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(rowKeyOrderOptimizableKv.getValueArray(), rowKeyOrderOptimizableKv.getValueOffset(), rowKeyOrderOptimizableKv.getValueLength()));
+ boolean rowKeyOrderOptimizable = rowKeyOrderOptimizableKv != null && Boolean.TRUE.equals(
+ PBoolean.INSTANCE.toObject(rowKeyOrderOptimizableKv.getValueArray(),
+ rowKeyOrderOptimizableKv.getValueOffset(),
+ rowKeyOrderOptimizableKv.getValueLength()));
+ builder.setRowKeyOrderOptimizable(rowKeyOrderOptimizableKv != null ? rowKeyOrderOptimizable :
+ oldTable != null && oldTable.rowKeyOrderOptimizable());
+
Cell updateCacheFrequencyKv = tableKeyValues[UPDATE_CACHE_FREQUENCY_INDEX];
long updateCacheFrequency = updateCacheFrequencyKv == null ? 0 :
PLong.INSTANCE.getCodec().decodeLong(updateCacheFrequencyKv.getValueArray(),
updateCacheFrequencyKv.getValueOffset(), SortOrder.getDefault());
+ builder.setUpdateCacheFrequency(updateCacheFrequencyKv != null ? updateCacheFrequency : oldTable != null ? oldTable.getUpdateCacheFrequency() : 0);
// Check the cell tag to see whether the view has modified this property
final byte[] tagUpdateCacheFreq = (updateCacheFrequencyKv == null) ?
HConstants.EMPTY_BYTE_ARRAY : CellUtil.getTagArray(updateCacheFrequencyKv);
boolean viewModifiedUpdateCacheFrequency = (PTableType.VIEW.equals(tableType)) &&
Bytes.contains(tagUpdateCacheFreq, VIEW_MODIFIED_PROPERTY_BYTES);
+ builder.setViewModifiedUpdateCacheFrequency(!Bytes.equals(tagUpdateCacheFreq,
+ HConstants.EMPTY_BYTE_ARRAY) ? viewModifiedUpdateCacheFrequency :
+ oldTable != null && oldTable.hasViewModifiedUpdateCacheFrequency());
+
Cell indexDisableTimestampKv = tableKeyValues[INDEX_DISABLE_TIMESTAMP];
long indexDisableTimestamp = indexDisableTimestampKv == null ? 0L : PLong.INSTANCE.getCodec().decodeLong(indexDisableTimestampKv.getValueArray(),
indexDisableTimestampKv.getValueOffset(), SortOrder.getDefault());
+ builder.setIndexDisableTimestamp(indexDisableTimestampKv != null ?
+ indexDisableTimestamp : oldTable != null ? oldTable.getIndexDisableTimestamp() : 0L);
+
Cell isNamespaceMappedKv = tableKeyValues[IS_NAMESPACE_MAPPED_INDEX];
- boolean isNamespaceMapped = isNamespaceMappedKv == null ? false
- : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(isNamespaceMappedKv.getValueArray(),
+ boolean isNamespaceMapped = isNamespaceMappedKv != null && Boolean.TRUE.equals(
+ PBoolean.INSTANCE.toObject(isNamespaceMappedKv.getValueArray(),
isNamespaceMappedKv.getValueOffset(), isNamespaceMappedKv.getValueLength()));
+ builder.setNamespaceMapped(isNamespaceMappedKv != null ? isNamespaceMapped :
+ oldTable != null && oldTable.isNamespaceMapped());
+
Cell autoPartitionSeqKv = tableKeyValues[AUTO_PARTITION_SEQ_INDEX];
String autoPartitionSeq = autoPartitionSeqKv != null ? (String) PVarchar.INSTANCE.toObject(autoPartitionSeqKv.getValueArray(), autoPartitionSeqKv.getValueOffset(),
autoPartitionSeqKv.getValueLength()) : null;
+ builder.setAutoPartitionSeqName(autoPartitionSeq != null
+ ? autoPartitionSeq : oldTable != null ? oldTable.getAutoPartitionSeqName() : null);
+
Cell isAppendOnlySchemaKv = tableKeyValues[APPEND_ONLY_SCHEMA_INDEX];
- boolean isAppendOnlySchema = isAppendOnlySchemaKv == null ? false
- : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(isAppendOnlySchemaKv.getValueArray(),
+ boolean isAppendOnlySchema = isAppendOnlySchemaKv != null && Boolean.TRUE.equals(
+ PBoolean.INSTANCE.toObject(isAppendOnlySchemaKv.getValueArray(),
isAppendOnlySchemaKv.getValueOffset(), isAppendOnlySchemaKv.getValueLength()));
+ builder.setAppendOnlySchema(isAppendOnlySchemaKv != null ? isAppendOnlySchema :
+ oldTable != null && oldTable.isAppendOnlySchema());
+
Cell storageSchemeKv = tableKeyValues[STORAGE_SCHEME_INDEX];
//TODO: change this once we start having other values for storage schemes
ImmutableStorageScheme storageScheme = storageSchemeKv == null ? ImmutableStorageScheme.ONE_CELL_PER_COLUMN : ImmutableStorageScheme
.fromSerializedValue((byte) PTinyint.INSTANCE.toObject(storageSchemeKv.getValueArray(),
storageSchemeKv.getValueOffset(), storageSchemeKv.getValueLength()));
+ builder.setImmutableStorageScheme(storageSchemeKv != null ? storageScheme :
+ oldTable != null ? oldTable.getImmutableStorageScheme() : ImmutableStorageScheme.ONE_CELL_PER_COLUMN);
+
Cell encodingSchemeKv = tableKeyValues[QUALIFIER_ENCODING_SCHEME_INDEX];
QualifierEncodingScheme encodingScheme = encodingSchemeKv == null ? QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : QualifierEncodingScheme
.fromSerializedValue((byte) PTinyint.INSTANCE.toObject(encodingSchemeKv.getValueArray(),
encodingSchemeKv.getValueOffset(), encodingSchemeKv.getValueLength()));
+ builder.setQualifierEncodingScheme(encodingSchemeKv != null ? encodingScheme :
+ oldTable != null ? oldTable.getEncodingScheme() : QualifierEncodingScheme.NON_ENCODED_QUALIFIERS);
+
Cell useStatsForParallelizationKv = tableKeyValues[USE_STATS_FOR_PARALLELIZATION_INDEX];
- Boolean useStatsForParallelization = useStatsForParallelizationKv == null ? null : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(useStatsForParallelizationKv.getValueArray(), useStatsForParallelizationKv.getValueOffset(), useStatsForParallelizationKv.getValueLength()));
+ Boolean useStatsForParallelization = useStatsForParallelizationKv == null ? null :
+ Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(useStatsForParallelizationKv.getValueArray(), useStatsForParallelizationKv.getValueOffset(), useStatsForParallelizationKv.getValueLength()));
+ builder.setUseStatsForParallelization(useStatsForParallelization != null ?
+ useStatsForParallelization : oldTable != null ? oldTable.useStatsForParallelization() : null);
Cell phoenixTTLKv = tableKeyValues[PHOENIX_TTL_INDEX];
long phoenixTTL = phoenixTTLKv == null ? PHOENIX_TTL_NOT_DEFINED :
PLong.INSTANCE.getCodec().decodeLong(phoenixTTLKv.getValueArray(),
phoenixTTLKv.getValueOffset(), SortOrder.getDefault());
+ builder.setPhoenixTTL(phoenixTTLKv != null ? phoenixTTL :
+ oldTable != null ? oldTable.getPhoenixTTL() : PHOENIX_TTL_NOT_DEFINED);
Cell phoenixTTLHWMKv = tableKeyValues[PHOENIX_TTL_HWM_INDEX];
long phoenixTTLHWM = phoenixTTLHWMKv == null ? MIN_PHOENIX_TTL_HWM :
PLong.INSTANCE.getCodec().decodeLong(phoenixTTLHWMKv.getValueArray(),
phoenixTTLHWMKv.getValueOffset(), SortOrder.getDefault());
+ builder.setPhoenixTTLHighWaterMark(phoenixTTLHWMKv != null ? phoenixTTLHWM :
+ oldTable != null ? oldTable.getPhoenixTTLHighWaterMark() : MIN_PHOENIX_TTL_HWM);
// Check the cell tag to see whether the view has modified this property
final byte[] tagPhoenixTTL = (phoenixTTLKv == null) ?
HConstants.EMPTY_BYTE_ARRAY : CellUtil.getTagArray(phoenixTTLKv);
boolean viewModifiedPhoenixTTL = (PTableType.VIEW.equals(tableType)) &&
Bytes.contains(tagPhoenixTTL, VIEW_MODIFIED_PROPERTY_BYTES);
+ builder.setViewModifiedPhoenixTTL(oldTable != null ?
+ oldTable.hasViewModifiedPhoenixTTL() || viewModifiedPhoenixTTL : viewModifiedPhoenixTTL);
Cell lastDDLTimestampKv = tableKeyValues[LAST_DDL_TIMESTAMP_INDEX];
Long lastDDLTimestamp = lastDDLTimestampKv == null ?
null : PLong.INSTANCE.getCodec().decodeLong(lastDDLTimestampKv.getValueArray(),
lastDDLTimestampKv.getValueOffset(), SortOrder.getDefault());
+ builder.setLastDDLTimestamp(lastDDLTimestampKv != null ? lastDDLTimestamp :
+ oldTable != null ? oldTable.getLastDDLTimestamp() : null);
Cell changeDetectionEnabledKv = tableKeyValues[CHANGE_DETECTION_ENABLED_INDEX];
boolean isChangeDetectionEnabled = changeDetectionEnabledKv != null
&& Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(changeDetectionEnabledKv.getValueArray(),
changeDetectionEnabledKv.getValueOffset(),
changeDetectionEnabledKv.getValueLength()));
+ builder.setIsChangeDetectionEnabled(changeDetectionEnabledKv != null ?
+ isChangeDetectionEnabled : oldTable != null && oldTable.isChangeDetectionEnabled());
Cell schemaVersionKv = tableKeyValues[SCHEMA_VERSION_INDEX];
String schemaVersion = schemaVersionKv != null ? (String) PVarchar.INSTANCE.toObject(
schemaVersionKv.getValueArray(), schemaVersionKv.getValueOffset(), schemaVersionKv.getValueLength())
: null;
+ builder.setSchemaVersion(schemaVersion != null ?
+ schemaVersion : oldTable != null ? oldTable.getSchemaVersion() : null);
+
+ Cell externalSchemaIdKv = tableKeyValues[EXTERNAL_SCHEMA_ID_INDEX];
+ String externalSchemaId = externalSchemaIdKv != null ?
+ (String) PVarchar.INSTANCE.toObject(externalSchemaIdKv.getValueArray(),
+ externalSchemaIdKv.getValueOffset(), externalSchemaIdKv.getValueLength())
+ : null;
+ builder.setExternalSchemaId(externalSchemaId != null ? externalSchemaId :
+ oldTable != null ? oldTable.getExternalSchemaId() : null);
// Check the cell tag to see whether the view has modified this property
final byte[] tagUseStatsForParallelization = (useStatsForParallelizationKv == null) ?
HConstants.EMPTY_BYTE_ARRAY : CellUtil.getTagArray(useStatsForParallelizationKv);
boolean viewModifiedUseStatsForParallelization = (PTableType.VIEW.equals(tableType)) &&
Bytes.contains(tagUseStatsForParallelization, VIEW_MODIFIED_PROPERTY_BYTES);
+ builder.setViewModifiedUseStatsForParallelization(viewModifiedUseStatsForParallelization ||
+ (oldTable != null && oldTable.hasViewModifiedUseStatsForParallelization()));
+ boolean setPhysicalName = false;
List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount);
List<PTable> indexes = Lists.newArrayList();
List<PName> physicalTables = Lists.newArrayList();
PName parentTableName = tableType == INDEX ? dataTableName : null;
PName parentSchemaName = tableType == INDEX ? schemaName : null;
PName parentLogicalName = null;
- EncodedCQCounter cqCounter =
- (!EncodedColumnsUtil.usesEncodedColumnNames(encodingScheme) || tableType == PTableType.VIEW) ? PTable.EncodedCQCounter.NULL_COUNTER
- : new EncodedCQCounter();
+ EncodedCQCounter cqCounter = null;
+ if (oldTable != null) {
+ cqCounter = oldTable.getEncodedCQCounter();
+ } else {
+ cqCounter = (!EncodedColumnsUtil.usesEncodedColumnNames(encodingScheme) || tableType == PTableType.VIEW) ?
+ PTable.EncodedCQCounter.NULL_COUNTER :
+ new EncodedCQCounter();
+ }
+
+ if (timeStamp == HConstants.LATEST_TIMESTAMP) {
+ timeStamp = lastDDLTimestamp != null ? lastDDLTimestamp : clientTimeStamp;
+ }
+ builder.setTimeStamp(timeStamp);
+
+
boolean isRegularView = (tableType == PTableType.VIEW && viewType != ViewType.MAPPED);
- while (true) {
- results.clear();
- scanner.next(results);
- if (results.isEmpty()) {
- break;
- }
- Cell colKv = results.get(LINK_TYPE_INDEX);
+ for (List<Cell> columnCellList : allColumnCellList) {
+
+ Cell colKv = columnCellList.get(LINK_TYPE_INDEX);
int colKeyLength = colKv.getRowLength();
+
PName colName = newPName(colKv.getRowArray(), colKv.getRowOffset() + offset, colKeyLength - offset);
+ if (colName == null) {
+ continue;
+ }
int colKeyOffset = offset + colName.getBytes().length + 1;
PName famName = newPName(colKv.getRowArray(), colKv.getRowOffset() + colKeyOffset, colKeyLength - colKeyOffset);
+
if (isQualifierCounterKV(colKv)) {
- Integer value = PInteger.INSTANCE.getCodec().decodeInt(colKv.getValueArray(), colKv.getValueOffset(), SortOrder.ASC);
- cqCounter.setValue(famName.getString(), value);
+ if (famName != null) {
+ Integer value = PInteger.INSTANCE.getCodec().decodeInt(colKv.getValueArray(), colKv.getValueOffset(), SortOrder.ASC);
+ cqCounter.setValue(famName.getString(), value);
+ }
} else if (Bytes.compareTo(LINK_TYPE_BYTES, 0, LINK_TYPE_BYTES.length, colKv.getQualifierArray(), colKv.getQualifierOffset(), colKv.getQualifierLength()) == 0) {
LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]);
if (linkType == LinkType.INDEX_TABLE) {
@@ -1288,17 +1459,21 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
clientTimeStamp);
if (tablePhysicalName == null) {
physicalTables.add(famName);
+ setPhysicalName = true;
} else {
physicalTables.add(SchemaUtil.getPhysicalHBaseTableName(schemaName, tablePhysicalName, isNamespaceMapped));
+ setPhysicalName = true;
}
} else {
physicalTables.add(famName);
+ setPhysicalName = true;
}
// If this is a view index, then one of the link is IDX_VW -> _IDX_ PhysicalTable link. Since famName is _IDX_ and we can't get this table hence it is null, we need to use actual view name
parentLogicalName = (tableType == INDEX ? SchemaUtil.getTableName(parentSchemaName, parentTableName) : famName);
} else {
String parentPhysicalTableName = parentTable.getPhysicalName().getString();
physicalTables.add(PNameFactory.newName(parentPhysicalTableName));
+ setPhysicalName = true;
parentLogicalName = SchemaUtil.getTableName(parentTable.getSchemaName(), parentTable.getTableName());
}
} else if (linkType == LinkType.PARENT_TABLE) {
@@ -1309,63 +1484,40 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
addExcludedColumnToTable(columns, colName, famName, colKv.getTimestamp());
}
} else {
- addColumnToTable(results, colName, famName, colKeyValues, columns, saltBucketNum != null, baseColumnCount, isRegularView);
+ long columnTimestamp =
+ columnCellList.get(0).getTimestamp() != HConstants.LATEST_TIMESTAMP ?
+ columnCellList.get(0).getTimestamp() : timeStamp;
+ addColumnToTable(columnCellList, colName, famName, colKeyValues, columns,
+ saltBucketNum != null, baseColumnCount, isRegularView, columnTimestamp);
}
}
+ builder.setEncodedCQCounter(cqCounter);
+
+ builder.setIndexes(indexes != null ? indexes : oldTable != null
+ ? oldTable.getIndexes() : Collections.<PTable>emptyList());
+
+ if (physicalTables == null) {
+ builder.setPhysicalNames(oldTable != null ? oldTable.getPhysicalNames()
+ : ImmutableList.<PName>of());
+ } else {
+ builder.setPhysicalNames(ImmutableList.copyOf(physicalTables));
+ }
+ if (!setPhysicalName && oldTable != null) {
+ builder.setPhysicalTableName(oldTable.getPhysicalName());
+ }
+
+ builder.setExcludedColumns(ImmutableList.<PColumn>of());
+ builder.setBaseTableLogicalName(parentLogicalName != null ?
+ parentLogicalName : oldTable != null ? oldTable.getBaseTableLogicalName() : null);
+ builder.setParentTableName(parentTableName != null ?
+ parentTableName : oldTable != null ? oldTable.getParentTableName() : null);
+ builder.setParentSchemaName(parentSchemaName != null ? parentSchemaName :
+ oldTable != null ? oldTable.getParentSchemaName() : null);
+
+ builder.addOrSetColumns(columns);
// Avoid querying the stats table because we're holding the rowLock here. Issuing an RPC to a remote
// server while holding this lock is a bad idea and likely to cause contention.
- return new PTableImpl.Builder()
- .setType(tableType)
- .setState(indexState)
- .setTimeStamp(timeStamp)
- .setIndexDisableTimestamp(indexDisableTimestamp)
- .setSequenceNumber(tableSeqNum)
- .setImmutableRows(isImmutableRows)
- .setViewStatement(viewStatement)
- .setDisableWAL(disableWAL)
- .setMultiTenant(multiTenant)
- .setStoreNulls(storeNulls)
- .setViewType(viewType)
- .setViewIndexIdType(viewIndexIdType)
- .setViewIndexId(viewIndexId)
- .setIndexType(indexType)
- .setTransactionProvider(transactionProvider)
- .setUpdateCacheFrequency(updateCacheFrequency)
- .setNamespaceMapped(isNamespaceMapped)
- .setAutoPartitionSeqName(autoPartitionSeq)
- .setAppendOnlySchema(isAppendOnlySchema)
- .setImmutableStorageScheme(storageScheme == null ?
- ImmutableStorageScheme.ONE_CELL_PER_COLUMN : storageScheme)
- .setQualifierEncodingScheme(encodingScheme == null ?
- QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : encodingScheme)
- .setBaseColumnCount(baseColumnCount)
- .setEncodedCQCounter(cqCounter)
- .setUseStatsForParallelization(useStatsForParallelization)
- .setPhoenixTTL(phoenixTTL)
- .setPhoenixTTLHighWaterMark(phoenixTTLHWM)
- .setExcludedColumns(ImmutableList.<PColumn>of())
- .setTenantId(tenantId)
- .setSchemaName(schemaName)
- .setTableName(tableName)
- .setPhysicalTableName(physicalTableName)
- .setPkName(pkName)
- .setDefaultFamilyName(defaultFamilyName)
- .setRowKeyOrderOptimizable(rowKeyOrderOptimizable)
- .setBucketNum(saltBucketNum)
- .setIndexes(indexes == null ? Collections.<PTable>emptyList() : indexes)
- .setParentSchemaName(parentSchemaName)
- .setParentTableName(parentTableName)
- .setBaseTableLogicalName(parentLogicalName)
- .setPhysicalNames(physicalTables == null ?
- ImmutableList.<PName>of() : ImmutableList.copyOf(physicalTables))
- .setViewModifiedUpdateCacheFrequency(viewModifiedUpdateCacheFrequency)
- .setViewModifiedUseStatsForParallelization(viewModifiedUseStatsForParallelization)
- .setViewModifiedPhoenixTTL(viewModifiedPhoenixTTL)
- .setLastDDLTimestamp(lastDDLTimestamp)
- .setIsChangeDetectionEnabled(isChangeDetectionEnabled)
- .setSchemaVersion(schemaVersion)
- .setColumns(columns)
- .build();
+ return builder.build();
}
private Long getViewIndexId(Cell[] tableKeyValues, PDataType viewIndexIdType) {
@@ -1854,6 +2006,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaName, tableName);
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(tableKey);
long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
+ boolean isChangeDetectionEnabled = MetaDataUtil.getChangeDetectionEnabled(tableMetadata);
+
PTable table = null;
// Get as of latest timestamp so we can detect if we have a newer table that already
// exists without making an additional query
@@ -2171,7 +2325,25 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
if (MetaDataUtil.isTableTypeDirectlyQueried(tableType)) {
tableMetadata.add(MetaDataUtil.getLastDDLTimestampUpdate(tableKey,
clientTimeStamp, EnvironmentEdgeManager.currentTimeMillis()));
+
+ //and if we're doing change detection on this table or view, notify the
+ //external schema registry and get its schema id
+ if (isChangeDetectionEnabled) {
+ try {
+ exportSchema(tableMetadata, tableKey, clientTimeStamp, clientVersion, null);
+ } catch (IOException ie){
+ //If we fail to write to the schema registry, fail the entire
+ //CREATE TABLE or VIEW operation so we stay consistent
+ LOGGER.error("Error writing schema to external schema registry", ie);
+ builder.setReturnCode(
+ MetaDataProtos.MutationCode.ERROR_WRITING_TO_SCHEMA_REGISTRY);
+ builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+ done.run(builder.build());
+ return;
+ }
+ }
}
+
// When we drop a view we first drop the view metadata and then drop the parent->child linking row
List<Mutation> localMutations =
Lists.newArrayListWithExpectedSize(tableMetadata.size());
@@ -2240,6 +2412,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
if (newTable != null) {
builder.setTable(PTableImpl.toProto(newTable));
}
+
done.run(builder.build());
} finally {
releaseRowLocks(region, locks);
@@ -2251,6 +2424,47 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
}
}
+ private void exportSchema(List<Mutation> tableMetadata, byte[] tableKey, long clientTimestamp,
+ int clientVersion, PTable oldTable) throws SQLException, IOException {
+ List<Cell> tableCellList = MetaDataUtil.getTableCellsFromMutations(tableMetadata);
+
+ List<List<Cell>> allColumnsCellList = MetaDataUtil.getColumnAndLinkCellsFromMutations(tableMetadata);
+ //getTableFromCells assumes the Cells are sorted as they would be when reading from HBase
+ Collections.sort(tableCellList, KeyValue.COMPARATOR);
+ for (List<Cell> columnCellList : allColumnsCellList) {
+ Collections.sort(columnCellList, KeyValue.COMPARATOR);
+ }
+
+ PTable newTable = getTableFromCells(tableCellList, allColumnsCellList, clientTimestamp,
+ clientVersion, oldTable);
+ PTable parentTable = null;
+ //if this is a view, we need to get the columns from its parent table / view
+ if (newTable != null && newTable.getType().equals(PTableType.VIEW)) {
+ try (PhoenixConnection conn = (PhoenixConnection)
+ ConnectionUtil.getInputConnection(env.getConfiguration())) {
+ newTable = ViewUtil.addDerivedColumnsAndIndexesFromAncestors(conn, newTable);
+ }
+ }
+ Configuration conf = env.getConfiguration();
+ SchemaRegistryRepository exporter = SchemaRegistryRepositoryFactory.
+ getSchemaRegistryRepository(conf);
+ if (exporter != null) {
+ SchemaWriter schemaWriter = SchemaWriterFactory.getSchemaWriter(conf);
+ //we export to an external schema registry, then put the schema id
+ //to lookup the schema in the registry into SYSTEM.CATALOG so we
+ //can look it up later (and use it in WAL annotations)
+
+ //Note that if we succeed here but the write to SYSTEM.CATALOG fails,
+ //we can have "orphaned" rows in the schema registry because there's
+ //no way to make this fully atomic.
+ String externalSchemaId =
+ exporter.exportSchema(schemaWriter, newTable);
+ tableMetadata.add(MetaDataUtil.getExternalSchemaIdUpdate(tableKey,
+ externalSchemaId));
+
+ }
+ }
+
private long getViewIndexSequenceValue(PhoenixConnection connection, String tenantIdStr, PTable parentTable) throws SQLException {
int nSequenceSaltBuckets = connection.getQueryServices().getSequenceSaltBuckets();
// parentTable is parent of the view index which is the view. View table name is _IDX_+logical name of base table
@@ -2986,6 +3200,16 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
}
}
+ if (table.isChangeDetectionEnabled() || MetaDataUtil.getChangeDetectionEnabled(tableMetadata)) {
+ try {
+ exportSchema(tableMetadata, key, clientTimeStamp, clientVersion, table);
+ } catch (Exception e) {
+ LOGGER.error("Error writing to schema registry", e);
+ result = new MetaDataMutationResult(MutationCode.ERROR_WRITING_TO_SCHEMA_REGISTRY,
+ EnvironmentEdgeManager.currentTimeMillis(), table);
+ return result;
+ }
+ }
Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
GlobalCache.getInstance(this.env).getMetaDataCache();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 6d9f09f..9d002c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -95,7 +95,7 @@ public abstract class MetaDataProtocol extends MetaDataService {
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0 = MIN_TABLE_TIMESTAMP + 28;
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_15_0 = MIN_TABLE_TIMESTAMP + 29;
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0 = MIN_TABLE_TIMESTAMP + 33;
- public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0 = MIN_TABLE_TIMESTAMP + 35;
+ public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0 = MIN_TABLE_TIMESTAMP + 36;
// MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the MIN_SYSTEM_TABLE_TIMESTAMP_* constants
public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0;
// Version below which we should disallow usage of mutable secondary indexing.
@@ -181,7 +181,8 @@ public abstract class MetaDataProtocol extends MetaDataService {
UNABLE_TO_UPDATE_PARENT_TABLE,
UNABLE_TO_DELETE_CHILD_LINK,
UNABLE_TO_UPSERT_TASK,
- NO_OP
+ ERROR_WRITING_TO_SCHEMA_REGISTRY,
+ NO_OP,
}
public static class SharedTableState {
@@ -382,7 +383,7 @@ public abstract class MetaDataProtocol extends MetaDataService {
public static MetaDataMutationResult constructFromProto(MetaDataResponse proto) {
MetaDataMutationResult result = new MetaDataMutationResult();
- result.returnCode = MutationCode.values()[proto.getReturnCode().ordinal()];
+ result.returnCode = MutationCode.values()[proto.getReturnCode().getNumber()];
result.mutationTime = proto.getMutationTime();
if (proto.hasTable()) {
result.wasUpdated = true;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
index 73eb93f..4f4ddba 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
@@ -658,19 +658,10 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
private void annotateDataMutations(UngroupedAggregateRegionObserver.MutationList mutationsList,
Scan scan) {
- byte[] tenantId =
- scan.getAttribute(MutationState.MutationMetadataType.TENANT_ID.toString());
- byte[] schemaName =
- scan.getAttribute(MutationState.MutationMetadataType.SCHEMA_NAME.toString());
- byte[] logicalTableName =
- scan.getAttribute(MutationState.MutationMetadataType.LOGICAL_TABLE_NAME.toString());
- byte[] tableType =
- scan.getAttribute(MutationState.MutationMetadataType.TABLE_TYPE.toString());
- byte[] ddlTimestamp =
- scan.getAttribute(MutationState.MutationMetadataType.TIMESTAMP.toString());
-
+ byte[] externalSchemaRegistryId = scan.getAttribute(
+ MutationState.MutationMetadataType.EXTERNAL_SCHEMA_ID.toString());
for (Mutation m : mutationsList) {
- annotateMutation(m, tenantId, schemaName, logicalTableName, tableType, ddlTimestamp);
+ annotateMutation(m, externalSchemaRegistryId);
}
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index b6e088b..2e0c55f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -561,7 +561,9 @@ public enum SQLExceptionCode {
"CASCADE INDEX feature is not supported for local index"),
INVALID_REGION_SPLIT_POLICY(908, "43M19",
- "REGION SPLIT POLICY is incorrect.");
+ "REGION SPLIT POLICY is incorrect."),
+ ERROR_WRITING_TO_SCHEMA_REGISTRY(909, "4320",
+ "Error writing DDL change to external schema registry");
private final int errorCode;
private final String sqlState;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 78eca85..4f67764 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -780,12 +780,13 @@ public class MutationState implements SQLCloseable {
byte[] schemaName = table.getSchemaName() != null ? table.getSchemaName().getBytes() : null;
byte[] tableName = table.getTableName() != null ? table.getTableName().getBytes() : null;
byte[] tableType = table.getType().getValue().getBytes();
+ byte[] externalSchemaRegistryId = table.getExternalSchemaId() != null ?
+ Bytes.toBytes(table.getExternalSchemaId()) : null;
//Note that we use the _HBase_ byte encoding for a Long, not the Phoenix one, so that
//downstream consumers don't need to have the Phoenix codecs.
byte[] lastDDLTimestamp =
table.getLastDDLTimestamp() != null ? Bytes.toBytes(table.getLastDDLTimestamp()) : null;
- WALAnnotationUtil.annotateMutation(mutation, tenantId, schemaName,
- tableName, tableType, lastDDLTimestamp);
+ WALAnnotationUtil.annotateMutation(mutation, externalSchemaRegistryId);
}
/**
@@ -1047,11 +1048,17 @@ public class MutationState implements SQLCloseable {
}
public enum MutationMetadataType {
+ @Deprecated
TENANT_ID,
+ @Deprecated
SCHEMA_NAME,
+ @Deprecated
LOGICAL_TABLE_NAME,
+ @Deprecated
TIMESTAMP,
- TABLE_TYPE
+ @Deprecated
+ TABLE_TYPE,
+ EXTERNAL_SCHEMA_ID
}
private static class TableInfo {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 05fb75a..36322e1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -379,6 +379,9 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
public static final String SCHEMA_VERSION = "SCHEMA_VERSION";
public static final byte[] SCHEMA_VERSION_BYTES = Bytes.toBytes(SCHEMA_VERSION);
+ public static final String EXTERNAL_SCHEMA_ID = "EXTERNAL_SCHEMA_ID";
+ public static final byte[] EXTERNAL_SCHEMA_ID_BYTES = Bytes.toBytes(EXTERNAL_SCHEMA_ID);
+
public static final String SYSTEM_CHILD_LINK_TABLE = "CHILD_LINK";
public static final String SYSTEM_CHILD_LINK_NAME = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_CHILD_LINK_TABLE);
public static final byte[] SYSTEM_CHILD_LINK_NAME_BYTES = Bytes.toBytes(SYSTEM_CHILD_LINK_NAME);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 14d182d..820209a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -3887,13 +3887,16 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0) {
metaConnection = addColumnsIfNotExists(metaConnection,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG, MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0 -1,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG, MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0 -2,
PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME
+ " " + PVarchar.INSTANCE.getSqlTypeName());
metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0,
+ MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0 -1,
PhoenixDatabaseMetaData.SCHEMA_VERSION + " " + PVarchar.INSTANCE.getSqlTypeName());
+ metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0,
+ PhoenixDatabaseMetaData.EXTERNAL_SCHEMA_ID + " " + PVarchar.INSTANCE.getSqlTypeName());
}
return metaConnection;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 3424fd5..c608e51 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -69,6 +69,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODING_SCHEME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXCEPTION_TRACE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXPLAIN_PLAN;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXTERNAL_SCHEMA_ID;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GLOBAL_SCAN_DETAILS;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT;
@@ -316,6 +317,7 @@ public interface QueryConstants {
LAST_DDL_TIMESTAMP + " BIGINT, \n" +
CHANGE_DETECTION_ENABLED + " BOOLEAN, \n" +
SCHEMA_VERSION + " VARCHAR, \n" +
+ EXTERNAL_SCHEMA_ID + " VARCHAR, \n" +
// Column metadata (will be null for table row)
DATA_TYPE + " INTEGER," +
COLUMN_SIZE + " INTEGER," +
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index 9f2f980..10f8287 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -375,6 +375,11 @@ public class DelegateTable implements PTable {
return delegate.getSchemaVersion();
}
+ @Override
+ public String getExternalSchemaId() {
+ return delegate.getExternalSchemaId();
+ }
+
@Override public Map<String, String> getPropertyValues() { return delegate.getPropertyValues(); }
@Override public Map<String, String> getDefaultPropertyValues() { return delegate.getDefaultPropertyValues(); }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index ff1035b..65e31f8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -17,13 +17,12 @@
*/
package org.apache.phoenix.schema;
+import static org.apache.phoenix.exception.SQLExceptionCode.*;
import static org.apache.phoenix.thirdparty.com.google.common.collect.Sets.newLinkedHashSet;
import static org.apache.phoenix.thirdparty.com.google.common.collect.Sets.newLinkedHashSetWithExpectedSize;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.RUN_UPDATE_STATS_ASYNC_ATTRIB;
import static org.apache.phoenix.coprocessor.tasks.IndexRebuildTask.INDEX_NAME;
import static org.apache.phoenix.coprocessor.tasks.IndexRebuildTask.REBUILD_ALL;
-import static org.apache.phoenix.exception.SQLExceptionCode.INSUFFICIENT_MULTI_TENANT_COLUMNS;
-import static org.apache.phoenix.exception.SQLExceptionCode.PARENT_TABLE_NOT_FOUND;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARG_POSITION;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE;
@@ -3229,6 +3228,8 @@ public class MetaDataClient {
result.getTable().getLastDDLTimestamp() : null)
.setIsChangeDetectionEnabled(isChangeDetectionEnabledProp)
.setSchemaVersion(schemaVersion)
+ .setExternalSchemaId(result.getTable() != null ?
+ result.getTable().getExternalSchemaId() : null)
.build();
result = new MetaDataMutationResult(code, result.getMutationTime(), table, true);
addTableToCache(result);
@@ -3349,6 +3350,9 @@ public class MetaDataClient {
throwsSQLExceptionUtil(String.valueOf(code), SchemaUtil.getSchemaNameFromFullName(
parent.getPhysicalName().getString()),SchemaUtil.getTableNameFromFullName(
parent.getPhysicalName().getString()));
+ case ERROR_WRITING_TO_SCHEMA_REGISTRY:
+ throw new SQLExceptionInfo.Builder(ERROR_WRITING_TO_SCHEMA_REGISTRY)
+ .setSchemaName(schemaName).setTableName(tableName).build().buildException();
default:
// Cannot use SQLExecptionInfo here since not all mutation codes have their
// corresponding codes in the enum SQLExceptionCode
@@ -3649,6 +3653,9 @@ public class MetaDataClient {
.setSchemaName(schemaName).setTableName(tableName).build().buildException();
case TABLE_ALREADY_EXISTS:
break;
+ case ERROR_WRITING_TO_SCHEMA_REGISTRY:
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.ERROR_WRITING_TO_SCHEMA_REGISTRY).
+ setSchemaName(schemaName).setTableName(tableName).build().buildException();
default:
throw new SQLExceptionInfo.Builder(SQLExceptionCode.UNEXPECTED_MUTATION_CODE).setSchemaName(schemaName)
.setTableName(tableName).setMessage("mutation code: " + mutationCode).build().buildException();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 2411ba5..dfa92da 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -854,6 +854,12 @@ public interface PTable extends PMetaDataEntity {
String getSchemaVersion();
/**
+ * @return String provided by an external schema registry to be used to lookup the schema for
+ * a Phoenix table or view in the registry.
+ */
+ String getExternalSchemaId();
+
+ /**
* Class to help track encoded column qualifier counters per column family.
*/
public class EncodedCQCounter {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 1f22976..70efc26 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -71,7 +71,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -204,6 +203,7 @@ public class PTableImpl implements PTable {
private final boolean isChangeDetectionEnabled;
private Map<String, String> propertyValues;
private String schemaVersion;
+ private String externalSchemaId;
public static class Builder {
private PTableKey key;
@@ -266,6 +266,7 @@ public class PTableImpl implements PTable {
private boolean isChangeDetectionEnabled = false;
private Map<String, String> propertyValues = new HashMap<>();
private String schemaVersion;
+ private String externalSchemaId;
// Used to denote which properties a view has explicitly modified
private BitSet viewModifiedPropSet = new BitSet(3);
@@ -524,6 +525,10 @@ public class PTableImpl implements PTable {
if (physicalTableName != null) {
propertyValues.put(PHYSICAL_TABLE_NAME, String.valueOf(physicalTableName));
}
+ if (this.physicalTableName.equals(PName.EMPTY_NAME) && physicalTableName == null) {
+ //don't override a "blank" PName with null.
+ return this;
+ }
this.physicalTableName = physicalTableName;
return this;
}
@@ -615,6 +620,32 @@ public class PTableImpl implements PTable {
return this;
}
+ public Builder addOrSetColumns(Collection<PColumn> changedColumns) {
+ if (this.columns == null || this.columns.size() == 0) {
+ //no need to merge, just take the changes as the complete set of PColumns
+ this.columns = changedColumns;
+ } else {
+ //We have to merge the old and new columns, keeping the columns in the original order
+ List<PColumn> existingColumnList = Lists.newArrayList(this.columns);
+ List<PColumn> columnsToAdd = Lists.newArrayList();
+ //create a new list that's almost a copy of this.columns, but everywhere there's
+ //a "newer" PColumn of an existing column in the parameter, replace it with the
+ //newer version
+ for (PColumn newColumn : changedColumns) {
+ int indexOf = existingColumnList.indexOf(newColumn);
+ if (indexOf != -1) {
+ existingColumnList.set(indexOf, newColumn);
+ } else {
+ columnsToAdd.add(newColumn);
+ }
+ }
+ //now tack on any completely new columns at the end
+ existingColumnList.addAll(columnsToAdd);
+ this.columns = existingColumnList;
+ }
+ return this;
+ }
+
public Builder setLastDDLTimestamp(Long lastDDLTimestamp) {
this.lastDDLTimestamp = lastDDLTimestamp;
return this;
@@ -634,6 +665,13 @@ public class PTableImpl implements PTable {
return this;
}
+ public Builder setExternalSchemaId(String externalSchemaId) {
+ if (externalSchemaId != null) {
+ this.externalSchemaId = externalSchemaId;
+ }
+ return this;
+ }
+
/**
* Populate derivable attributes of the PTable
* @return PTableImpl.Builder object
@@ -902,6 +940,7 @@ public class PTableImpl implements PTable {
this.lastDDLTimestamp = builder.lastDDLTimestamp;
this.isChangeDetectionEnabled = builder.isChangeDetectionEnabled;
this.schemaVersion = builder.schemaVersion;
+ this.externalSchemaId = builder.externalSchemaId;
}
// When cloning table, ignore the salt column as it will be added back in the constructor
@@ -925,7 +964,7 @@ public class PTableImpl implements PTable {
* Get a PTableImpl.Builder from an existing PTable
* @param table Original PTable
*/
- private static PTableImpl.Builder builderFromExisting(PTable table) {
+ public static PTableImpl.Builder builderFromExisting(PTable table) {
return new PTableImpl.Builder()
.setType(table.getType())
.setState(table.getIndexState())
@@ -978,7 +1017,8 @@ public class PTableImpl implements PTable {
.setPhoenixTTLHighWaterMark(table.getPhoenixTTLHighWaterMark())
.setLastDDLTimestamp(table.getLastDDLTimestamp())
.setIsChangeDetectionEnabled(table.isChangeDetectionEnabled())
- .setSchemaVersion(table.getSchemaVersion());
+ .setSchemaVersion(table.getSchemaVersion())
+ .setExternalSchemaId(table.getExternalSchemaId());
}
@Override
@@ -1875,6 +1915,11 @@ public class PTableImpl implements PTable {
if (table.hasSchemaVersion()) {
schemaVersion = (String) PVarchar.INSTANCE.toObject(table.getSchemaVersion().toByteArray());
}
+ String externalSchemaId = null;
+ if (table.hasExternalSchemaId()) {
+ externalSchemaId =
+ (String) PVarchar.INSTANCE.toObject(table.getExternalSchemaId().toByteArray());
+ }
try {
return new PTableImpl.Builder()
.setType(tableType)
@@ -1929,6 +1974,7 @@ public class PTableImpl implements PTable {
.setLastDDLTimestamp(lastDDLTimestamp)
.setIsChangeDetectionEnabled(isChangeDetectionEnabled)
.setSchemaVersion(schemaVersion)
+ .setExternalSchemaId(externalSchemaId)
.build();
} catch (SQLException e) {
throw new RuntimeException(e); // Impossible
@@ -2056,6 +2102,7 @@ public class PTableImpl implements PTable {
}
builder.setChangeDetectionEnabled(table.isChangeDetectionEnabled());
builder.setSchemaVersion(ByteStringer.wrap(PVarchar.INSTANCE.toBytes(table.getSchemaVersion())));
+ builder.setExternalSchemaId(ByteStringer.wrap(PVarchar.INSTANCE.toBytes(table.getExternalSchemaId())));
return builder.build();
}
@@ -2188,6 +2235,11 @@ public class PTableImpl implements PTable {
return schemaVersion;
}
+ @Override
+ public String getExternalSchemaId() {
+ return externalSchemaId;
+ }
+
private static final class KVColumnFamilyQualifier {
@Nonnull
private final String colFamilyName;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/export/DefaultSchemaRegistryRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/export/DefaultSchemaRegistryRepository.java
new file mode 100644
index 0000000..35b514b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/export/DefaultSchemaRegistryRepository.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.export;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Default in-memory implementation of SchemaRegistryRepository. Not intended for production use
+ */
+public class DefaultSchemaRegistryRepository implements SchemaRegistryRepository {
+ public static final String DEFAULT_SCHEMA_NAME = "default_schema";
+ public static final String DEFAULT_TENANT_ID = "global";
+
+ private static final String SEPARATOR = "*";
+
+ Map<String, String> schemaMap = new HashMap<String, String>();
+
+ @Override
+ public void init(Configuration conf) throws IOException {
+
+ }
+
+ @Override
+ public String exportSchema(SchemaWriter writer, PTable table) throws IOException {
+ String schemaId = getSchemaId(table);
+ schemaMap.put(schemaId, writer.exportSchema(table));
+ return schemaId;
+ }
+
+ @Override
+ public String getSchemaById(String schemaId) {
+ return schemaMap.get(schemaId);
+ }
+
+ @Override
+ public String getSchemaByTable(PTable table) {
+ return schemaMap.get(getSchemaId(table));
+ }
+
+ @Override
+ public void close() throws IOException {
+ schemaMap.clear();
+ }
+
+ public static String getSchemaId(PTable table) {
+ String schemaMetadataName = getSchemaMetadataName(table);
+ String version = table.getSchemaVersion() != null ? table.getSchemaVersion() :
+ table.getLastDDLTimestamp() != null ? table.getLastDDLTimestamp().toString() :
+ Long.toString(EnvironmentEdgeManager.currentTimeMillis());
+
+ //tenant*schema*table*version-id
+ return String.format("%s" + SEPARATOR + "%s", schemaMetadataName,
+ version);
+ }
+
+ private static String getSchemaMetadataName(PTable table) {
+ String schemaGroup = getSchemaGroup(table);
+ return schemaGroup + SEPARATOR + table.getTableName().getString();
+ }
+
+ private static String getSchemaGroup(PTable table) {
+ String tenantId = (table.getTenantId() != null) ? table.getTenantId().getString() :
+ DEFAULT_TENANT_ID;
+ String schemaName = table.getSchemaName().getString();
+ if (schemaName == null) {
+ schemaName = DEFAULT_SCHEMA_NAME;
+ }
+ return tenantId + SEPARATOR + schemaName;
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/export/DefaultSchemaWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/export/DefaultSchemaWriter.java
new file mode 100644
index 0000000..f9930cd
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/export/DefaultSchemaWriter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.schema.export;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
+
+import java.io.IOException;
+
+public class DefaultSchemaWriter implements SchemaWriter {
+ @Override
+ public void init(Configuration conf) throws IOException {
+
+ }
+
+ @Override
+ public String exportSchema(PTable table) throws IOException {
+ return PTableImpl.toProto(table).toString();
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaImporter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaImporter.java
new file mode 100644
index 0000000..89c0fdd
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaImporter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.schema.export;
+
+import org.apache.phoenix.schema.PTable;
+
+import java.io.IOException;
+
+/**
+ * Interface for importing schemas stored externally from Phoenix into Phoenix by converting the
+ * schema into a PTable
+ */
+public interface SchemaImporter {
+
+ /**
+ *
+ * @param schema String form of an external schema. The expected format of the schema depends
+ * on the implementation of the class.
+ * @return a Phoenix PTable
+ */
+ PTable getTableFromSchema(String schema) throws IOException;
+
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaRegistryRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaRegistryRepository.java
new file mode 100644
index 0000000..52a26e2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaRegistryRepository.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.export;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.schema.PTable;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Interface for exporting a Phoenix object (e.g a table or view) to an external schema repository
+ * The choice of which schema repository, and the transport mechanism, are deferred to
+ * implementing classes
+ */
+public interface SchemaRegistryRepository extends Closeable {
+
+ public static final String SCHEMA_WRITER_IMPL_KEY =
+ "org.apache.phoenix.export.schemawriter.impl";
+ public static final String SCHEMA_REGISTRY_IMPL_KEY =
+ "org.apache.phoenix.export.schemaregistry.impl";
+
+ /**
+ * Optional method for any necessary bootstrapping to connect to the external schema registry
+ * @param conf Configuration object with necessary parameters to talk to the external schema
+ * registry
+ * @throws IOException Exception if something goes wrong in connecting to the registry
+ */
+ void init(Configuration conf) throws IOException;
+
+ /**
+ * Export a Phoenix PTable into an external schema registry by reformatting it into a suitable
+ * form.
+ * @param writer An object which can translate a PTable into a String suitable for the external
+ * schema registry
+ * @param table a Phoenix PTable for a table or view
+ * @returns Schema id generated by the schema registry, represented as a string.
+ * @throws IOException Exception if something goes wrong in constructing or sending the schema
+ */
+ String exportSchema(SchemaWriter writer, PTable table) throws IOException;
+
+ /**
+ * Return a schema from an external schema repository by its unique identifier
+ * @param schemaId schema identifier
+ * @return a schema
+ */
+ String getSchemaById(String schemaId);
+
+ /**
+ * Return a schema from an external schema repository using information on a PTable
+ * @param table a Phoenix PTable for a table or view
+ * @return a schema
+ */
+ String getSchemaByTable(PTable table);
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaRegistryRepositoryFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaRegistryRepositoryFactory.java
new file mode 100644
index 0000000..d20a487
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaRegistryRepositoryFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.schema.export;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public final class SchemaRegistryRepositoryFactory {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SchemaRegistryRepository.class);
+ private static SchemaRegistryRepository exporter;
+
+ public synchronized static SchemaRegistryRepository getSchemaRegistryRepository(Configuration conf)
+ throws IOException {
+ if (exporter != null) {
+ return exporter;
+ }
+ try {
+ String className = conf.get(SchemaRegistryRepository.SCHEMA_REGISTRY_IMPL_KEY);
+ if (className == null) {
+ exporter = new DefaultSchemaRegistryRepository();
+ } else {
+ Class<SchemaRegistryRepository> clazz =
+ (Class<SchemaRegistryRepository>) Class.forName(className);
+ exporter = clazz.newInstance();
+ }
+ exporter.init(conf);
+ return exporter;
+ } catch (Exception e) {
+ LOGGER.error("Error constructing SchemaRegistryExporter object", e);
+ if (exporter != null) {
+ try {
+ exporter.close();
+ exporter = null;
+ } catch (IOException innerE) {
+ LOGGER.error("Error closing incorrectly constructed SchemaRegistryExporter", e);
+ }
+ }
+ throw new IOException(e);
+ }
+ }
+
+ public synchronized static void close() throws IOException {
+ if (exporter != null) {
+ exporter.close();
+ exporter = null;
+ }
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaWriter.java
new file mode 100644
index 0000000..bef6352
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaWriter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.schema.export;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.schema.PTable;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Interface for classes to implement for converting Phoenix schema data, captured in a PTable, into
+ * some other format interpretable by an external system. Possible implementing classes would include
+ * converters to Avro, Protobuf, Thrift, various dialects of SQL,
+ * or other similar cross-platform data description languages.
+ */
+public interface SchemaWriter extends Closeable {
+ public static final String SCHEMA_REGISTRY_IMPL_KEY =
+ "org.apache.phoenix.export.schemawriter.impl";
+
+ /**
+ * Initialize the schema writer with appropriate configuration
+ * @param conf a Configuration object
+ * @throws IOException if something goes wrong during initialization
+ */
+ void init(Configuration conf) throws IOException;
+
+ /**
+ * Given a Phoenix PTable, output a schema document readable by some external system.
+ * @param table A Phoenix PTable describing a table or view
+ * @return a String interpretable as a data format schema in an external system
+ */
+ String exportSchema(PTable table) throws IOException;
+
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaWriterFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaWriterFactory.java
new file mode 100644
index 0000000..a47709b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaWriterFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.schema.export;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+public class SchemaWriterFactory {
+ public static SchemaWriter getSchemaWriter(Configuration conf)
+ throws IOException {
+ try {
+ String className = conf.get(SchemaRegistryRepository.SCHEMA_WRITER_IMPL_KEY);
+ if (className == null) {
+ return new DefaultSchemaWriter();
+ }
+ Class<SchemaWriter> clazz =
+ (Class<SchemaWriter>) Class.forName(className);
+ return clazz.newInstance();
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException |
+ ClassCastException e) {
+ throw new IOException(e);
+ }
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
index 5fdc9f9..1ae532b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
@@ -17,8 +17,7 @@
*/
package org.apache.phoenix.util;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME_INDEX;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FAMILY_NAME_INDEX;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.*;
import static org.apache.phoenix.util.SchemaUtil.getVarChars;
import java.io.IOException;
@@ -26,6 +25,10 @@ import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.*;
+import org.apache.phoenix.schema.types.PChar;
+import org.apache.phoenix.schema.types.PTinyint;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
@@ -126,6 +129,17 @@ public class MetaDataUtil {
return p;
}
+ public static Put getExternalSchemaIdUpdate(byte[] tableHeaderRowKey,
+ String externalSchemaId) {
+ //use client timestamp as the timestamp of the Cell, to match the other Cells that might
+ // be created by this DDL. But the actual value will be a _server_ timestamp
+ Put p = new Put(tableHeaderRowKey);
+ byte[] externalSchemaIdBytes = PVarchar.INSTANCE.toBytes(externalSchemaId);
+ p.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+ PhoenixDatabaseMetaData.EXTERNAL_SCHEMA_ID_BYTES, externalSchemaIdBytes);
+ return p;
+ }
+
/**
* Checks if a table is meant to be queried directly (and hence is relevant to external
* systems tracking Phoenix schema)
@@ -420,7 +434,7 @@ public class MetaDataUtil {
List<Cell> kvs = tableMutation.getFamilyCellMap().get(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES);
if (kvs != null) {
for (Cell kv : kvs) { // list is not ordered, so search. TODO: we could potentially assume the position
- if (Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(), PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES, 0, PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES.length) == 0) {
+ if (isSequenceNumber(kv)) {
return PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
}
}
@@ -431,7 +445,21 @@ public class MetaDataUtil {
public static long getSequenceNumber(List<Mutation> tableMetaData) {
return getSequenceNumber(getPutOnlyTableHeaderRow(tableMetaData));
}
-
+
+ public static boolean isSequenceNumber(Mutation m) {
+ boolean foundSequenceNumber = false;
+ for (Cell kv : m.getFamilyCellMap().get(TABLE_FAMILY_BYTES)) {
+ if (isSequenceNumber(kv)) {
+ foundSequenceNumber = true;
+ break;
+ }
+ }
+ return foundSequenceNumber;
+ }
+ public static boolean isSequenceNumber(Cell kv) {
+ return CellUtil.matchingQualifier(kv, PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES);
+ }
+
public static PTableType getTableType(List<Mutation> tableMetaData, KeyValueBuilder builder,
ImmutableBytesWritable value) {
if (getMutationValue(getPutOnlyTableHeaderRow(tableMetaData),
@@ -523,6 +551,52 @@ public class MetaDataUtil {
return false;
}
+ public static List<Cell> getTableCellsFromMutations(List<Mutation> tableMetaData) {
+ List<Cell> tableCells = Lists.newArrayList();
+ byte[] tableKey = tableMetaData.get(0).getRow();
+ for (int k = 0; k < tableMetaData.size(); k++) {
+ Mutation m = tableMetaData.get(k);
+ if (Bytes.equals(m.getRow(), tableKey)) {
+ tableCells.addAll(getCellList(m));
+ }
+ }
+ return tableCells;
+ }
+
+ public static List<List<Cell>> getColumnAndLinkCellsFromMutations(List<Mutation> tableMetaData) {
+ //skip the first mutation because it's the table header row with table-specific information
+ //all the rest of the mutations are either from linking rows or column definition rows
+ List<List<Cell>> allColumnsCellList = Lists.newArrayList();
+ byte[] tableKey = tableMetaData.get(0).getRow();
+ for (int k = 1; k < tableMetaData.size(); k++) {
+ Mutation m = tableMetaData.get(k);
+ //filter out mutations for the table header row and TABLE_SEQ_NUM and parent table
+ //rows such as a view's column qualifier count
+ if (!Bytes.equals(m.getRow(), tableKey) && !(!isLinkType(m) && isSequenceNumber(m)
+ && !isParentTableColumnQualifierCounter(m, tableKey))) {
+ List<Cell> listToAdd = getCellList(m);
+ if (listToAdd != null && listToAdd.size() > 0) {
+ allColumnsCellList.add(listToAdd);
+ }
+ }
+ }
+ return allColumnsCellList;
+ }
+
+ private static List<Cell> getCellList(Mutation m) {
+ List<Cell> cellList = Lists.newArrayList();
+ for (Cell c : m.getFamilyCellMap().get(TABLE_FAMILY_BYTES)) {
+ //Mutations will mark NULL columns as deletes, whereas when we read
+ //from HBase we just won't get Cells for those columns. To use Mutation cells
+ //with code expecting Cells read from HBase results, we have to purge those
+ //Delete mutations
+ if (c != null && !CellUtil.isDelete(c)) {
+ cellList.add(c);
+ }
+ }
+ return cellList;
+ }
+
/**
* Returns the first Put element in <code>tableMetaData</code>. There could be leading Delete elements before the
* table header row
@@ -902,11 +976,10 @@ public class MetaDataUtil {
public static LinkType getLinkType(Collection<Cell> kvs) {
if (kvs != null) {
for (Cell kv : kvs) {
- if (Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(),
- PhoenixDatabaseMetaData.LINK_TYPE_BYTES, 0,
- PhoenixDatabaseMetaData.LINK_TYPE_BYTES.length) == 0) { return LinkType
- .fromSerializedValue(PUnsignedTinyint.INSTANCE.getCodec().decodeByte(kv.getValueArray(),
- kv.getValueOffset(), SortOrder.getDefault())); }
+ if (isLinkType(kv)) {
+ return LinkType.fromSerializedValue(PUnsignedTinyint.INSTANCE.getCodec().
+ decodeByte(kv.getValueArray(), kv.getValueOffset(),
+ SortOrder.getDefault())); }
}
}
return null;
@@ -917,6 +990,48 @@ public class MetaDataUtil {
return false;
}
+ public static boolean isLinkType(Cell kv) {
+ return CellUtil.matchingQualifier(kv, PhoenixDatabaseMetaData.LINK_TYPE_BYTES);
+ }
+
+ public static boolean isLinkType(Mutation m) {
+ boolean foundLinkType = false;
+ for (Cell kv : m.getFamilyCellMap().get(TABLE_FAMILY_BYTES)) {
+ if (isLinkType(kv)) {
+ foundLinkType = true;
+ break;
+ }
+ }
+ return foundLinkType;
+ }
+
+ public static boolean isParentTableColumnQualifierCounter(Mutation m, byte[] tableRow) {
+ boolean foundCQCounter = false;
+ for (Cell kv : m.getFamilyCellMap().get(TABLE_FAMILY_BYTES)) {
+ if (isParentTableColumnQualifierCounter(kv, tableRow)) {
+ foundCQCounter = true;
+ break;
+ }
+ }
+ return foundCQCounter;
+ }
+ public static boolean isParentTableColumnQualifierCounter(Cell kv, byte[] tableRow) {
+ byte[][] tableRowKeyMetaData = new byte[5][];
+ getVarChars(tableRow, tableRowKeyMetaData);
+ byte[] tableName = tableRowKeyMetaData[TABLE_NAME_INDEX];
+
+ byte[][] columnRowKeyMetaData = new byte[5][];
+ int nColumns = getVarChars(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
+ 0, columnRowKeyMetaData);
+ if (nColumns == 5) {
+ byte[] columnTableName = columnRowKeyMetaData[TABLE_NAME_INDEX];
+ if (!Bytes.equals(tableName, columnTableName)) {
+ return CellUtil.matchingQualifier(kv, COLUMN_QUALIFIER_BYTES);
+ }
+ }
+ return false;
+ }
+
public static boolean isViewIndex(String physicalName) {
if (physicalName.contains(QueryConstants.NAMESPACE_SEPARATOR)) {
return SchemaUtil.getTableNameFromFullName(physicalName).startsWith(VIEW_INDEX_TABLE_PREFIX);
@@ -1023,6 +1138,19 @@ public class MetaDataUtil {
return getLegacyViewIndexIdDataType();
}
+ public static boolean getChangeDetectionEnabled(List<Mutation> tableMetaData) {
+ KeyValueBuilder builder = GenericKeyValueBuilder.INSTANCE;
+ ImmutableBytesWritable value = new ImmutableBytesWritable();
+ if (getMutationValue(getPutOnlyTableHeaderRow(tableMetaData),
+ PhoenixDatabaseMetaData.CHANGE_DETECTION_ENABLED_BYTES, builder, value)) {
+ return Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(value.get(),
+ value.getOffset(),
+ value.getLength()));
+ } else {
+ return false;
+ }
+ }
+
public static PColumn getColumn(int pkCount, byte[][] rowKeyMetaData, PTable table) throws ColumnFamilyNotFoundException, ColumnNotFoundException {
PColumn col = null;
if (pkCount > FAMILY_NAME_INDEX
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index e4e089e..18972fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -1363,18 +1363,10 @@ public class ScanUtil {
*/
public static void setWALAnnotationAttributes(PTable table, Scan scan) {
if (table.isChangeDetectionEnabled()) {
- if (table.getTenantId() != null) {
- scan.setAttribute(MutationState.MutationMetadataType.TENANT_ID.toString(),
- table.getTenantId().getBytes());
+ if (table.getExternalSchemaId() != null) {
+ scan.setAttribute(MutationState.MutationMetadataType.EXTERNAL_SCHEMA_ID.toString(),
+ Bytes.toBytes(table.getExternalSchemaId()));
}
- scan.setAttribute(MutationState.MutationMetadataType.SCHEMA_NAME.toString(),
- table.getSchemaName().getBytes());
- scan.setAttribute(MutationState.MutationMetadataType.LOGICAL_TABLE_NAME.toString(),
- table.getTableName().getBytes());
- scan.setAttribute(MutationState.MutationMetadataType.TABLE_TYPE.toString(),
- table.getType().getValue().getBytes());
- scan.setAttribute(MutationState.MutationMetadataType.TIMESTAMP.toString(),
- Bytes.toBytes(table.getLastDDLTimestamp()));
}
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
index 705acf6..4d9dd92 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
@@ -65,18 +65,8 @@ import org.apache.phoenix.parse.DropTableStatement;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.SQLParser;
import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.schema.ColumnNotFoundException;
-import org.apache.phoenix.schema.MetaDataClient;
-import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PColumnImpl;
-import org.apache.phoenix.schema.PName;
-import org.apache.phoenix.schema.PNameFactory;
-import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.*;
import org.apache.phoenix.schema.PTable.LinkType;
-import org.apache.phoenix.schema.PTableImpl;
-import org.apache.phoenix.schema.PTableType;
-import org.apache.phoenix.schema.SaltingUtil;
-import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.types.PBoolean;
import org.apache.phoenix.schema.types.PLong;
import org.slf4j.Logger;
@@ -567,7 +557,38 @@ public class ViewUtil {
}
}
}
-
+
+ public static PTable addDerivedColumnsAndIndexesFromAncestors(PhoenixConnection connection,
+ PTable table) throws SQLException {
+ List<PTable> ancestorList = Lists.newArrayList(table);
+ //First generate a list of tables from child to base table. First element will be the
+ //ultimate descendant, last element will be the base table.
+ PName parentName = table.getParentName();
+ while (parentName != null && parentName.getString().length() > 0) {
+ PTable currentTable = ancestorList.get(ancestorList.size() -1);
+ String parentTableName = SchemaUtil.getTableName(currentTable.getParentSchemaName(),
+ currentTable.getParentTableName()).getString();
+ PTable parentTable;
+ try {
+ parentTable = PhoenixRuntime.getTable(connection, parentTableName);
+ } catch (TableNotFoundException tnfe) {
+ //check to see if there's a tenant-owned parent
+ parentTable = PhoenixRuntime.getTable(connection, table.getTenantId().getString(), parentTableName);
+ }
+ ancestorList.add(parentTable);
+ parentName = parentTable.getParentName();
+ }
+ //now add the columns from all ancestors up from the base table to the top-most view
+ if (ancestorList.size() > 1) {
+ for (int k = ancestorList.size() -2; k >= 0; k--) {
+ ancestorList.set(k, addDerivedColumnsAndIndexesFromParent(connection,
+ ancestorList.get(k), ancestorList.get(k +1)));
+ }
+ return ancestorList.get(0);
+ } else {
+ return table;
+ }
+ }
/**
* Inherit all indexes and columns from the parent
* @return table with inherited columns and indexes
@@ -609,6 +630,17 @@ public class ViewUtil {
* @return table with inherited columns
*/
public static PTable addDerivedColumnsFromParent(PTable view, PTable parentTable)
+ throws SQLException {
+ return addDerivedColumnsFromParent(view, parentTable, true);
+ }
+ /**
+ * Inherit all columns from the parent unless it's an excluded column.
+ * If the same column is present in the parent and child (for table metadata created before
+ * PHOENIX-3534) we choose the child column over the parent column
+ * @return table with inherited columns
+ */
+ public static PTable addDerivedColumnsFromParent(PTable view, PTable parentTable,
+ boolean recalculateBaseColumnCount)
throws SQLException {
// combine columns for view and view indexes
boolean hasIndexId = view.getViewIndexId() != null;
@@ -730,9 +762,12 @@ public class ViewUtil {
}
// we need to include the salt column when setting the base table column count in order to
// maintain b/w compatibility
- int baseTableColumnCount =
- isDiverged ? QueryConstants.DIVERGED_VIEW_BASE_COLUMN_COUNT
- : columnsToAdd.size() - myColumns.size() + (isSalted ? 1 : 0);
+ int baseTableColumnCount = view.getBaseColumnCount();
+ if (recalculateBaseColumnCount) {
+ baseTableColumnCount = isDiverged ?
+ QueryConstants.DIVERGED_VIEW_BASE_COLUMN_COUNT :
+ columnsToAdd.size() - myColumns.size() + (isSalted ? 1 : 0);
+ }
// Inherit view-modifiable properties from the parent table/view if the current view has
// not previously modified this property
long updateCacheFreq = (view.getType() != PTableType.VIEW ||
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/WALAnnotationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/WALAnnotationUtil.java
index 76564a6..4dba27a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/WALAnnotationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/WALAnnotationUtil.java
@@ -58,17 +58,12 @@ public class WALAnnotationUtil {
* @param ddlTimestamp Server-side timestamp when the table/view was created or last had a
* column added or dropped from it, whichever is greater
*/
- public static void annotateMutation(Mutation m, byte[] tenantId, byte[] schemaName,
- byte[] logicalTableName, byte[] tableType, byte[] ddlTimestamp) {
+ public static void annotateMutation(Mutation m, byte[] externalSchemaId) {
if (!m.getDurability().equals(Durability.SKIP_WAL)) {
- if (tenantId != null) {
- m.setAttribute(MutationState.MutationMetadataType.TENANT_ID.toString(), tenantId);
+ if (externalSchemaId != null) {
+ m.setAttribute(MutationState.MutationMetadataType.EXTERNAL_SCHEMA_ID.toString(),
+ externalSchemaId);
}
- m.setAttribute(MutationState.MutationMetadataType.SCHEMA_NAME.toString(), schemaName);
- m.setAttribute(MutationState.MutationMetadataType.LOGICAL_TABLE_NAME.toString(),
- logicalTableName);
- m.setAttribute(MutationState.MutationMetadataType.TABLE_TYPE.toString(), tableType);
- m.setAttribute(MutationState.MutationMetadataType.TIMESTAMP.toString(), ddlTimestamp);
}
}
}
diff --git a/phoenix-core/src/main/protobuf/MetaDataService.proto b/phoenix-core/src/main/protobuf/MetaDataService.proto
index 0af87e4..63c79ae 100644
--- a/phoenix-core/src/main/protobuf/MetaDataService.proto
+++ b/phoenix-core/src/main/protobuf/MetaDataService.proto
@@ -54,6 +54,7 @@ enum MutationCode {
UNABLE_TO_UPDATE_PARENT_TABLE = 24;
UNABLE_TO_DELETE_CHILD_LINK = 25;
UNABLE_TO_UPSERT_TASK = 26;
+ ERROR_WRITING_TO_SCHEMA_REGISTRY = 27;
};
message SharedTableState {
diff --git a/phoenix-core/src/main/protobuf/PTable.proto b/phoenix-core/src/main/protobuf/PTable.proto
index 1a70868..45e1b6e 100644
--- a/phoenix-core/src/main/protobuf/PTable.proto
+++ b/phoenix-core/src/main/protobuf/PTable.proto
@@ -114,6 +114,7 @@ message PTable {
optional bytes physicalTableNameBytes = 47;
optional bytes baseTableLogicalNameBytes = 48;
optional bytes schemaVersion = 49;
+ optional bytes externalSchemaId=50;
}
message EncodedCQCounter {