You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by go...@apache.org on 2022/01/05 01:47:42 UTC
[phoenix] branch 4.x updated: PHOENIX-6617 IndexRegionObserver changes for creating mutations for transforming table (#701)
This is an automated email from the ASF dual-hosted git repository.
gokcen pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new a6c5f71 PHOENIX-6617 IndexRegionObserver changes for creating mutations for transforming table (#701)
a6c5f71 is described below
commit a6c5f711b2b89547c2b4bb280c47e3d522b10362
Author: Gokcen Iskender <gi...@salesforce.com>
AuthorDate: Thu Aug 26 14:34:47 2021 -0700
PHOENIX-6617 IndexRegionObserver changes for creating mutations for transforming table (#701)
Signed-off-by: Gokcen Iskender <go...@gmail.com>
---
.../apache/phoenix/end2end/AppendOnlySchemaIT.java | 2 +-
.../phoenix/end2end/LogicalTableNameBaseIT.java | 2 +-
.../apache/phoenix/end2end/TransformToolIT.java | 67 ++++++++
.../apache/phoenix/end2end/index/TransformIT.java | 180 +++++++++++++++++++++
.../phoenix/coprocessor/MetaDataEndpointImpl.java | 30 +++-
.../UngroupedAggregateRegionScanner.java | 1 +
.../org/apache/phoenix/execute/MutationState.java | 1 +
.../phoenix/hbase/index/IndexRegionObserver.java | 14 +-
.../org/apache/phoenix/index/IndexMaintainer.java | 23 ++-
.../phoenix/index/IndexMetaDataCacheClient.java | 4 +
.../apache/phoenix/index/PhoenixIndexCodec.java | 11 +-
.../apache/phoenix/index/PhoenixIndexMetaData.java | 5 +-
.../phoenix/jdbc/PhoenixPreparedStatement.java | 2 +-
.../org/apache/phoenix/jdbc/PhoenixStatement.java | 14 +-
.../phoenix/mapreduce/transform/TransformTool.java | 2 -
.../phoenix/query/ConnectionQueryServices.java | 1 +
.../phoenix/query/ConnectionQueryServicesImpl.java | 6 +-
.../query/ConnectionlessQueryServicesImpl.java | 1 +
.../query/DelegateConnectionQueryServices.java | 3 +-
.../org/apache/phoenix/schema/DelegateTable.java | 3 +
.../org/apache/phoenix/schema/MetaDataClient.java | 21 ++-
.../java/org/apache/phoenix/schema/PTable.java | 12 +-
.../java/org/apache/phoenix/schema/PTableImpl.java | 33 +++-
.../apache/phoenix/schema/transform/Transform.java | 93 +++++++++--
.../schema/transform/TransformMaintainer.java | 20 +++
.../java/org/apache/phoenix/util/ViewUtil.java | 29 +++-
.../src/main/protobuf/MetaDataService.proto | 1 +
phoenix-core/src/main/protobuf/PTable.proto | 1 +
28 files changed, 540 insertions(+), 42 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AppendOnlySchemaIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AppendOnlySchemaIT.java
index a865121..6eb05bb 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AppendOnlySchemaIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AppendOnlySchemaIT.java
@@ -139,7 +139,7 @@ public class AppendOnlySchemaIT extends ParallelStatsDisabledIT {
// if not verify exists is true one call to add column table with empty mutation list (which does not make a rpc)
// else verify no add column calls
verify(connectionQueryServices, notExists ? times(1) : never() )
- .addColumn(eq(Collections.<Mutation>emptyList()), any(PTable.class),
+ .addColumn(eq(Collections.<Mutation>emptyList()), any(PTable.class), any(PTable.class),
any(PTable.class), anyMap(), anySetOf(String.class),
anyListOf(PColumn.class));
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameBaseIT.java
index 379ab58..72037f2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameBaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameBaseIT.java
@@ -34,13 +34,13 @@ import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.types.PInteger;
-import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.StringUtil;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
import java.sql.Connection;
import java.sql.DriverManager;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TransformToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TransformToolIT.java
index 276f08b..5002c8c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TransformToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TransformToolIT.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.index.SingleCellIndexIT;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.transform.TransformTool;
import org.apache.phoenix.query.QueryServices;
@@ -60,6 +61,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class TransformToolIT extends ParallelStatsDisabledIT{
private static final Logger LOGGER = LoggerFactory.getLogger(TransformToolIT.class);
@@ -406,6 +408,71 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
}
}
+ @Test
+ public void testTransformMutationFailureRepair() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ IndexRegionObserver.setFailPreIndexUpdatesForTesting(true);
+ int numOfRows = 0;
+ createTableAndUpsertRows(conn, dataTableFullName, numOfRows);
+ SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+
+ conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+ " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+ SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+ assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+
+ String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
+ PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+ try {
+ IndexToolIT.upsertRow(stmt1, 1);
+ fail("Transform table upsert should have failed");
+ } catch (Exception e) {
+ }
+ assertEquals(0, getRowCount(conn, record.getNewPhysicalTableName()));
+ assertEquals(0, getRowCount(conn,dataTableFullName));
+
+ IndexRegionObserver.setFailPreIndexUpdatesForTesting(false);
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+ IndexToolIT.upsertRow(stmt1, 2);
+
+ assertEquals(1, getRowCount(conn, record.getNewPhysicalTableName()));
+ assertEquals(1, getRowCount(conn,dataTableFullName));
+
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+ try {
+ IndexToolIT.upsertRow(stmt1, 3);
+ fail("Data table upsert should have failed");
+ } catch (Exception e) {
+ }
+ assertEquals(2, getRowCount(conn, record.getNewPhysicalTableName()));
+ assertEquals(1, getRowCount(conn,dataTableFullName));
+
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+
+ List<String> args = getArgList(schemaName, dataTableName, null,
+ null, null, null, false, false, false, false);
+ runTransformTool(args.toArray(new String[0]), 0);
+
+ // TODO: Implement GlobalIndexChecker-like repair for unverified rows
+ assertEquals(1, getRowCount(conn.unwrap(PhoenixConnection.class).getQueryServices()
+ .getTable(Bytes.toBytes(dataTableFullName)), false));
+// assertEquals(1, getRowCount(conn.unwrap(PhoenixConnection.class).getQueryServices()
+// .getTable(Bytes.toBytes(record.getNewPhysicalTableName())), false));
+ } finally {
+ IndexRegionObserver.setFailPreIndexUpdatesForTesting(false);
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+ }
+ }
+
public static List<String> getArgList(String schemaName, String dataTable, String indxTable, String tenantId,
Long startTime, Long endTime,
boolean shouldAbort, boolean shouldPause, boolean shouldResume, boolean isPartial) {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java
index 5fc8b7a..136da65 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java
@@ -20,10 +20,14 @@ package org.apache.phoenix.end2end.index;
import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.schema.transform.Transform;
import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.SchemaUtil;
import org.junit.Before;
import org.junit.Test;
@@ -36,8 +40,10 @@ import java.util.Properties;
import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN;
import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.getRowCount;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -163,6 +169,180 @@ public class TransformIT extends ParallelStatsDisabledIT {
}
}
+ @Test
+ public void testTransformForLiveMutations_mutatingTable() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+ conn.setAutoCommit(true);
+ String schema = "S_" + generateUniqueName();
+ String tableName = "TBL_" + generateUniqueName();
+ String idxName = "IND_" + generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(schema, tableName);
+ String fullIdxName = SchemaUtil.getTableName(schema, idxName);
+
+ String createTableSql = "CREATE TABLE " + fullTableName + " (PK1 VARCHAR NOT NULL, INT_PK INTEGER NOT NULL, " +
+ "V1 VARCHAR, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) ";
+ conn.createStatement().execute(createTableSql);
+
+ String upsertStmt = "UPSERT INTO " + fullTableName + " (PK1, INT_PK, V1, V2) VALUES ('%s', %d, '%s', %d)";
+ conn.createStatement().execute(String.format(upsertStmt, "a", 1, "val1", 1));
+
+ // Note that index will not be built, since we create it with ASYNC
+ String createIndexSql = "CREATE INDEX " + idxName + " ON " + fullTableName + " (PK1, INT_PK) include (V1) ASYNC";
+ conn.createStatement().execute(createIndexSql);
+ assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, fullTableName);
+ assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, fullIdxName);
+
+ String idxName2 = "IND2_" + generateUniqueName();
+ String fullIdxName2 = SchemaUtil.getTableName(schema, idxName2);
+ conn.createStatement().execute("CREATE INDEX " + idxName2 + " ON " + fullTableName + " (V1) include (V2) ASYNC");
+
+ // Now start a transform and check that the index tables are still empty, since we never built them
+ conn.createStatement().execute("ALTER TABLE " + fullTableName + " SET "
+ + " IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+ SystemTransformRecord record = Transform.getTransformRecord(schema, tableName, null, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+ assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+ assertEquals(0, getRowCount(conn, fullIdxName));
+ assertEquals(0, getRowCount(conn, fullIdxName2));
+
+ // Now these live mutations should go into the index tables and the transforming table
+ conn.createStatement().execute(String.format(upsertStmt, "b", 2, "val2", 2));
+ conn.commit();
+ conn.createStatement().execute(String.format(upsertStmt, "c", 3, "val3", 3));
+ assertEquals(2, getRowCount(conn, fullIdxName));
+ assertEquals(2, getRowCount(conn, fullIdxName2));
+ assertEquals(2, getRowCount(conn, record.getNewPhysicalTableName()));
+
+ // Delete one mutation
+ conn.createStatement().execute("DELETE FROM " + fullTableName + " WHERE PK1='b'");
+ assertEquals(1, getRowCount(conn, fullIdxName));
+ assertEquals(1, getRowCount(conn, fullIdxName2));
+ assertEquals(1, getRowCount(conn, record.getNewPhysicalTableName()));
+ }
+ }
+
+ @Test
+ public void testTransformForLiveMutations_mutatingBaseTableNoIndex() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+ conn.setAutoCommit(true);
+ String schema = "S_" + generateUniqueName();
+ String tableName = "TBL_" + generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(schema, tableName);
+
+ String createTableSql = "CREATE TABLE " + fullTableName + " (PK1 VARCHAR NOT NULL, INT_PK INTEGER NOT NULL, " +
+ "V1 VARCHAR, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) ";
+ conn.createStatement().execute(createTableSql);
+ assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, fullTableName);
+
+ String upsertStmt = "UPSERT INTO " + fullTableName + " (PK1, INT_PK, V1, V2) VALUES ('%s', %d, '%s', %d)";
+ conn.createStatement().execute(String.format(upsertStmt, "a", 1, "val1", 1));
+
+ conn.createStatement().execute("ALTER TABLE " + fullTableName + " SET "
+ + " IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+ SystemTransformRecord record = Transform.getTransformRecord(schema, tableName, null, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+ assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+
+ conn.createStatement().execute(String.format(upsertStmt, "b", 2, "val2", 2));
+ conn.commit();
+ conn.createStatement().execute(String.format(upsertStmt, "c", 3, "val3", 3));
+ assertEquals(2, getRowCount(conn, record.getNewPhysicalTableName()));
+
+ // Delete one mutation
+ conn.createStatement().execute("DELETE FROM " + fullTableName + " WHERE PK1='b'");
+ assertEquals(1, getRowCount(conn, record.getNewPhysicalTableName()));
+ }
+ }
+
+ @Test
+ public void testTransformForLiveMutations_mutatingIndex() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+ conn.setAutoCommit(true);
+ String schema = "S_" + generateUniqueName();
+ String tableName = "TBL_" + generateUniqueName();
+ String idxName = "IND_" + generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(schema, tableName);
+ String fullIdxName = SchemaUtil.getTableName(schema, idxName);
+
+ String createTableSql = "CREATE TABLE " + fullTableName + " (PK1 VARCHAR NOT NULL, INT_PK INTEGER NOT NULL, " +
+ "V1 VARCHAR, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) ";
+ conn.createStatement().execute(createTableSql);
+
+ // Note that index will not be built, since we create it with ASYNC
+ String createIndexSql = "CREATE INDEX " + idxName + " ON " + fullTableName + " (PK1, INT_PK) include (V1) ASYNC";
+ conn.createStatement().execute(createIndexSql);
+ assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, fullIdxName);
+
+ conn.createStatement().execute("ALTER INDEX " + idxName + " ON " + fullTableName +
+ " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+ SystemTransformRecord record = Transform.getTransformRecord(schema, idxName, fullTableName, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+ assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+
+ String upsertStmt = "UPSERT INTO " + fullTableName + " (PK1, INT_PK, V1, V2) VALUES ('%s', %d, '%s', %d)";
+ // Now these live mutations should go into the index tables and the transforming table
+ conn.createStatement().execute(String.format(upsertStmt, "b", 2, "val2", 2));
+ conn.createStatement().execute(String.format(upsertStmt, "c", 3, "val3", 3));
+ assertEquals(2, getRowCount(conn, fullIdxName));
+ assertEquals(2, getRowCount(conn, record.getNewPhysicalTableName()));
+
+ // Delete one mutation
+ conn.createStatement().execute("DELETE FROM " + fullTableName + " WHERE PK1='b'");
+ assertEquals(1, getRowCount(conn, fullIdxName));
+ assertEquals(1, getRowCount(conn, record.getNewPhysicalTableName()));
+ }
+
+ }
+
+ @Test
+ public void testTransformForLiveMutations_mutatingBaseTableForView() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+ conn.setAutoCommit(true);
+ String schema = "S_" + generateUniqueName();
+ String tableName = "TBL_" + generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(schema, tableName);
+ String parentViewName = "VWP_" + generateUniqueName();
+ String viewName = "VW_" + generateUniqueName();
+ String viewIdxName = "VWIDX_" + generateUniqueName();
+
+ String createTableSql = "CREATE TABLE " + fullTableName + " (PK1 VARCHAR NOT NULL, INT_PK INTEGER NOT NULL, " +
+ "V1 VARCHAR, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) ";
+ conn.createStatement().execute(createTableSql);
+ assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, fullTableName);
+
+ String createParentViewSql = "CREATE VIEW " + parentViewName + " ( PARENT_VIEW_COL1 VARCHAR ) AS SELECT * FROM " + fullTableName;
+ conn.createStatement().execute(createParentViewSql);
+
+ String createViewSql = "CREATE VIEW " + viewName + " ( VIEW_COL1 INTEGER, VIEW_COL2 VARCHAR ) AS SELECT * FROM " + parentViewName;
+ conn.createStatement().execute(createViewSql);
+
+ String createViewIdxSql = "CREATE INDEX " + viewIdxName + " ON " + viewName + " (VIEW_COL1) include (VIEW_COL2) ";
+ conn.createStatement().execute(createViewIdxSql);
+
+ String upsertStmt = "UPSERT INTO " + viewName + " (PK1, INT_PK, V1, VIEW_COL1, VIEW_COL2) VALUES ('%s', %d, '%s', %d, '%s')";
+ conn.createStatement().execute(String.format(upsertStmt, "a", 1, "val1", 1, "col2_1"));
+
+ conn.createStatement().execute("ALTER TABLE " + fullTableName + " SET "
+ + " IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+ SystemTransformRecord record = Transform.getTransformRecord(schema, tableName, null, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+ assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+
+ conn.createStatement().execute(String.format(upsertStmt, "b", 2, "val2", 2, "col2_2"));
+ conn.createStatement().execute(String.format(upsertStmt, "c", 3, "val3", 3, "col2_3"));
+ assertEquals(3, getRowCount(conn, viewName));
+ assertEquals(3, getRowCount(conn, viewIdxName));
+ // The new table has one row fewer: it never saw the row upserted before the transform, because the TransformTool was not run
+ assertEquals(2, getRowCount(conn, record.getNewPhysicalTableName()));
+
+ conn.createStatement().execute("DELETE FROM " + viewName + " WHERE VIEW_COL1=2");
+ assertEquals(1, getRowCount(conn, record.getNewPhysicalTableName()));
+ assertEquals(2, getRowCount(conn, viewName));
+ assertEquals(2, getRowCount(conn, viewIdxName));
+ }
+ }
+
private void assertSystemTransform(Connection conn, int rowCount, String schema, String logicalTableName, String tenantId) throws SQLException {
ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ count(*) FROM "+
PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 37c6fa8..da5c223 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -82,8 +82,8 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_DATA
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE_BYTES;
import static org.apache.phoenix.query.QueryConstants.VIEW_MODIFIED_PROPERTY_TAG_TYPE;
+import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
import static org.apache.phoenix.schema.PTableType.INDEX;
-import static org.apache.phoenix.schema.PTableType.VIEW;
import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
import static org.apache.phoenix.util.SchemaUtil.getVarChars;
@@ -93,7 +93,6 @@ import static org.apache.phoenix.util.ViewUtil.getSystemTableForChildLinks;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
-import java.sql.Connection;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
@@ -108,6 +107,7 @@ import java.util.NavigableMap;
import java.util.Properties;
import java.util.Set;
+import org.apache.directory.api.util.Strings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -227,6 +227,8 @@ import org.apache.phoenix.schema.metrics.MetricsMetadataSource;
import org.apache.phoenix.schema.metrics.MetricsMetadataSourceFactory;
import org.apache.phoenix.schema.task.SystemTaskParams;
import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.schema.transform.Transform;
import org.apache.phoenix.schema.types.PBinary;
import org.apache.phoenix.schema.types.PBoolean;
import org.apache.phoenix.schema.types.PDataType;
@@ -1415,6 +1417,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
builder.setTimeStamp(timeStamp);
+ PTable transformingNewTable = null;
boolean isRegularView = (tableType == PTableType.VIEW && viewType != ViewType.MAPPED);
for (List<Cell> columnCellList : allColumnCellList) {
@@ -1487,6 +1490,17 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
} else if (linkType == LinkType.EXCLUDED_COLUMN) {
// add the excludedColumn
addExcludedColumnToTable(columns, colName, famName, colKv.getTimestamp());
+ } else if (linkType == LinkType.TRANSFORMING_NEW_TABLE) {
+ transformingNewTable = doGetTable((tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes())
+ , SchemaUtil.getSchemaNameFromFullName(famName.getBytes()).getBytes(), SchemaUtil.getTableNameFromFullName(famName.getBytes()).getBytes(), clientTimeStamp, null, clientVersion);
+ if (transformingNewTable == null) {
+ // Not found with the tenant id above; the new table could be global, so retry with an empty tenant id
+ transformingNewTable = doGetTable(ByteUtil.EMPTY_BYTE_ARRAY
+ , SchemaUtil.getSchemaNameFromFullName(famName.getBytes()).getBytes(), SchemaUtil.getTableNameFromFullName(famName.getBytes()).getBytes(), clientTimeStamp, null, clientVersion);
+ if (transformingNewTable == null) {
+ ServerUtil.throwIOException("Transforming new table not found", new TableNotFoundException(schemaName.getString(), famName.getString()));
+ }
+ }
}
} else {
long columnTimestamp =
@@ -1510,6 +1524,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
if (!setPhysicalName && oldTable != null) {
builder.setPhysicalTableName(oldTable.getPhysicalName());
}
+ builder.setTransformingNewTable(transformingNewTable);
builder.setExcludedColumns(ImmutableList.<PColumn>of());
builder.setBaseTableLogicalName(parentLogicalName != null ?
@@ -3043,7 +3058,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
private MetaDataMutationResult mutateColumn(
final List<Mutation> tableMetadata,
final ColumnMutator mutator, final int clientVersion,
- final PTable parentTable, boolean isAddingOrDroppingColumns) throws IOException {
+ final PTable parentTable, final PTable transformingNewTable, boolean isAddingOrDroppingColumns) throws IOException {
byte[][] rowKeyMetaData = new byte[5][];
MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
byte[] tenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
@@ -3124,6 +3139,10 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
parentTable);
}
}
+ if (transformingNewTable !=null) {
+ table = PTableImpl.builderWithColumns(table, getColumnsToClone(table))
+ .setTransformingNewTable(transformingNewTable).build();
+ }
if (table.getTimeStamp() >= clientTimeStamp) {
LOGGER.info("Found newer table as of " + table.getTimeStamp()
@@ -3417,9 +3436,10 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
try {
List<Mutation> tableMetaData = ProtobufUtil.getMutations(request);
PTable parentTable = request.hasParentTable() ? PTableImpl.createFromProto(request.getParentTable()) : null;
+ PTable transformingNewTable = request.hasTransformingNewTable() ? PTableImpl.createFromProto(request.getTransformingNewTable()) : null;
boolean addingColumns = request.getAddingColumns();
MetaDataMutationResult result = mutateColumn(tableMetaData, new AddColumnMutator(),
- request.getClientVersion(), parentTable, addingColumns);
+ request.getClientVersion(), parentTable, transformingNewTable, addingColumns);
if (result != null) {
done.run(MetaDataMutationResult.toProto(result));
}
@@ -3557,7 +3577,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
tableMetaData = ProtobufUtil.getMutations(request);
PTable parentTable = request.hasParentTable() ? PTableImpl.createFromProto(request.getParentTable()) : null;
MetaDataMutationResult result = mutateColumn(tableMetaData, new DropColumnMutator(env.getConfiguration()),
- request.getClientVersion(), parentTable, true);
+ request.getClientVersion(), parentTable,null, true);
if (result != null) {
done.run(MetaDataMutationResult.toProto(result));
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
index 4f4ddba..d2d8b16 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
@@ -211,6 +211,7 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
}
indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto);
indexMutations = localIndexBytes == null ? new UngroupedAggregateRegionObserver.MutationList() : new UngroupedAggregateRegionObserver.MutationList(1024);
+ byte[] transforming = scan.getAttribute(BaseScannerRegionObserver.DO_TRANSFORMING);
replayMutations = scan.getAttribute(REPLAY_WRITES);
indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 4f67764..2289637 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -1197,6 +1197,7 @@ public class MutationState implements SQLCloseable {
private void sendBatch(Map<TableRef, MultiRowMutationState> commitBatch, long[] serverTimeStamps, boolean sendAll) throws SQLException {
int i = 0;
Map<TableInfo, List<Mutation>> physicalTableMutationMap = Maps.newLinkedHashMap();
+
// add tracing for this operation
try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) {
Span span = trace.getSpan();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 90e267a..ad36f41 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -91,6 +91,7 @@ import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.transform.TransformMaintainer;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.trace.TracingUtils;
import org.apache.phoenix.trace.util.NullSpan;
@@ -1037,6 +1038,15 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
return false;
}
+ private static boolean isTransforming(PhoenixIndexMetaData indexMetaData) {
+ for (IndexMaintainer indexMaintainer : indexMetaData.getIndexMaintainers()) {
+ if (indexMaintainer instanceof TransformMaintainer) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private void waitForPreviousConcurrentBatch(TableName table, BatchMutateContext context)
throws Throwable {
boolean done;
@@ -1105,7 +1115,7 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
boolean hasAtomic = hasAtomicUpdate(miniBatchOp);
long onDupCheckTime = 0;
- if (hasAtomic || hasGlobalIndex(indexMetaData)) {
+ if (hasAtomic || hasGlobalIndex(indexMetaData) || isTransforming(indexMetaData)) {
// Retrieve the current row states from the data table while holding the lock.
// This is needed for both atomic mutations and global indexes
long start = EnvironmentEdgeManager.currentTimeMillis();
@@ -1136,7 +1146,7 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
setTimestamps(miniBatchOp, builder, now);
TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
- if (hasGlobalIndex(indexMetaData)) {
+ if (hasGlobalIndex(indexMetaData) || isTransforming(indexMetaData)) {
// Prepare next data rows states for pending mutations (for global indexes)
prepareDataRowStates(c, miniBatchOp, context, now);
// Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 1ba7651..9308b1e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -39,6 +39,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
@@ -99,6 +100,7 @@ import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.ValueSchema;
import org.apache.phoenix.schema.ValueSchema.Field;
+import org.apache.phoenix.schema.transform.TransformMaintainer;
import org.apache.phoenix.schema.tuple.BaseTuple;
import org.apache.phoenix.schema.tuple.ValueGetterTuple;
import org.apache.phoenix.schema.types.PDataType;
@@ -224,11 +226,14 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
*/
public static void serialize(PTable dataTable, ImmutableBytesWritable ptr,
List<PTable> indexes, PhoenixConnection connection) {
- if (indexes.isEmpty()) {
+ if (indexes.isEmpty() && dataTable.getTransformingNewTable() == null) {
ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
return;
}
int nIndexes = indexes.size();
+ if (dataTable.getTransformingNewTable() != null) {
+ nIndexes++;
+ }
ByteArrayOutputStream stream = new ByteArrayOutputStream();
DataOutputStream output = new DataOutputStream(stream);
try {
@@ -242,6 +247,13 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
WritableUtils.writeVInt(output, protoBytes.length);
output.write(protoBytes);
}
+ if (dataTable.getTransformingNewTable() != null) {
+ ServerCachingProtos.TransformMaintainer proto = TransformMaintainer.toProto(
+ dataTable.getTransformingNewTable().getTransformMaintainer(dataTable, connection));
+ byte[] protoBytes = proto.toByteArray();
+ WritableUtils.writeVInt(output, protoBytes.length);
+ output.write(protoBytes);
+ }
} catch (IOException e) {
throw new RuntimeException(e); // Impossible
}
@@ -316,8 +328,13 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
int protoSize = WritableUtils.readVInt(input);
byte[] b = new byte[protoSize];
input.readFully(b);
- org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = ServerCachingProtos.IndexMaintainer.parseFrom(b);
- maintainers.add(IndexMaintainer.fromProto(proto, rowKeySchema, isDataTableSalted));
+ try {
+ org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = ServerCachingProtos.IndexMaintainer.parseFrom(b);
+ maintainers.add(IndexMaintainer.fromProto(proto, rowKeySchema, isDataTableSalted));
+ } catch (InvalidProtocolBufferException e) {
+ org.apache.phoenix.coprocessor.generated.ServerCachingProtos.TransformMaintainer proto = ServerCachingProtos.TransformMaintainer.parseFrom(b);
+ maintainers.add(TransformMaintainer.fromProto(proto, rowKeySchema, isDataTableSalted));
+ }
} else {
IndexMaintainer maintainer = new IndexMaintainer(rowKeySchema, isDataTableSalted);
maintainer.readFields(input);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
index 1f6dd73..be5d8f9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.index;
import static org.apache.phoenix.query.QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB;
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
import java.sql.SQLException;
import java.util.List;
@@ -134,6 +135,9 @@ public class IndexMetaDataCacheClient {
mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantIdBytes);
}
mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+ if (table.getTransformingNewTable() != null) {
+ mutation.setAttribute(BaseScannerRegionObserver.DO_TRANSFORMING, TRUE_BYTES);
+ }
if (attribValue != null) {
mutation.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
mutation.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 9630a0f..abf3acc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -34,6 +34,8 @@ import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.DO_TRANSFORMING;
+
/**
* Phoenix-based {@link IndexCodec}. Manages all the logic of how to cleanup an index (
* {@link #getIndexDeletes(TableState, IndexMetaData, byte[], byte[])}) as well as what the new index state should be (
@@ -69,6 +71,13 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
return true;
}
+ /**
+ * Tells whether the mutation attributes carry the {@code DO_TRANSFORMING} marker.
+ * Only the presence of the attribute is checked; its value is not inspected.
+ *
+ * @param attributes mutation attribute map, may be {@code null}
+ * @return {@code true} when the map is non-null and holds a non-null
+ * {@code DO_TRANSFORMING} entry
+ */
+ boolean isTransforming(Map<String, byte[]> attributes) {
+ return attributes != null && attributes.get(DO_TRANSFORMING) != null;
+ }
+
@Override
public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context, byte[] regionStartKey, byte[] regionEndKey) throws IOException {
PhoenixIndexMetaData metaData = (PhoenixIndexMetaData)context;
@@ -122,6 +131,6 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
@Override
public boolean isEnabled(Mutation m) {
- return hasIndexMaintainers(m.getAttributesMap());
+ // Enabled if either check passes on the mutation's attribute map.
+ Map<String, byte[]> mutationAttributes = m.getAttributesMap();
+ return hasIndexMaintainers(mutationAttributes) || isTransforming(mutationAttributes);
}
}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
index 75ce9f4..4385894 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -26,6 +26,7 @@ import org.apache.phoenix.cache.IndexMetaDataCache;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.schema.transform.TransformMaintainer;
import org.apache.phoenix.transaction.PhoenixTransactionContext;
public class PhoenixIndexMetaData implements IndexMetaData {
@@ -52,7 +53,9 @@ public class PhoenixIndexMetaData implements IndexMetaData {
boolean hasLocalIndexes = false;
for (IndexMaintainer maintainer : indexMetaDataCache.getIndexMaintainers()) {
isImmutable &= maintainer.isImmutableRows();
- hasNonPkColumns |= !maintainer.getIndexedColumns().isEmpty();
+ if (!(maintainer instanceof TransformMaintainer)) {
+ hasNonPkColumns |= !maintainer.getIndexedColumns().isEmpty();
+ }
hasLocalIndexes |= maintainer.isLocalIndex();
}
this.isImmutable = isImmutable;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
index 83d8d85..2a13121 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
@@ -184,7 +184,7 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar
if (statement.getOperation().isMutation()) {
throw new ExecuteQueryNotApplicableException(statement.getOperation());
}
-
+
return executeQuery(statement,createQueryLogger(statement,query));
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 7422bbd..fddd608 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -312,10 +312,16 @@ public class PhoenixStatement implements Statement, SQLCloseable {
protected PhoenixResultSet executeQuery(final CompilableStatement stmt, final QueryLogger queryLogger)
throws SQLException {
- return executeQuery(stmt, true, queryLogger);
+ // doRetryOnMetaNotFoundError=true; noCommit=false keeps the commit that follows
+ // the query when the connection is in auto-commit mode.
+ return executeQuery(stmt, true, queryLogger, false);
}
+
+ /**
+ * Runs the statement with {@code doRetryOnMetaNotFoundError} passed as {@code true}.
+ *
+ * @param stmt the compiled statement to execute
+ * @param queryLogger logger handed through to the underlying execution
+ * @param noCommit when {@code true}, the commit otherwise issued after the query on an
+ * auto-commit connection is skipped
+ * @throws SQLException propagated from the underlying execution
+ */
+ protected PhoenixResultSet executeQuery(final CompilableStatement stmt, final QueryLogger queryLogger, boolean noCommit)
+ throws SQLException {
+ return executeQuery(stmt, true, queryLogger, noCommit);
+ }
+
private PhoenixResultSet executeQuery(final CompilableStatement stmt,
- final boolean doRetryOnMetaNotFoundError, final QueryLogger queryLogger) throws SQLException {
+ final boolean doRetryOnMetaNotFoundError, final QueryLogger queryLogger, final boolean noCommit) throws SQLException {
GLOBAL_SELECT_SQL_COUNTER.increment();
try {
@@ -387,7 +393,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
setLastUpdateCount(NO_UPDATE);
setLastUpdateOperation(stmt.getOperation());
// If transactional, this will move the read pointer forward
- if (connection.getAutoCommit()) {
+ if (connection.getAutoCommit() && !noCommit) {
connection.commit();
}
connection.incrementStatementExecutionCounter();
@@ -404,7 +410,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
e.getSchemaName(), e.getTableName(), true)
.wasUpdated()) {
//TODO we can log retry count and error for debugging in LOG table
- return executeQuery(stmt, false, queryLogger);
+ return executeQuery(stmt, false, queryLogger, noCommit);
}
}
throw e;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
index 5e5cdcf..f7574d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
@@ -435,12 +435,10 @@ public class TransformTool extends Configured implements Tool {
new PostIndexDDLCompiler(pConnection, new TableRef(pOldTable), true);
ddlCompiler.compile(pNewTable);
final List<String> newColumns = ddlCompiler.getDataColumnNames();
- //final String selectQuery = ddlCompiler.getSelectQuery();
final String upsertQuery =
QueryUtil.constructUpsertStatement(newTableWithSchema, newColumns, HintNode.Hint.NO_INDEX);
configuration.set(PhoenixConfigurationUtil.UPSERT_STATEMENT, upsertQuery);
- //PhoenixConfigurationUtil.setPhysicalTableName(configuration, pNewTable.getPhysicalName().getString());
PhoenixConfigurationUtil.setUpsertColumnNames(configuration,
ddlCompiler.getIndexColumnNames().toArray(new String[ddlCompiler.getIndexColumnNames().size()]));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index 1714cfe..26a2114 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -120,6 +120,7 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
public MetaDataMutationResult addColumn(List<Mutation> tableMetaData,
PTable table,
PTable parentTable,
+ PTable transformingNewTable,
Map<String, List<Pair<String, Object>>> properties,
Set<String> colFamiliesForPColumnsToBeAdded,
List<PColumn> columns) throws SQLException;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 42bbf4e..932b676 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -2324,6 +2324,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
public MetaDataMutationResult addColumn(final List<Mutation> tableMetaData,
PTable table,
final PTable parentTable,
+ final PTable transformingNewTable,
Map<String, List<Pair<String, Object>>> stmtProperties,
Set<String> colFamiliesForPColumnsToBeAdded,
List<PColumn> columns) throws SQLException {
@@ -2410,6 +2411,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
if (parentTable!=null)
builder.setParentTable(PTableImpl.toProto(parentTable));
+ if (transformingNewTable!=null) {
+ builder.setTransformingNewTable(PTableImpl.toProto(transformingNewTable));
+ }
builder.setAddingColumns(addingColumns);
instance.addColumn(controller, builder.build(), rpcCallback);
if (controller.getFailedOn() != null) {
@@ -4449,7 +4453,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
tableMetadata.addAll(metaConnection.getMutationState().toMutations(metaConnection.getSCN()).next().getSecond());
metaConnection.rollback();
- metaConnection.getQueryServices().addColumn(tableMetadata, sysCatalogPTable, null, Collections.<String,List<Pair<String,Object>>>emptyMap(), Collections.<String>emptySet(), Lists.newArrayList(column));
+ metaConnection.getQueryServices().addColumn(tableMetadata, sysCatalogPTable, null,null, Collections.<String,List<Pair<String,Object>>>emptyMap(), Collections.<String>emptySet(), Lists.newArrayList(column));
metaConnection.removeTable(null, SYSTEM_CATALOG_NAME, null, timestamp);
ConnectionQueryServicesImpl.this.removeTable(null,
SYSTEM_CATALOG_NAME, null,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 7667837..bef0bfe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -332,6 +332,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
public MetaDataMutationResult addColumn(List<Mutation> tableMetaData,
PTable table,
PTable parentTable,
+ PTable transformingNewTable,
Map<String, List<Pair<String, Object>>> properties,
Set<String> colFamiliesForPColumnsToBeAdded,
List<PColumn> columnsToBeAdded) throws SQLException {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index d271547..f672aa6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -136,10 +136,11 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
public MetaDataMutationResult addColumn(List<Mutation> tableMetaData,
PTable table,
PTable parentTable,
+ PTable transformingNewTable,
Map<String, List<Pair<String, Object>>> properties,
Set<String> colFamiliesForPColumnsToBeAdded,
List<PColumn> columns) throws SQLException {
- return getDelegate().addColumn(tableMetaData, table, parentTable,
+ // Pure pass-through: every argument, including transformingNewTable, is handed
+ // to the delegate unchanged.
+ return getDelegate().addColumn(tableMetaData, table, parentTable, transformingNewTable,
properties, colFamiliesForPColumnsToBeAdded, columns);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index 292a0ba..38bd4e2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -143,6 +143,9 @@ public class DelegateTable implements PTable {
public List<PTable> getIndexes() { return delegate.getIndexes(); }
@Override
+ // Pass-through to the wrapped table; no extra logic here.
+ public PTable getTransformingNewTable() { return delegate.getTransformingNewTable(); }
+
+ @Override
public PIndexState getIndexState() {
return delegate.getIndexState();
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 732c718..c6713df 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -116,6 +116,7 @@ import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_P
import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS;
import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
import static org.apache.phoenix.schema.PTable.ViewType.MAPPED;
+import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
import static org.apache.phoenix.schema.PTableType.INDEX;
import static org.apache.phoenix.schema.PTableType.TABLE;
import static org.apache.phoenix.schema.PTableType.VIEW;
@@ -348,7 +349,7 @@ public class MetaDataClient {
private static final String CREATE_SCHEMA = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE
+ "\"( " + TABLE_SCHEM + "," + TABLE_NAME + ") VALUES (?,?)";
- private static final String CREATE_LINK =
+ public static final String CREATE_LINK =
"UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
TENANT_ID + "," +
TABLE_SCHEM + "," +
@@ -3152,6 +3153,11 @@ public class MetaDataClient {
QualifierEncodingScheme encodingScheme = null;
Byte encodingSchemeSerializedByte = (Byte) TableProperty.COLUMN_ENCODED_BYTES.getValue(tableProps);
if (encodingSchemeSerializedByte == null) {
+ if (tableProps.containsKey(ENCODING_SCHEME)) {
+ encodingSchemeSerializedByte = QualifierEncodingScheme.valueOf(((String) tableProps.get(ENCODING_SCHEME))).getSerializedMetadataValue();
+ }
+ }
+ if (encodingSchemeSerializedByte == null) {
// Ignore default if transactional and column encoding is not supported (as with OMID)
if (transactionProvider == null || !transactionProvider.getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.COLUMN_ENCODING) ) {
encodingSchemeSerializedByte = (byte)connection.getQueryServices().getProps().getInt(QueryServices.DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB,
@@ -4052,9 +4058,10 @@ public class MetaDataClient {
connection.rollback();
}
- if (isTransformNeeded) {
+ PTable transformingNewTable = null;
+ if (isTransformNeeded) {
try {
- Transform.addTransform(connection, tenantIdToUse, table, metaProperties, seqNum, PTable.TransformType.METADATA_TRANSFORM);
+ transformingNewTable = Transform.addTransform(connection, tenantIdToUse, table, metaProperties, seqNum, PTable.TransformType.METADATA_TRANSFORM);
} catch (SQLException ex) {
connection.rollback();
throw ex;
@@ -4137,7 +4144,7 @@ public class MetaDataClient {
acquiredColumnMutexSet.add(pColumn.toString());
}
MetaDataMutationResult result = connection.getQueryServices().addColumn(tableMetaData, table,
- getParentTable(table), properties, colFamiliesForPColumnsToBeAdded, columns);
+ getParentTable(table), transformingNewTable, properties, colFamiliesForPColumnsToBeAdded, columns);
try {
MutationCode code = processMutationResult(schemaName, tableName, result);
@@ -4206,6 +4213,10 @@ public class MetaDataClient {
connection.getQueryServices().updateData(plan);
}
}
+ if (transformingNewTable != null) {
+ connection.removeTable(tenantId, fullTableName, null, resolvedTimeStamp);
+ connection.getQueryServices().clearCache();
+ }
if (emptyCF != null) {
Long scn = connection.getSCN();
connection.setAutoCommit(true);
@@ -4556,7 +4567,7 @@ public class MetaDataClient {
Collections.<Mutation>singletonList(new Put(SchemaUtil.getTableKey
(tenantIdBytes, tableContainingColumnToDrop.getSchemaName().getBytes(),
tableContainingColumnToDrop.getTableName().getBytes()))),
- tableContainingColumnToDrop, null, family, Sets.newHashSet(Bytes.toString(emptyCF)), Collections.<PColumn>emptyList());
+ tableContainingColumnToDrop, null, null,family, Sets.newHashSet(Bytes.toString(emptyCF)), Collections.<PColumn>emptyList());
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 0eb5d26..437ab61 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -163,7 +163,11 @@ public interface PTable extends PMetaDataEntity {
/**
* Link from an index on a view to its parent table
*/
- VIEW_INDEX_PARENT_TABLE((byte)6);
+ VIEW_INDEX_PARENT_TABLE((byte)6),
+ /**
+ * Link from the old table to the new transforming table
+ */
+ TRANSFORMING_NEW_TABLE((byte)7);
private final byte[] byteValue;
private final byte serializedValue;
@@ -770,6 +774,12 @@ public interface PTable extends PMetaDataEntity {
List<PTable> getIndexes();
/**
+ * Return the new version of the table if it is going through transform.
+ * @return the new table; callers in this change treat {@code null} as
+ * "no transform target set".
+ */
+ PTable getTransformingNewTable();
+
+ /**
* For a table of index type, return the state of the table.
* @return the state of the index.
*/
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 97605d7..a90e046 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -104,7 +104,6 @@ import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PDouble;
import org.apache.phoenix.schema.types.PFloat;
import org.apache.phoenix.schema.types.PVarchar;
-import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.EncodedColumnsUtil;
@@ -116,6 +115,7 @@ import org.apache.phoenix.util.TrustedByteArrayOutputStream;
import org.apache.phoenix.thirdparty.com.google.common.base.Objects;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
import org.apache.phoenix.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableMap;
@@ -168,6 +168,9 @@ public class PTableImpl implements PTable {
private final RowKeySchema rowKeySchema;
// Indexes associated with this table.
private final List<PTable> indexes;
+ // New table this table is being transformed into; null when the builder was not given one.
+ private final PTable transformingNewTable;
+
// Data table name that the index is created on.
private final PName parentName;
private final PName parentSchemaName;
@@ -231,6 +234,7 @@ public class PTableImpl implements PTable {
private Integer bucketNum;
private RowKeySchema rowKeySchema;
private List<PTable> indexes;
+ private PTable transformingNewTable;
private PName parentName;
private PName parentSchemaName;
private PName parentTableName;
@@ -399,6 +403,11 @@ public class PTableImpl implements PTable {
return this;
}
+ /**
+ * Sets the table this one is being transformed into.
+ *
+ * @param transformingNewTable the transform target; may be {@code null}
+ * @return this builder, for chaining
+ */
+ public Builder setTransformingNewTable(PTable transformingNewTable) {
+ this.transformingNewTable = transformingNewTable;
+ return this;
+ }
+
public Builder setParentName(PName parentName) {
this.parentName = parentName;
return this;
@@ -832,6 +841,9 @@ public class PTableImpl implements PTable {
for (PTable index : this.indexes) {
estimatedSize += index.getEstimatedSize();
}
+ if (transformingNewTable!=null) {
+ estimatedSize += transformingNewTable.getEstimatedSize();
+ }
estimatedSize += PNameFactory.getEstimatedSize(this.parentName);
for (PName physicalName : this.physicalNames) {
@@ -904,6 +916,7 @@ public class PTableImpl implements PTable {
this.bucketNum = builder.bucketNum;
this.rowKeySchema = builder.rowKeySchema;
this.indexes = builder.indexes;
+ this.transformingNewTable = builder.transformingNewTable;
this.parentName = builder.parentName;
this.parentSchemaName = builder.parentSchemaName;
this.parentTableName = builder.parentTableName;
@@ -1006,6 +1019,7 @@ public class PTableImpl implements PTable {
.setBucketNum(table.getBucketNum())
.setIndexes(table.getIndexes() == null ?
Collections.<PTable>emptyList() : table.getIndexes())
+ .setTransformingNewTable(table.getTransformingNewTable())
.setParentSchemaName(table.getParentSchemaName())
.setParentTableName(table.getParentTableName())
.setBaseTableLogicalName(table.getBaseTableLogicalName())
@@ -1620,6 +1634,11 @@ public class PTableImpl implements PTable {
}
@Override
+ // Returns the stored transform target as-is; null when none was set on the builder.
+ public PTable getTransformingNewTable() {
+ return transformingNewTable;
+ }
+
+ @Override
public PIndexState getIndexState() {
return state;
}
@@ -1678,7 +1697,7 @@ public class PTableImpl implements PTable {
public synchronized boolean getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection) {
if (indexMaintainersPtr == null || indexMaintainersPtr.getLength()==0) {
indexMaintainersPtr = new ImmutableBytesWritable();
- if (indexes.isEmpty()) {
+ if (indexes.isEmpty() && transformingNewTable == null) {
indexMaintainersPtr.set(ByteUtil.EMPTY_BYTE_ARRAY);
} else {
IndexMaintainer.serialize(this, indexMaintainersPtr, connection);
@@ -1803,6 +1822,11 @@ public class PTableImpl implements PTable {
indexes.add(createFromProto(curPTableProto));
}
+ PTable transformingNewTable = null;
+ if (table.hasTransformingNewTable()){
+ PTableProtos.PTable curTransformingPTableProto = table.getTransformingNewTable();
+ transformingNewTable = createFromProto(curTransformingPTableProto);
+ }
boolean isImmutableRows = table.getIsImmutableRows();
PName parentSchemaName = null;
PName parentTableName = null;
@@ -1972,6 +1996,7 @@ public class PTableImpl implements PTable {
.setRowKeyOrderOptimizable(rowKeyOrderOptimizable)
.setBucketNum((bucketNum == NO_SALTING) ? null : bucketNum)
.setIndexes(indexes == null ? Collections.<PTable>emptyList() : indexes)
+ .setTransformingNewTable(transformingNewTable)
.setParentSchemaName(parentSchemaName)
.setParentTableName(parentTableName)
.setBaseTableLogicalName(parentLogicalName)
@@ -2041,6 +2066,10 @@ public class PTableImpl implements PTable {
for (PTable curIndex : indexes) {
builder.addIndexes(toProto(curIndex));
}
+ PTable transformingNewTable = table.getTransformingNewTable();
+ if (transformingNewTable != null) {
+ builder.setTransformingNewTable(toProto(transformingNewTable));
+ }
builder.setIsImmutableRows(table.isImmutableRows());
// TODO remove this field in 5.0 release
if (table.getParentName() != null) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
index 56f0545..3eabc72 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
@@ -18,12 +18,15 @@
package org.apache.phoenix.schema.transform;
import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.TableInfo;
import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
import org.apache.phoenix.mapreduce.transform.TransformTool;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
@@ -34,12 +37,16 @@ import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.tool.SchemaExtractionProcessor;
-import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.JacksonUtil;
import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TableViewFinderResult;
+import org.apache.phoenix.util.ViewUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,10 +56,13 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
+import java.util.Collections;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID;
import static org.apache.phoenix.schema.ColumnMetaDataOps.addColumnMutation;
+import static org.apache.phoenix.schema.MetaDataClient.CREATE_LINK;
import static org.apache.phoenix.schema.PTableType.INDEX;
+import static org.apache.phoenix.schema.PTableType.VIEW;
public class Transform {
private static final Logger LOGGER = LoggerFactory.getLogger(Transform.class);
@@ -63,7 +73,7 @@ public class Transform {
return newName;
}
- public static void addTransform(PhoenixConnection connection, String tenantId, PTable table, MetaDataClient.MetaProperties changingProperties,
+ public static PTable addTransform(PhoenixConnection connection, String tenantId, PTable table, MetaDataClient.MetaProperties changingProperties,
long sequenceNum, PTable.TransformType transformType) throws SQLException {
try {
String newMetadata = JacksonUtil.getObjectWriter().writeValueAsString(changingProperties);
@@ -87,7 +97,7 @@ public class Transform {
newPhysicalTableName = generateNewTableName(schema, logicalTableName, sequenceNum);
}
transformBuilder.setNewPhysicalTableName(newPhysicalTableName);
- Transform.addTransform(table, changingProperties, transformBuilder.build(), connection);
+ return Transform.addTransform(table, changingProperties, transformBuilder.build(), sequenceNum, connection);
} catch (JsonProcessingException ex) {
LOGGER.error("addTransform failed", ex);
throw new SQLException("Adding transform failed with JsonProcessingException");
@@ -101,8 +111,9 @@ public class Transform {
}
}
- protected static void addTransform(
- PTable table, MetaDataClient.MetaProperties changedProps, SystemTransformRecord systemTransformParams, PhoenixConnection connection) throws Exception {
+ protected static PTable addTransform(
+ PTable table, MetaDataClient.MetaProperties changedProps, SystemTransformRecord systemTransformParams,
+ long sequenceNum, PhoenixConnection connection) throws Exception {
PName newTableName = PNameFactory.newName(systemTransformParams.getNewPhysicalTableName());
PName newTableNameWithoutSchema = PNameFactory.newName(SchemaUtil.getTableNameFromFullName(systemTransformParams.getNewPhysicalTableName()));
PTable newTable = new PTableImpl.Builder()
@@ -122,6 +133,7 @@ public class Transform {
.setImmutableRows(table.isImmutableRows())
.setIsChangeDetectionEnabled(table.isChangeDetectionEnabled())
.setIndexType(table.getIndexType())
+ .setIndexes(Collections.<PTable>emptyList())
.setName(newTableName)
.setMultiTenant(table.isMultiTenant())
.setParentName(table.getParentName())
@@ -150,16 +162,77 @@ public class Transform {
(changedProps.getColumnEncodedBytesProp() != null? changedProps.getColumnEncodedBytesProp() : table.getEncodingScheme()))
.build();
SchemaExtractionProcessor schemaExtractionProcessor = new SchemaExtractionProcessor(systemTransformParams.getTenantId(),
- connection.getQueryServices().getConfiguration(), newTable, true);
+ connection.getQueryServices().getConfiguration(), newTable, true);
String ddl = schemaExtractionProcessor.process();
LOGGER.info("Creating transforming table via " + ddl);
connection.createStatement().execute(ddl);
upsertTransform(systemTransformParams, connection);
+
+ // Add row linking from old table row to new table row
+ addTransformTableLink(connection, systemTransformParams.getTenantId(), systemTransformParams.getSchemaName(),
+ systemTransformParams.getLogicalTableName(), newTableName, sequenceNum);
+
+ // Also add the transforming new table link to views
+ TableViewFinderResult childViewsResult = ViewUtil.findChildViews(connection, systemTransformParams.getTenantId()
+ , systemTransformParams.getSchemaName(), systemTransformParams.getLogicalTableName());
+ for (TableInfo view : childViewsResult.getLinks()) {
+ addTransformTableLink(connection, view.getTenantId()==null? null: Bytes.toString(view.getTenantId()),
+ (view.getSchemaName()==null? null: Bytes.toString(view.getSchemaName())), Bytes.toString(view.getTableName())
+ , newTableName, sequenceNum);
+ }
+
+ return newTable;
+ }
+
+    /**
+     * Executes the {@code CREATE_LINK} statement to record a link of type
+     * {@code TRANSFORMING_NEW_TABLE} from the given table (or view) to the new table.
+     *
+     * @param connection   connection used to prepare and execute the link statement
+     * @param tenantId     bound to parameter 1; callers may pass null
+     * @param schemaName   bound to parameter 2; callers may pass null
+     * @param tableName    name of the table or view the link is created for (parameter 3)
+     * @param newTableName name of the new table the link points to (parameter 4)
+     * @param sequenceNum  sequence number bound to parameter 6
+     * @throws SQLException if preparing or executing the statement fails
+     */
+    private static void addTransformTableLink(Connection connection, String tenantId, String schemaName, String tableName,
+                                              PName newTableName, long sequenceNum) throws SQLException {
+        // try-with-resources: the statement is closed even when execute() throws,
+        // instead of being left open as it was previously.
+        try (PreparedStatement linkStatement = connection.prepareStatement(CREATE_LINK)) {
+            linkStatement.setString(1, tenantId);
+            linkStatement.setString(2, schemaName);
+            linkStatement.setString(3, tableName);
+            linkStatement.setString(4, newTableName.getString());
+            linkStatement.setByte(5, PTable.LinkType.TRANSFORMING_NEW_TABLE.getSerializedValue());
+            linkStatement.setLong(6, sequenceNum);
+            linkStatement.setString(7, PTableType.TABLE.getSerializedValue());
+            linkStatement.execute();
+        }
+    }
+
+    /**
+     * Looks up the transform record for a table, index or view using a server-side connection
+     * that is opened for the lookup and closed afterwards.
+     * <ul>
+     *   <li>TABLE: looked up by schema, table name and tenant, with no parent name.</li>
+     *   <li>INDEX: as TABLE, with {@code dataTableName} passed as the parent name.</li>
+     *   <li>VIEW: looked up by the schema and table parts of {@code parentLogicalName}.</li>
+     * </ul>
+     *
+     * @return the record returned by the underlying lookup, or null for any other table type
+     *         (no connection is opened in that case)
+     * @throws SQLException if the connection or the lookup fails
+     */
+    public static SystemTransformRecord getTransformRecord(Configuration conf, PTableType tableType, PName schemaName,
+                                                           PName tableName, PName dataTableName, PName tenantId,
+                                                           PName parentLogicalName) throws SQLException {
+        final boolean forTable = (tableType == PTableType.TABLE);
+        final boolean forIndex = !forTable && (tableType == INDEX);
+        final boolean forView = !forTable && !forIndex && (tableType == VIEW);
+        if (!forTable && !forIndex && !forView) {
+            return null;
+        }
+
+        try (PhoenixConnection serverConnection =
+                     QueryUtil.getConnectionOnServer(conf).unwrap(PhoenixConnection.class)) {
+            if (forView) {
+                // Views resolve through their parent's logical name, split into schema and table parts.
+                String parentSchema = SchemaUtil.getSchemaNameFromFullName(parentLogicalName.getString());
+                String parentTable = SchemaUtil.getTableNameFromFullName(parentLogicalName.getString());
+                String tenant = (tenantId != null) ? tenantId.getString() : null;
+                return Transform.getTransformRecord(parentSchema, parentTable, null, tenant, serverConnection);
+            }
+            // Only indexes carry a parent (data table) name in the lookup.
+            PName parentName = forIndex ? dataTableName : null;
+            return Transform.getTransformRecord(schemaName, tableName, parentName, tenantId, serverConnection);
+        }
+    }
+
+    /**
+     * {@code PName}-typed overload: converts each non-null name to its string form (null stays
+     * null) and passes the strings to {@code getTransformRecordFromDB}.
+     *
+     * @throws SQLException if the underlying lookup fails
+     */
+    public static SystemTransformRecord getTransformRecord(
+            PName schema, PName logicalTableName, PName logicalParentName, PName tenantId, PhoenixConnection connection) throws SQLException {
+        String schemaString = null;
+        if (schema != null) {
+            schemaString = schema.getString();
+        }
+        String tableString = null;
+        if (logicalTableName != null) {
+            tableString = logicalTableName.getString();
+        }
+        String parentString = null;
+        if (logicalParentName != null) {
+            parentString = logicalParentName.getString();
+        }
+        String tenantString = null;
+        if (tenantId != null) {
+            tenantString = tenantId.getString();
+        }
+        return getTransformRecordFromDB(schemaString, tableString, parentString, tenantString, connection);
+    }
public static SystemTransformRecord getTransformRecord(
String schema, String logicalTableName, String logicalParentName, String tenantId, PhoenixConnection connection) throws SQLException {
- try (ResultSet resultSet = connection.prepareStatement("SELECT " +
+ return getTransformRecordFromDB(schema, logicalTableName, logicalParentName, tenantId, connection); // String-typed overload: forwards all arguments unchanged to getTransformRecordFromDB
+ }
+
+ public static SystemTransformRecord getTransformRecordFromDB(
+ String schema, String logicalTableName, String logicalParentName, String tenantId, PhoenixConnection connection) throws SQLException {
+ String sql = "SELECT " +
PhoenixDatabaseMetaData.TENANT_ID + ", " +
PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + ", " +
@@ -178,8 +251,9 @@ public class Transform {
(Strings.isNullOrEmpty(tenantId) ? "" : (PhoenixDatabaseMetaData.TENANT_ID + " ='" + tenantId + "' AND ")) +
(Strings.isNullOrEmpty(schema) ? "" : (PhoenixDatabaseMetaData.TABLE_SCHEM + " ='" + schema + "' AND ")) +
PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + " ='" + logicalTableName + "'" +
- (Strings.isNullOrEmpty(logicalParentName) ? "": (" AND " + PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + "='" + logicalParentName + "'" ))
- ).executeQuery()) {
+ (Strings.isNullOrEmpty(logicalParentName) ? "" : (" AND " + PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + "='" + logicalParentName + "'"));
+ try (ResultSet resultSet = ((PhoenixPreparedStatement) connection.prepareStatement(
+ sql)).executeQuery()) {
if (resultSet.next()) {
return SystemTransformRecord.SystemTransformBuilder.build(resultSet);
}
@@ -299,7 +373,6 @@ public class Transform {
} else {
stmt.setNull(colNum++, Types.VARCHAR);
}
-
LOGGER.info("Adding transform type: "
+ systemTransformParams.getString());
stmt.execute();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
index 72bdb2d..750750e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.schema.transform;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.ByteStringer;
@@ -442,6 +443,25 @@ public class TransformMaintainer extends IndexMaintainer {
newTableEmptyKeyValueRef, newTableWALDisabled, oldTableImmutableStorageScheme, newTableImmutableStorageScheme, newTableEncodingScheme);
}
+    /**
+     * Builds a {@link Delete} for {@code rowKey} covering the family of every column in
+     * {@code newTableColumns} plus the empty key value column family.
+     *
+     * @param rowKey     row key the delete is created for
+     * @param deleteType {@code SINGLE_VERSION} adds family-version markers at {@code ts};
+     *                   any other value adds family markers at {@code ts}
+     * @param ts         timestamp passed to every marker
+     * @return the populated delete mutation
+     */
+    public Delete buildRowDeleteMutation(byte[] rowKey, DeleteType deleteType, long ts) {
+        final byte[] emptyFamily = emptyKeyValueCFPtr.copyBytesIfNecessary();
+        final Delete rowDelete = new Delete(rowKey);
+        final boolean singleVersion = (deleteType == DeleteType.SINGLE_VERSION);
+
+        for (ColumnReference columnRef : newTableColumns) {
+            byte[] family = columnRef.getFamily();
+            if (singleVersion) {
+                rowDelete.addFamilyVersion(family, ts);
+            } else {
+                rowDelete.addFamily(family, ts);
+            }
+        }
+
+        // The empty key value family is always marked last, after the column families.
+        if (singleVersion) {
+            rowDelete.addFamilyVersion(emptyFamily, ts);
+        } else {
+            rowDelete.addFamily(emptyFamily, ts);
+        }
+        return rowDelete;
+    }
+
+
public ImmutableBytesPtr getEmptyKeyValueFamily() {
return emptyKeyValueCFPtr;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
index 4d9dd92..b5d1624 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
@@ -19,6 +19,7 @@ import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SPLITTABLE_SYS
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PARENT_TENANT_ID_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
@@ -285,7 +286,33 @@ public class ViewUtil {
return new TableViewFinderResult(tableInfoList);
}
}
-
+
+    /**
+     * Collects the {@code CHILD_TABLE} links of the given table via
+     * {@link ViewUtil#findAllRelatives}.
+     * <p>
+     * The lookup is first attempted against the physical table for
+     * {@code SYSTEM_CHILD_LINK_NAME_BYTES}. If that table is not found, it is retried once
+     * against the table for {@code SYSTEM_CATALOG_TABLE_BYTES}; a second
+     * {@link TableNotFoundException} is rethrown.
+     *
+     * @param connection connection whose query services supply the table handle and properties
+     * @param tenantId   tenant id of the parent table, or null
+     * @param schema     schema of the parent table, or null
+     * @param tableName  name of the parent table; must not be null
+     * @return the result object populated by {@code findAllRelatives}
+     * @throws IOException  propagated from the table lookup or scan
+     * @throws SQLException propagated from the table lookup, including the rethrown
+     *                      {@link TableNotFoundException}
+     */
+    public static TableViewFinderResult findChildViews(PhoenixConnection connection, String tenantId, String schema,
+                                                       String tableName) throws IOException, SQLException {
+        // Encode the names explicitly as UTF-8; String.getBytes() without a charset uses the
+        // platform default, which can differ between hosts.
+        final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
+        final byte[] tenantIdBytes = tenantId != null ? tenantId.getBytes(utf8) : null;
+        final byte[] schemaBytes = schema != null ? schema.getBytes(utf8) : null;
+        final byte[] tableNameBytes = tableName.getBytes(utf8);
+
+        // Find child views
+        TableViewFinderResult childViewsResult = new TableViewFinderResult();
+        ReadOnlyProps readOnlyProps = connection.getQueryServices().getProps();
+        for (int i = 0; i < 2; i++) {
+            // NOTE(review): the fallback uses SYSTEM_CATALOG_TABLE_BYTES while the comment below
+            // speaks of SYSTEM.CATALOG -- confirm this constant resolves to the intended table
+            // (SYSTEM_CATALOG_NAME_BYTES is also imported in this file).
+            byte[] linkTableName = (i == 0) ? SYSTEM_CHILD_LINK_NAME_BYTES : SYSTEM_CATALOG_TABLE_BYTES;
+            try (Table sysCatOrSysChildLinkTable = connection.getQueryServices()
+                    .getTable(SchemaUtil.getPhysicalName(linkTableName, readOnlyProps).getName())) {
+                ViewUtil.findAllRelatives(sysCatOrSysChildLinkTable, tenantIdBytes, schemaBytes,
+                        tableNameBytes, PTable.LinkType.CHILD_TABLE, childViewsResult);
+                break;
+            } catch (TableNotFoundException ex) {
+                // try again with SYSTEM.CATALOG in case the schema is old
+                if (i == 1) {
+                    // This means even SYSTEM.CATALOG was not found, so this is bad, rethrow
+                    throw ex;
+                }
+            }
+        }
+        return childViewsResult;
+    }
+
+
/**
* Check metadata to find if a given table/view has any immediate child views. Note that this
* is not resilient to orphan {@code parent->child } links.
diff --git a/phoenix-core/src/main/protobuf/MetaDataService.proto b/phoenix-core/src/main/protobuf/MetaDataService.proto
index 63c79ae..7d87e26 100644
--- a/phoenix-core/src/main/protobuf/MetaDataService.proto
+++ b/phoenix-core/src/main/protobuf/MetaDataService.proto
@@ -146,6 +146,7 @@ message AddColumnRequest {
optional int32 clientVersion = 2;
optional PTable parentTable = 3;
optional bool addingColumns = 4;
+ optional PTable transformingNewTable = 5;
}
message DropColumnRequest {
diff --git a/phoenix-core/src/main/protobuf/PTable.proto b/phoenix-core/src/main/protobuf/PTable.proto
index 45e1b6e..26b40b4 100644
--- a/phoenix-core/src/main/protobuf/PTable.proto
+++ b/phoenix-core/src/main/protobuf/PTable.proto
@@ -115,6 +115,7 @@ message PTable {
optional bytes baseTableLogicalNameBytes = 48;
optional bytes schemaVersion = 49;
optional bytes externalSchemaId=50;
+ optional PTable transformingNewTable=51;
}
message EncodedCQCounter {