Posted to commits@phoenix.apache.org by go...@apache.org on 2021/12/21 22:38:13 UTC
[phoenix] branch 4.x updated: PHOENIX-6612 Add TransformTool
This is an automated email from the ASF dual-hosted git repository.
gokcen pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new d797ca8 PHOENIX-6612 Add TransformTool
d797ca8 is described below
commit d797ca8089b7edcfcc8bae64b162815e396d97dc
Author: Gokcen Iskender <gi...@salesforce.com>
AuthorDate: Wed Aug 4 15:14:01 2021 -0700
PHOENIX-6612 Add TransformTool
Co-authored-by: Gokcen Iskender <47...@users.noreply.github.com>
Signed-off-by: Gokcen Iskender <go...@gmail.com>
---
.../end2end/ImmutableTablePropertiesIT.java | 70 --
.../phoenix/end2end/LogicalTableNameBaseIT.java | 10 +-
.../end2end/LogicalTableNameExtendedIT.java | 36 +
.../apache/phoenix/end2end/LogicalTableNameIT.java | 3 +-
.../end2end/PhoenixRowTimestampFunctionIT.java | 18 +-
.../apache/phoenix/end2end/TransformToolIT.java | 489 +++++++++++++
.../phoenix/end2end/index/SingleCellIndexIT.java | 4 -
.../apache/phoenix/end2end/index/TransformIT.java | 1 +
.../phoenix/compile/PostIndexDDLCompiler.java | 21 +-
.../phoenix/compile/ServerBuildIndexCompiler.java | 12 +-
.../ServerBuildTransformingTableCompiler.java | 98 +++
.../coprocessor/BaseScannerRegionObserver.java | 1 +
.../coprocessor/GlobalIndexRegionScanner.java | 9 +-
.../org/apache/phoenix/index/IndexMaintainer.java | 60 +-
.../PhoenixServerBuildIndexInputFormat.java | 22 +-
.../apache/phoenix/mapreduce/index/IndexTool.java | 12 +-
.../index/PhoenixIndexImportDirectReducer.java | 12 +
.../transform/PhoenixTransformReducer.java | 68 ++
.../phoenix/mapreduce/transform/TransformTool.java | 762 +++++++++++++++++++++
.../phoenix/mapreduce/util/IndexColumnNames.java | 2 +-
.../mapreduce/util/PhoenixConfigurationUtil.java | 43 ++
.../apache/phoenix/schema/ColumnMetaDataOps.java | 192 ++++++
.../org/apache/phoenix/schema/DelegateTable.java | 6 +
.../org/apache/phoenix/schema/MetaDataClient.java | 604 +++++++---------
.../java/org/apache/phoenix/schema/PTable.java | 10 +-
.../java/org/apache/phoenix/schema/PTableImpl.java | 10 +
.../schema/tool/SchemaExtractionProcessor.java | 14 +-
.../schema/transform/SystemTransformRecord.java | 21 +
.../apache/phoenix/schema/transform/Transform.java | 272 ++++++--
.../schema/transform/TransformMaintainer.java | 453 ++++++++++++
.../src/main/protobuf/ServerCachingService.proto | 25 +
.../java/org/apache/phoenix/query/BaseTest.java | 1 +
32 files changed, 2810 insertions(+), 551 deletions(-)
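For orientation, the heart of this change is the new TransformTool MapReduce job; the new TransformToolIT below drives it programmatically through Tool#run. A minimal sketch of that invocation, assuming a suitable Hadoop Configuration and illustrative schema/table names (only TransformTool itself and the flags exercised by the IT are taken from this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.phoenix.mapreduce.transform.TransformTool;

    public class RunTransformToolSketch {
        public static void main(String[] args) throws Exception {
            // Flags mirror TransformToolIT.getArgList(): --schema/--data-table pick the
            // table with a pending transform, -op is the output path, -runfg runs the
            // job in the foreground so completion can be observed synchronously.
            String[] cmdArgs = new String[] {
                    "--schema=MY_SCHEMA",      // illustrative
                    "--data-table=MY_TABLE",   // illustrative
                    "-op", "/tmp/transform-" + System.currentTimeMillis(),
                    "-runfg"
            };
            TransformTool tool = new TransformTool();
            tool.setConf(new Configuration()); // the IT passes the mini-cluster's config
            int status = tool.run(cmdArgs);    // 0 on success, as asserted in the IT
            System.exit(status);
        }
    }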
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ImmutableTablePropertiesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ImmutableTablePropertiesIT.java
index d5e204b..1e169b7 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ImmutableTablePropertiesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ImmutableTablePropertiesIT.java
@@ -150,74 +150,4 @@ public class ImmutableTablePropertiesIT extends ParallelStatsDisabledIT {
}
}
}
-
- @Test
- @Ignore("We now support altering storage schema")
- public void testAlterImmutableStorageSchemeProp() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- String immutableDataTableFullName1 = SchemaUtil.getTableName("", generateUniqueName());
- String immutableDataTableFullName2 = SchemaUtil.getTableName("", generateUniqueName());
- try (Connection conn = DriverManager.getConnection(getUrl(), props);) {
- Statement stmt = conn.createStatement();
- // create an immutable table with ONE_CELL_PER_COLUMN storage scheme
- String ddl = "CREATE IMMUTABLE TABLE " + immutableDataTableFullName1 +
- " (a_string varchar not null, col1 integer" +
- " CONSTRAINT pk PRIMARY KEY (a_string)) COLUMN_ENCODED_BYTES=0, IMMUTABLE_STORAGE_SCHEME="
- + PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN;
- stmt.execute(ddl);
- // create an immutable table with SINGLE_CELL_ARRAY_WITH_OFFSETS storage scheme
- ddl = "CREATE IMMUTABLE TABLE " + immutableDataTableFullName2 +
- " (a_string varchar not null, col1 integer" +
- " CONSTRAINT pk PRIMARY KEY (a_string)) COLUMN_ENCODED_BYTES=4, IMMUTABLE_STORAGE_SCHEME="
- + PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS;
- stmt.execute(ddl);
-
- // changing the storage scheme from/to ONE_CELL_PER_COLUMN should fail
- try {
- stmt.execute("ALTER TABLE " + immutableDataTableFullName1 + " SET IMMUTABLE_STORAGE_SCHEME=" + PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS);
- fail();
- }
- catch (SQLException e) {
- assertEquals(SQLExceptionCode.INVALID_IMMUTABLE_STORAGE_SCHEME_CHANGE.getErrorCode(), e.getErrorCode());
- }
- try {
- stmt.execute("ALTER TABLE " + immutableDataTableFullName2 + " SET IMMUTABLE_STORAGE_SCHEME=" + PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN);
- fail();
- }
- catch (SQLException e) {
- assertEquals(SQLExceptionCode.INVALID_IMMUTABLE_STORAGE_SCHEME_CHANGE.getErrorCode(), e.getErrorCode());
- }
- }
- }
-
- @Test
- @Ignore("We now support altering storage schema")
- public void testAlterImmutableStorageSchemeProp_Index() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- String tableName = SchemaUtil.getTableName("", generateUniqueName());
- String indexName = SchemaUtil.getTableName("", "IDX_" + generateUniqueName());
- try (Connection conn = DriverManager.getConnection(getUrl(), props);) {
- Statement stmt = conn.createStatement();
- // create an immutable table with ONE_CELL_PER_COLUMN storage scheme
- String ddl = "CREATE IMMUTABLE TABLE " + tableName +
- " (a_string varchar not null, col1 integer, col2 varchar " +
- " CONSTRAINT pk PRIMARY KEY (a_string)) COLUMN_ENCODED_BYTES=0, IMMUTABLE_STORAGE_SCHEME="
- + PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN;
- stmt.execute(ddl);
-
- ddl = "CREATE INDEX " + indexName + " ON " + tableName +
- " (col1) INCLUDE (col2)";
- stmt.execute(ddl);
-
- // changing the storage scheme from/to ONE_CELL_PER_COLUMN should fail
- try {
- stmt.execute("ALTER INDEX " + indexName + " ON " + tableName +
- " ACTIVE SET IMMUTABLE_STORAGE_SCHEME=" + PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS);
- fail();
- }
- catch (SQLException e) {
- assertEquals(SQLExceptionCode.INVALID_IMMUTABLE_STORAGE_SCHEME_CHANGE.getErrorCode(), e.getErrorCode());
- }
- }
- }
}
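The two tests deleted above pinned down the old behavior: changing IMMUTABLE_STORAGE_SCHEME via ALTER TABLE or ALTER INDEX failed with INVALID_IMMUTABLE_STORAGE_SCHEME_CHANGE. With this commit the same DDL is accepted and instead registers a pending transform (see SystemTransformRecord and Transform further down). A minimal sketch of the now-supported statement, lifted from the new TransformToolIT:

    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        // Previously threw INVALID_IMMUTABLE_STORAGE_SCHEME_CHANGE; now creates a
        // transform record that TransformTool later carries out.
        conn.createStatement().execute("ALTER TABLE " + dataTableFullName
                + " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS,"
                + " COLUMN_ENCODED_BYTES=2");
    }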
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameBaseIT.java
index 6df0f02..379ab58 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameBaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameBaseIT.java
@@ -34,6 +34,7 @@ import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
@@ -529,15 +530,14 @@ public abstract class LogicalTableNameBaseIT extends BaseTest {
public static void renameAndDropPhysicalTable(Connection conn, String tenantId, String schema, String tableName, String physicalName, boolean isNamespaceEnabled) throws Exception {
String
- changeName =
- String.format(
- "UPSERT INTO SYSTEM.CATALOG (TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY, PHYSICAL_TABLE_NAME) VALUES (%s, '%s', '%s', NULL, NULL, '%s')",
- tenantId, schema, tableName, physicalName);
+ changeName = String.format(
+ "UPSERT INTO SYSTEM.CATALOG (TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY, PHYSICAL_TABLE_NAME) VALUES (%s, %s, '%s', NULL, NULL, '%s')",
+ tenantId, schema==null ? null : ("'" + schema + "'"), tableName, physicalName);
conn.createStatement().execute(changeName);
conn.commit();
String fullTableName = SchemaUtil.getTableName(schema, tableName);
- if (isNamespaceEnabled) {
+ if (isNamespaceEnabled && !(Strings.isNullOrEmpty(schema) || NULL_STRING.equals(schema))) {
fullTableName = schema + NAMESPACE_SEPARATOR + tableName;
}
Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
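The fix above quotes the schema only when it is non-null, so a null schema reaches SYSTEM.CATALOG as the SQL NULL keyword rather than the quoted string 'null'. A small sketch of the two shapes the generated UPSERT takes under that logic (values illustrative):

    String tenantId = "NULL";   // the callers in these ITs pass the literal keyword
    String tableName = "T1", physicalName = "T1_PHYS";
    String schema = null;       // vs. "S1"
    String changeName = String.format(
            "UPSERT INTO SYSTEM.CATALOG (TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME,"
                    + " COLUMN_FAMILY, PHYSICAL_TABLE_NAME) VALUES (%s, %s, '%s', NULL, NULL, '%s')",
            tenantId, schema == null ? null : ("'" + schema + "'"), tableName, physicalName);
    // schema == null -> ... VALUES (NULL, null, 'T1', ...)  String.format prints the
    //                    bare word null, which the SQL parser reads as the NULL keyword
    // schema == "S1" -> ... VALUES (NULL, 'S1', 'T1', ...)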
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameExtendedIT.java
index cb58962..2141d99 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameExtendedIT.java
@@ -29,6 +29,7 @@ import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -53,6 +54,9 @@ public class LogicalTableNameExtendedIT extends LogicalTableNameBaseIT {
}
public LogicalTableNameExtendedIT() {
+ StringBuilder optionBuilder = new StringBuilder();
+ optionBuilder.append(" ,IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN");
+ this.dataTableDdl = optionBuilder.toString();
propsNamespace.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(true));
}
@@ -193,6 +197,38 @@ public class LogicalTableNameExtendedIT extends LogicalTableNameBaseIT {
}
@Test
+ public void testHint() throws Exception {
+ String tableName = "TBL_" + generateUniqueName();
+ String indexName = "IDX_" + generateUniqueName();
+ String indexName2 = "IDX2_" + generateUniqueName();
+ try (Connection conn = getConnection(propsNamespace)) {
+ conn.setAutoCommit(true);
+ createTable(conn, tableName);
+ createIndexOnTable(conn, tableName, indexName);
+ createIndexOnTable(conn, tableName, indexName2);
+ populateTable(conn, tableName, 1, 2);
+
+ // Test hint
+ String tableSelect = "SELECT V1,V2,V3 FROM " + tableName;
+ ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + tableSelect);
+ assertEquals(true, QueryUtil.getExplainPlan(rs).contains(indexName));
+ try (HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices()
+ .getAdmin()) {
+ String snapshotName = new StringBuilder(indexName2).append("-Snapshot").toString();
+ admin.snapshot(snapshotName, TableName.valueOf(indexName2));
+ String newName = "NEW_" + indexName2;
+ admin.cloneSnapshot(Bytes.toBytes(snapshotName), Bytes.toBytes(newName));
+ renameAndDropPhysicalTable(conn, "NULL", null, indexName2, newName, true);
+ }
+ String indexSelect = "SELECT /*+ INDEX(" + tableName + " " + indexName2 + ")*/ V1,V2,V3 FROM " + tableName;
+ rs = conn.createStatement().executeQuery("EXPLAIN " + indexSelect);
+ assertEquals(true, QueryUtil.getExplainPlan(rs).contains(indexName2));
+ rs = conn.createStatement().executeQuery(indexSelect);
+ assertEquals(true, rs.next());
+ }
+ }
+
+ @Test
public void testUpdatePhysicalTableName_tenantViews() throws Exception {
try (Connection conn = getConnection(propsNamespace)) {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameIT.java
index 729a8d7..119fd7ed 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameIT.java
@@ -73,8 +73,9 @@ public class LogicalTableNameIT extends LogicalTableNameBaseIT {
this.createChildAfterRename = createChildAfterRename;
this.immutable = immutable;
StringBuilder optionBuilder = new StringBuilder();
+ optionBuilder.append(" ,IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN");
if (immutable) {
- optionBuilder.append(" ,IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, IMMUTABLE_ROWS=true");
+ optionBuilder.append(" , IMMUTABLE_ROWS=true");
}
this.dataTableDdl = optionBuilder.toString();
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRowTimestampFunctionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRowTimestampFunctionIT.java
index 0ef3dce..e1a39a0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRowTimestampFunctionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRowTimestampFunctionIT.java
@@ -63,13 +63,19 @@ public class PhoenixRowTimestampFunctionIT extends ParallelStatsDisabledIT {
public PhoenixRowTimestampFunctionIT(QualifierEncodingScheme encoding,
ImmutableStorageScheme storage) {
StringBuilder optionBuilder = new StringBuilder();
- optionBuilder.append(" COLUMN_ENCODED_BYTES = " + encoding.ordinal());
- optionBuilder.append(",IMMUTABLE_STORAGE_SCHEME = "+ storage.toString());
- this.tableDDLOptions = optionBuilder.toString();
- this.encoded = (encoding != QualifierEncodingScheme.NON_ENCODED_QUALIFIERS)
- ? true : false;
this.optimized = storage == ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS
? true : false;
+ // We cannot have non-encoded column names if the storage scheme is single cell
+ this.encoded = (encoding != QualifierEncodingScheme.NON_ENCODED_QUALIFIERS)
+ ? true : (this.optimized) ? true : false;
+
+ if (this.optimized && encoding == QualifierEncodingScheme.NON_ENCODED_QUALIFIERS) {
+ optionBuilder.append(" COLUMN_ENCODED_BYTES = " + QualifierEncodingScheme.ONE_BYTE_QUALIFIERS.ordinal());
+ } else {
+ optionBuilder.append(" COLUMN_ENCODED_BYTES = " + encoding.ordinal());
+ }
+ optionBuilder.append(", IMMUTABLE_STORAGE_SCHEME = "+ storage.toString());
+ this.tableDDLOptions = optionBuilder.toString();
}
@Parameterized.Parameters(name = "encoding={0},storage={1}")
@@ -153,7 +159,7 @@ public class PhoenixRowTimestampFunctionIT extends ParallelStatsDisabledIT {
@Test
public void testRowTimestampDefault() throws Exception {
-
+ if (encoded || optimized) return;
String tableName = generateUniqueName();
try (Connection conn = DriverManager.getConnection(getUrl())) {
String ddl = "CREATE TABLE IF NOT EXISTS " + tableName
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TransformToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TransformToolIT.java
new file mode 100644
index 0000000..276f08b
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TransformToolIT.java
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.index.SingleCellIndexIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.transform.TransformTool;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.getRowCount;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TransformToolIT extends ParallelStatsDisabledIT {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TransformToolIT.class);
+ private final String tableDDLOptions;
+ // TODO test with immutable
+ private boolean mutable = true;
+
+ public TransformToolIT() {
+ StringBuilder optionBuilder = new StringBuilder();
+ optionBuilder.append(" IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES=NONE, TTL=18000 ");
+ if (!mutable) {
+ optionBuilder.append(", IMMUTABLE_ROWS=true ");
+ }
+ tableDDLOptions = optionBuilder.toString();
+ }
+
+ @BeforeClass
+ public static synchronized void setup() throws Exception {
+ Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2);
+ serverProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
+ serverProps.put(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB, Long.toString(5));
+ serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
+ QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+ serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(8));
+ Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
+ setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
+ new ReadOnlyProps(clientProps.entrySet().iterator()));
+ }
+
+ private void createTableAndUpsertRows(Connection conn, String dataTableFullName, int numOfRows) throws SQLException {
+ String stmString1 =
+ "CREATE TABLE IF NOT EXISTS " + dataTableFullName
+ + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
+ + tableDDLOptions;
+ conn.createStatement().execute(stmString1);
+ String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
+ PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+
+ // insert rows
+ for (int i = 1; i <= numOfRows; i++) {
+ IndexToolIT.upsertRow(stmt1, i);
+ }
+ conn.commit();
+ }
+ @Test
+ public void testTransformTable() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String newTableFullName = dataTableFullName + "_1";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ int numOfRows = 2;
+ createTableAndUpsertRows(conn, dataTableFullName, numOfRows);
+
+ conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+ " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+ SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+
+ List<String> args = getArgList(schemaName, dataTableName, null,
+ null, null, null, false, false, false, false);
+ runTransformTool(args.toArray(new String[0]), 0);
+ record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+ assertEquals(PTable.TransformStatus.COMPLETED.name(), record.getTransformStatus());
+ assertEquals(getRowCount(conn, dataTableFullName), getRowCount(conn,newTableFullName));
+
+ // Test that the PhysicalTableName is updated.
+ PTable oldTable = PhoenixRuntime.getTable(conn, dataTableFullName);
+ assertEquals(dataTableName+"_1", oldTable.getPhysicalName(true).getString());
+
+ String sql = "SELECT ID, NAME, ZIP FROM %s ";
+ ResultSet rs1 = conn.createStatement().executeQuery(String.format(sql, dataTableFullName));
+ ResultSet rs2 = conn.createStatement().executeQuery(String.format(sql, newTableFullName));
+ for (int i=0; i < numOfRows; i++) {
+ assertTrue(rs1.next());
+ assertTrue(rs2.next());
+ assertEquals(rs1.getString(1), rs2.getString(1));
+ assertEquals(rs1.getString(2), rs2.getString(2));
+ assertEquals(rs1.getInt(3), rs2.getInt(3));
+ }
+ assertFalse(rs1.next());
+ assertFalse(rs2.next());
+ }
+ }
+
+ @Test
+ public void testAbortTransform() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ createTableAndUpsertRows(conn, dataTableFullName, 2);
+
+ conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+ " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+ SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null,conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+
+ List<String> args = getArgList(schemaName, dataTableName, null,
+ null, null, null, true, false, false, false);
+
+ runTransformTool(args.toArray(new String[0]), 0);
+ record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+ assertNull(record);
+ }
+ }
+
+ private void pauseTableTransform(String schemaName, String dataTableName, Connection conn) throws Exception {
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+
+ createTableAndUpsertRows(conn, dataTableFullName, 2);
+
+ conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+ " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+ SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+
+ List<String> args = getArgList(schemaName, dataTableName, null,
+ null, null, null, false, true, false, false);
+
+ runTransformTool(args.toArray(new String[0]), 0);
+ record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+ assertEquals(PTable.TransformStatus.PAUSED.name(), record.getTransformStatus());
+ }
+
+ @Test
+ public void testPauseTransform() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ pauseTableTransform(schemaName, dataTableName, conn);
+ }
+ }
+
+ @Test
+ public void testResumeTransform() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ pauseTableTransform(schemaName, dataTableName, conn);
+ List<String> args = getArgList(schemaName, dataTableName, null,
+ null, null, null, false, false, true, false);
+
+ runTransformTool(args.toArray(new String[0]), 0);
+ SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+ assertEquals(PTable.TransformStatus.COMPLETED.name(), record.getTransformStatus());
+ }
+ }
+
+ /**
+ * Test that the tool presplits the new table to match the region count of the presplit data table
+ */
+ @Test
+ public void testSplitTable() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ final TableName dataTN = TableName.valueOf(dataTableFullName);
+ final TableName newDataTN = TableName.valueOf(dataTableFullName + "_1");
+ try (Connection conn =
+ DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES));
+ HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
+ conn.setAutoCommit(true);
+ String dataDDL =
+ "CREATE TABLE " + dataTableFullName + "(\n"
+ + "ID VARCHAR NOT NULL PRIMARY KEY,\n"
+ + "\"info\".CAR_NUM VARCHAR(18) NULL,\n"
+ + "\"test\".CAR_NUM VARCHAR(18) NULL,\n"
+ + "\"info\".CAP_DATE VARCHAR NULL,\n" + "\"info\".ORG_ID BIGINT NULL,\n"
+ + "\"info\".ORG_NAME VARCHAR(255) NULL\n" + ") IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES = 0";
+ conn.createStatement().execute(dataDDL);
+
+ String[] idPrefixes = new String[] {"1", "2", "3", "4"};
+
+ // split the data table, as the tool splits the new table to have the same # of regions
+ // doesn't really matter what the split points are, we just want a target # of regions
+ int numSplits = idPrefixes.length;
+ int targetNumRegions = numSplits + 1;
+ byte[][] splitPoints = new byte[numSplits][];
+ for (String prefix : idPrefixes) {
+ splitPoints[--numSplits] = Bytes.toBytes(prefix);
+ }
+ HTableDescriptor dataTD = admin.getTableDescriptor(dataTN);
+ admin.disableTable(dataTN);
+ admin.deleteTable(dataTN);
+ admin.createTable(dataTD, splitPoints);
+ assertEquals(targetNumRegions, admin.getTableRegions(dataTN).size());
+
+ // insert data
+ int idCounter = 1;
+ try (PreparedStatement ps = conn.prepareStatement("UPSERT INTO " + dataTableFullName
+ + "(ID,\"info\".CAR_NUM,\"test\".CAR_NUM,CAP_DATE,ORG_ID,ORG_NAME) VALUES(?,?,?,'2021-01-01 00:00:00',11,'orgname1')")){
+ for (String carNum : idPrefixes) {
+ for (int i = 0; i < 100; i++) {
+ ps.setString(1, idCounter++ + "");
+ ps.setString(2, carNum + "_" + i);
+ ps.setString(3, "test-" + carNum + "_ " + i);
+ ps.addBatch();
+ }
+ }
+ ps.executeBatch();
+ conn.commit();
+ }
+ SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+
+ conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+ " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+ List<String> args = getArgList(schemaName, dataTableName, null,
+ null, null, null, false, false, false, false);
+ // split if data table more than 3 regions
+ args.add("--autosplit=3");
+
+ args.add("-op");
+ args.add("/tmp/" + UUID.randomUUID().toString());
+
+ runTransformTool(args.toArray(new String[0]), 0);
+
+ SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, newDataTN.getNameAsString());
+ assertEquals(targetNumRegions, admin.getTableRegions(newDataTN).size());
+ assertEquals(getRowCount(conn, dataTableFullName), getRowCount(conn, dataTableFullName + "_1"));
+ }
+ }
+
+ @Test
+ public void testDataAfterTransformingMultiColFamilyTable() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ final TableName dataTN = TableName.valueOf(dataTableFullName);
+ final TableName newDataTN = TableName.valueOf(dataTableFullName + "_1");
+ try (Connection conn =
+ DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES));
+ HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
+ conn.setAutoCommit(true);
+ String dataDDL =
+ "CREATE TABLE " + dataTableFullName + "(\n"
+ + "ID VARCHAR(5) NOT NULL PRIMARY KEY,\n"
+ + "\"info\".CAR_NUM VARCHAR(18) NULL,\n"
+ + "\"test\".CAR_NUM VARCHAR(18) NULL,\n"
+ + "\"info\".CAP_DATE VARCHAR NULL,\n" + "\"info\".ORG_ID BIGINT NULL,\n"
+ + "\"test\".ORG_NAME VARCHAR(255) NULL\n" + ") IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES = 0";
+ conn.createStatement().execute(dataDDL);
+
+ // insert data
+ int idCounter = 1;
+ try (PreparedStatement ps = conn.prepareStatement("UPSERT INTO " + dataTableFullName
+ + "(ID,\"info\".CAR_NUM,\"test\".CAR_NUM,CAP_DATE,ORG_ID,ORG_NAME) VALUES(?,?,?,'2021-01-01 00:00:00',11,'orgname1')")) {
+ for (int i = 0; i < 5; i++) {
+ ps.setString(1, idCounter++ + "");
+ ps.setString(2, "info-" + i);
+ ps.setString(3, "test-" + i);
+ ps.addBatch();
+ }
+ ps.executeBatch();
+ conn.commit();
+ }
+ SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+
+ conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+ " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+ List<String> args = getArgList(schemaName, dataTableName, null,
+ null, null, null, false, false, false, false);
+ runTransformTool(args.toArray(new String[0]), 0);
+
+ SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, newDataTN.getNameAsString());
+ assertEquals(getRowCount(conn, dataTableFullName), getRowCount(conn, dataTableFullName + "_1"));
+
+ String select = "SELECT ID,\"info\".CAR_NUM,\"test\".CAR_NUM,CAP_DATE,ORG_ID,ORG_NAME FROM ";
+ ResultSet resultSetNew = conn.createStatement().executeQuery(select + newDataTN.getNameAsString());
+ ResultSet resultSetOld = conn.createStatement().executeQuery(select + dataTableFullName);
+ for (int i=0; i < idCounter-1; i++) {
+ assertTrue(resultSetNew.next());
+ assertTrue(resultSetOld.next());
+ assertEquals(resultSetOld.getString(1), resultSetNew.getString(1));
+ assertEquals(resultSetOld.getString(2), resultSetNew.getString(2));
+ assertEquals(resultSetOld.getString(3), resultSetNew.getString(3));
+ assertEquals(resultSetOld.getString(4), resultSetNew.getString(4));
+ assertEquals(resultSetOld.getString(5), resultSetNew.getString(5));
+ assertEquals(resultSetOld.getString(6), resultSetNew.getString(6));
+ }
+ assertFalse(resultSetNew.next());
+ assertFalse(resultSetOld.next());
+ }
+ }
+
+ @Test
+ public void testTransformIndex() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String indexTableName = "I_" + generateUniqueName();
+ String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ int numOfRows = 2;
+ createTableAndUpsertRows(conn, dataTableFullName, numOfRows);
+ conn.createStatement().execute("CREATE INDEX " + indexTableName + " ON " + dataTableFullName + " (NAME) INCLUDE (ZIP)");
+ SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, indexTableFullName);
+ conn.createStatement().execute("ALTER INDEX " + indexTableName + " ON " + dataTableFullName +
+ " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+ SystemTransformRecord record = Transform.getTransformRecord(schemaName, indexTableName, dataTableFullName, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+
+ List<String> args = getArgList(schemaName, dataTableName, indexTableName,
+ null, null, null, false, false, false, false);
+
+ runTransformTool(args.toArray(new String[0]), 0);
+ record = Transform.getTransformRecord(schemaName, indexTableName, dataTableFullName, null, conn.unwrap(PhoenixConnection.class));
+ assertEquals(PTable.TransformStatus.COMPLETED.name(), record.getTransformStatus());
+ assertEquals(getRowCount(conn, indexTableFullName), getRowCount(conn, indexTableFullName + "_1"));
+
+ // Test that the PhysicalTableName is updated.
+ PTable oldTable = PhoenixRuntime.getTable(conn, indexTableFullName);
+ assertEquals(indexTableName+"_1", oldTable.getPhysicalName(true).getString());
+
+ String sql = "SELECT \":ID\", \"0:NAME\", \"0:ZIP\" FROM %s ORDER BY \":ID\"";
+ ResultSet rs1 = conn.createStatement().executeQuery(String.format(sql, indexTableFullName));
+ ResultSet rs2 = conn.createStatement().executeQuery(String.format(sql, indexTableFullName + "_1"));
+ for (int i=0; i < numOfRows; i++) {
+ assertTrue(rs1.next());
+ assertTrue(rs2.next());
+ assertEquals(rs1.getString(1), rs2.getString(1));
+ assertEquals(rs1.getString(2), rs2.getString(2));
+ assertEquals(rs1.getInt(3), rs2.getInt(3));
+ }
+ assertFalse(rs1.next());
+ assertFalse(rs2.next());
+ }
+ }
+
+ public static List<String> getArgList(String schemaName, String dataTable, String indexTable, String tenantId,
+ Long startTime, Long endTime,
+ boolean shouldAbort, boolean shouldPause, boolean shouldResume, boolean isPartial) {
+ List<String> args = Lists.newArrayList();
+ if (schemaName != null) {
+ args.add("--schema=" + schemaName);
+ }
+ // Work around CLI-254. The long-form arg parsing doesn't strip off double-quotes
+ args.add("--data-table=" + dataTable);
+ if (indexTable != null) {
+ args.add("--index-table=" + indexTable);
+ }
+
+ args.add("-op");
+ args.add("/tmp/" + UUID.randomUUID().toString());
+ // Need to run this job in foreground for the test to be deterministic
+ args.add("-runfg");
+
+ if (tenantId != null) {
+ args.add("-tenant");
+ args.add(tenantId);
+ }
+ if(startTime != null) {
+ args.add("-st");
+ args.add(String.valueOf(startTime));
+ }
+ if(endTime != null) {
+ args.add("-et");
+ args.add(String.valueOf(endTime));
+ }
+ if (shouldAbort) {
+ args.add("-abort");
+ }
+ if (shouldPause) {
+ args.add("-pause");
+ }
+ if (shouldResume) {
+ args.add("-resume");
+ }
+ if (isPartial) {
+ args.add("-pt");
+ }
+ return args;
+ }
+
+
+ public static String [] getArgValues(String schemaName,
+ String dataTable, String indexTable, String tenantId,
+ Long startTime, Long endTime) {
+ List<String> args = getArgList(schemaName, dataTable, indexTable,
+ tenantId, startTime, endTime, false, false, false, false);
+ args.add("-op");
+ args.add("/tmp/" + UUID.randomUUID().toString());
+ return args.toArray(new String[0]);
+ }
+
+ public static TransformTool runTransformTool(int expectedStatus, String schemaName, String dataTableName, String indexTableName, String tenantId,
+ String... additionalArgs) throws Exception {
+ final String[] cmdArgs = getArgValues(schemaName, dataTableName,
+ indexTableName, tenantId, 0L, 0L);
+ List<String> cmdArgList = new ArrayList<>(Arrays.asList(cmdArgs));
+ cmdArgList.add("-op");
+ cmdArgList.add("/tmp/" + UUID.randomUUID().toString());
+
+ cmdArgList.addAll(Arrays.asList(additionalArgs));
+ return runTransformTool(cmdArgList.toArray(new String[cmdArgList.size()]), expectedStatus);
+ }
+
+ public static TransformTool runTransformTool(String[] cmdArgs, int expectedStatus) throws Exception {
+ TransformTool tt = new TransformTool();
+ Configuration conf = new Configuration(getUtility().getConfiguration());
+ tt.setConf(conf);
+
+ LOGGER.info("Running TransformTool with {}", Arrays.toString(cmdArgs), new Exception("Stack Trace"));
+ int status = tt.run(cmdArgs);
+
+ assertEquals(expectedStatus, status);
+ return tt;
+ }
+
+}
\ No newline at end of file
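Taken together, the control flags shown in the ITs above compose into a pause/resume lifecycle: -pause stops the job and leaves the transform record PAUSED, -resume picks it up and drives it to COMPLETED, and -abort deletes the record outright. A condensed sketch reusing the helpers defined in TransformToolIT (assuming an open Connection conn, as in the tests):

    // Pause: job exits cleanly, record stays behind with status PAUSED.
    List<String> args = TransformToolIT.getArgList(schemaName, dataTableName, null,
            null, null, null, /*abort*/ false, /*pause*/ true, /*resume*/ false, /*partial*/ false);
    TransformToolIT.runTransformTool(args.toArray(new String[0]), 0);

    // Resume: the tool finds the PAUSED record and finishes the rebuild.
    args = TransformToolIT.getArgList(schemaName, dataTableName, null,
            null, null, null, false, false, /*resume*/ true, false);
    TransformToolIT.runTransformTool(args.toArray(new String[0]), 0);

    SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName,
            null, null, conn.unwrap(PhoenixConnection.class));
    assertEquals(PTable.TransformStatus.COMPLETED.name(), record.getTransformStatus());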
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java
index 3eb378a..9e9dd76 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java
@@ -461,10 +461,6 @@ public class SingleCellIndexIT extends ParallelStatsDisabledIT {
for (Result result = scanner.next(); result != null; result = scanner.next()) {
for (Cell cell : result.rawCells()) {
String cellString = cell.toString();
- for (Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entryF : result.getMap()
- .entrySet()) {
- byte[] family = entryF.getKey();
- }
LOGGER.info(cellString + " ****** value : " + Bytes.toStringBinary(CellUtil.cloneValue(cell)));
}
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java
index 8c51e80..5fc8b7a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java
@@ -45,6 +45,7 @@ public class TransformIT extends ParallelStatsDisabledIT {
private Properties testProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
public TransformIT() {
+ testProps.put(QueryServices.DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB, "ONE_CELL_PER_COLUMN");
testProps.put(QueryServices.DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB, "0");
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
index 355377b..ec6fcf3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
@@ -32,6 +32,8 @@ import org.apache.phoenix.util.StringUtil;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import static org.apache.phoenix.util.IndexUtil.INDEX_COLUMN_NAME_SEP;
+
/**
* Class that compiles plan to generate initial data values after a DDL command for
@@ -43,7 +45,8 @@ public class PostIndexDDLCompiler {
private List<String> indexColumnNames;
private List<String> dataColumnNames;
private String selectQuery;
-
+ private boolean forTransform = false;
+
public PostIndexDDLCompiler(PhoenixConnection connection, TableRef dataTableRef) {
this.connection = connection;
this.dataTableRef = dataTableRef;
@@ -51,6 +54,11 @@ public class PostIndexDDLCompiler {
dataColumnNames = Lists.newArrayList();
}
+ public PostIndexDDLCompiler(PhoenixConnection connection, TableRef dataTableRef, boolean forTransform) {
+ this(connection, dataTableRef);
+ this.forTransform = forTransform;
+ }
+
public MutationPlan compile(final PTable indexTable) throws SQLException {
/*
* Compiles an UPSERT SELECT command to read from the data table and populate the index table
@@ -69,7 +77,8 @@ public class PostIndexDDLCompiler {
PColumn col = indexPKColumns.get(i);
String indexColName = col.getName().getString();
// need to escape backslash as this used in the SELECT statement
- String dataColName = StringUtil.escapeBackslash(col.getExpressionStr());
+ String dataColName = col.getExpressionStr() == null ? col.getName().getString()
+ : StringUtil.escapeBackslash(col.getExpressionStr());
dataColumns.append(dataColName).append(",");
indexColumns.append('"').append(indexColName).append("\",");
indexColumnNames.add(indexColName);
@@ -81,10 +90,16 @@ public class PostIndexDDLCompiler {
for (PColumn col : family.getColumns()) {
if (col.getViewConstant() == null) {
String indexColName = col.getName().getString();
- String dataFamilyName = IndexUtil.getDataColumnFamilyName(indexColName);
+ // Transforming tables behave like indexes, but their column names contain no INDEX_COLUMN_NAME_SEP, so we use the family name directly.
+ String dataFamilyName = indexColName.indexOf(INDEX_COLUMN_NAME_SEP)!=-1 ? IndexUtil.getDataColumnFamilyName(indexColName) :
+ col.getFamilyName().getString();
String dataColumnName = IndexUtil.getDataColumnName(indexColName);
if (!dataFamilyName.equals("")) {
dataColumns.append('"').append(dataFamilyName).append("\".");
+ if (forTransform) {
+ // transforming table columns have the same family name
+ indexColumns.append('"').append(dataFamilyName).append("\".");
+ }
}
dataColumns.append('"').append(dataColumnName).append("\",");
indexColumns.append('"').append(indexColName).append("\",");
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
index 27e3585..fd658f9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
@@ -51,13 +51,13 @@ import static org.apache.phoenix.util.ScanUtil.addEmptyColumnToScan;
* index table.
*/
public class ServerBuildIndexCompiler {
- private final PhoenixConnection connection;
- private final String tableName;
- private PTable dataTable;
- private QueryPlan plan;
+ protected final PhoenixConnection connection;
+ protected final String tableName;
+ protected PTable dataTable;
+ protected QueryPlan plan;
- private class RowCountMutationPlan extends BaseMutationPlan {
- private RowCountMutationPlan(StatementContext context, PhoenixStatement.Operation operation) {
+ protected class RowCountMutationPlan extends BaseMutationPlan {
+ protected RowCountMutationPlan(StatementContext context, PhoenixStatement.Operation operation) {
super(context, operation);
}
@Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildTransformingTableCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildTransformingTableCompiler.java
new file mode 100644
index 0000000..0690cd8
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildTransformingTableCompiler.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.execute.BaseQueryPlan;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PColumnFamily;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.transform.TransformMaintainer;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import java.sql.SQLException;
+
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+import static org.apache.phoenix.util.ScanUtil.addEmptyColumnToScan;
+
+
+/**
+ * Class that compiles the plan that populates the new (transformed) table with
+ * initial data values after a transforming DDL command.
+ */
+public class ServerBuildTransformingTableCompiler extends ServerBuildIndexCompiler {
+
+ public ServerBuildTransformingTableCompiler(PhoenixConnection connection, String tableName) {
+ super(connection, tableName);
+ }
+
+ public MutationPlan compile(PTable newTable) throws SQLException {
+ try (final PhoenixStatement statement = new PhoenixStatement(connection)) {
+ String query = "SELECT /*+ NO_INDEX */ count(*) FROM " + tableName;
+ this.plan = statement.compileQuery(query);
+ TableRef tableRef = plan.getTableRef();
+ Scan scan = plan.getContext().getScan();
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ dataTable = tableRef.getTable();
+
+ // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*).
+ // However, in this case, we need to project all of the data columns
+ for (PColumnFamily family : dataTable.getColumnFamilies()) {
+ scan.addFamily(family.getName().getBytes());
+ }
+
+ scan.setAttribute(BaseScannerRegionObserver.DO_TRANSFORMING, TRUE_BYTES);
+ TransformMaintainer.serialize(dataTable, ptr, newTable, plan.getContext().getConnection());
+
+ scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
+ scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
+ ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
+ scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING, TRUE_BYTES);
+ // Serialize page row size only if we're overriding, else use server side value
+ String rebuildPageRowSize =
+ connection.getQueryServices().getProps()
+ .get(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS);
+ if (rebuildPageRowSize != null) {
+ scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGE_ROWS,
+ Bytes.toBytes(Long.valueOf(rebuildPageRowSize)));
+ }
+ BaseQueryPlan.serializeViewConstantsIntoScan(scan, dataTable);
+ PTable.QualifierEncodingScheme encodingScheme = newTable.getEncodingScheme();
+ addEmptyColumnToScan(scan, SchemaUtil.getEmptyColumnFamily(newTable), EncodedColumnsUtil.getEmptyKeyValueInfo(encodingScheme).getFirst());
+
+ if (dataTable.isTransactional()) {
+ scan.setAttribute(BaseScannerRegionObserver.TX_STATE, connection.getMutationState().encodeTransaction());
+ }
+
+ // Go through the MutationPlan abstraction so that the plan can be built
+ // with a connectionless connection (which makes testing easier).
+ return new RowCountMutationPlan(plan.getContext(), PhoenixStatement.Operation.UPSERT);
+ }
+ }
+}
\ No newline at end of file
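The consumer of this compiler appears further down in PhoenixServerBuildIndexInputFormat; the call shape reduces to compiling a count(*) plan over the old table and extracting the query plan that drives the rebuild scan:

    PTable newTable = PhoenixRuntime.getTableNoCache(phoenixConnection, newTableFullName);
    ServerBuildTransformingTableCompiler compiler =
            new ServerBuildTransformingTableCompiler(phoenixConnection, oldTableFullName);
    MutationPlan plan = compiler.compile(newTable);   // sets DO_TRANSFORMING etc. on the scan
    QueryPlan queryPlan = plan.getQueryPlan();        // feeds the MR input splits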
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index b4c204b..1217d23 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -114,6 +114,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
public static final String REVERSE_SCAN = "_ReverseScan";
public static final String ANALYZE_TABLE = "_ANALYZETABLE";
public static final String REBUILD_INDEXES = "_RebuildIndexes";
+ public static final String DO_TRANSFORMING = "_DoTransforming";
public static final String TX_STATE = "_TxState";
public static final String GUIDEPOST_WIDTH_BYTES = "_GUIDEPOST_WIDTH_BYTES";
public static final String GUIDEPOST_PER_REGION = "_GUIDEPOST_PER_REGION";
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index c1cc0dd..78cf256 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -65,6 +65,7 @@ import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.transform.TransformMaintainer;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
@@ -191,7 +192,13 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
if (indexMetaData == null) {
indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
}
- List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true);
+ byte[] transforming = scan.getAttribute(BaseScannerRegionObserver.DO_TRANSFORMING);
+ List<IndexMaintainer> maintainers = null;
+ if (transforming == null) {
+ maintainers = IndexMaintainer.deserialize(indexMetaData, true);
+ } else {
+ maintainers = TransformMaintainer.deserialize(indexMetaData);
+ }
indexMaintainer = maintainers.get(0);
this.scan = scan;
this.innerScanner = innerScanner;
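The DO_TRANSFORMING attribute checked here is the one set by ServerBuildTransformingTableCompiler above; the round trip, condensed:

    // Producer (ServerBuildTransformingTableCompiler):
    scan.setAttribute(BaseScannerRegionObserver.DO_TRANSFORMING, TRUE_BYTES);

    // Consumer (GlobalIndexRegionScanner): pick the matching deserializer.
    List<IndexMaintainer> maintainers =
            scan.getAttribute(BaseScannerRegionObserver.DO_TRANSFORMING) == null
                    ? IndexMaintainer.deserialize(indexMetaData, true)
                    : TransformMaintainer.deserialize(indexMetaData);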
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index d4e68a0..1ba7651 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -400,7 +400,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
//**** START: New member variables added in 4.16 ****/
private String logicalIndexName;
- private IndexMaintainer(RowKeySchema dataRowKeySchema, boolean isDataTableSalted) {
+ protected IndexMaintainer(RowKeySchema dataRowKeySchema, boolean isDataTableSalted) {
this.dataRowKeySchema = dataRowKeySchema;
this.isDataTableSalted = isDataTableSalted;
}
@@ -1038,35 +1038,47 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
}
return indexRowKeySchema;
}
-
+
public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter, ImmutableBytesWritable dataRowKeyPtr, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException {
byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey, ts);
+ return buildUpdateMutation(kvBuilder, valueGetter, dataRowKeyPtr, ts, regionStartKey, regionEndKey,
+ indexRowKey, this.getEmptyKeyValueFamily(), coveredColumnsMap,
+ indexEmptyKeyValueRef, indexWALDisabled, dataImmutableStorageScheme, immutableStorageScheme, encodingScheme);
+ }
+
+ public static Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter, ImmutableBytesWritable dataRowKeyPtr, long ts,
+ byte[] regionStartKey, byte[] regionEndKey, byte[] destRowKey, ImmutableBytesPtr emptyKeyValueCFPtr,
+ Map<ColumnReference, ColumnReference> coveredColumnsMap,
+ ColumnReference destEmptyKeyValueRef, boolean destWALDisabled,
+ ImmutableStorageScheme srcImmutableStorageScheme, ImmutableStorageScheme destImmutableStorageScheme,
+ QualifierEncodingScheme destEncodingScheme) throws IOException {
+ Set<ColumnReference> coveredColumns = coveredColumnsMap.keySet();
Put put = null;
// New row being inserted: add the empty key value
ImmutableBytesWritable latestValue = null;
if (valueGetter==null ||
- this.getCoveredColumns().isEmpty() ||
- (latestValue = valueGetter.getLatestValue(indexEmptyKeyValueRef, ts)) == null ||
+ coveredColumns.isEmpty() ||
+ (latestValue = valueGetter.getLatestValue(destEmptyKeyValueRef, ts)) == null ||
latestValue == ValueGetter.HIDDEN_BY_DELETE) {
// We need to track whether or not our empty key value is hidden by a Delete Family marker at the same timestamp.
// If it is, these Puts will be masked so should not be emitted.
if (latestValue == ValueGetter.HIDDEN_BY_DELETE) {
return null;
}
- put = new Put(indexRowKey);
+ put = new Put(destRowKey);
// add the keyvalue for the empty row
- put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey),
- this.getEmptyKeyValueFamily(), indexEmptyKeyValueRef.getQualifierWritable(), ts,
+ put.add(kvBuilder.buildPut(new ImmutableBytesPtr(destRowKey),
+ emptyKeyValueCFPtr, destEmptyKeyValueRef.getQualifierWritable(), ts,
QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR));
- put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
+ put.setDurability(!destWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
}
- ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey);
- if (immutableStorageScheme != ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
+ ImmutableBytesPtr rowKey = new ImmutableBytesPtr(destRowKey);
+ if (destImmutableStorageScheme != ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
// map from index column family to list of pair of index column and data column (for covered columns)
Map<ImmutableBytesPtr, List<Pair<ColumnReference, ColumnReference>>> familyToColListMap = Maps.newHashMap();
- for (ColumnReference ref : this.getCoveredColumns()) {
- ColumnReference indexColRef = this.coveredColumnsMap.get(ref);
+ for (ColumnReference ref : coveredColumns) {
+ ColumnReference indexColRef = coveredColumnsMap.get(ref);
ImmutableBytesPtr cf = new ImmutableBytesPtr(indexColRef.getFamily());
if (!familyToColListMap.containsKey(cf)) {
familyToColListMap.put(cf, Lists.<Pair<ColumnReference, ColumnReference>>newArrayList());
@@ -1080,7 +1092,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
int maxEncodedColumnQualifier = Integer.MIN_VALUE;
// find the max col qualifier
for (Pair<ColumnReference, ColumnReference> colRefPair : colRefPairs) {
- maxEncodedColumnQualifier = Math.max(maxEncodedColumnQualifier, encodingScheme.decode(colRefPair.getFirst().getQualifier()));
+ maxEncodedColumnQualifier = Math.max(maxEncodedColumnQualifier, destEncodingScheme.decode(colRefPair.getFirst().getQualifier()));
}
Expression[] colValues = EncodedColumnsUtil.createColumnExpressionArray(maxEncodedColumnQualifier);
// set the values of the columns
@@ -1088,7 +1100,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
ColumnReference indexColRef = colRefPair.getFirst();
ColumnReference dataColRef = colRefPair.getSecond();
byte[] value = null;
- if (this.dataImmutableStorageScheme == ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+ if (srcImmutableStorageScheme == ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
Expression expression = new SingleCellColumnExpression(new PDatum() {
@Override public boolean isNullable() {
return false;
@@ -1109,8 +1121,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
@Override public PDataType getDataType() {
return null;
}
- }, dataColRef.getFamily(), dataColRef.getQualifier(), encodingScheme,
- immutableStorageScheme);
+ }, dataColRef.getFamily(), dataColRef.getQualifier(), destEncodingScheme,
+ destImmutableStorageScheme);
ImmutableBytesPtr ptr = new ImmutableBytesPtr();
expression.evaluate(new ValueGetterTuple(valueGetter, ts), ptr);
value = ptr.copyBytesIfNecessary();
@@ -1122,34 +1134,34 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
}
}
if (value != null) {
- int indexArrayPos = encodingScheme.decode(indexColRef.getQualifier())-QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE+1;
+ int indexArrayPos = destEncodingScheme.decode(indexColRef.getQualifier())-QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE+1;
colValues[indexArrayPos] = new LiteralExpression(value);
}
}
List<Expression> children = Arrays.asList(colValues);
// we use SingleCellConstructorExpression to serialize multiple columns into a single byte[]
- SingleCellConstructorExpression singleCellConstructorExpression = new SingleCellConstructorExpression(immutableStorageScheme, children);
+ SingleCellConstructorExpression singleCellConstructorExpression = new SingleCellConstructorExpression(destImmutableStorageScheme, children);
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
singleCellConstructorExpression.evaluate(new BaseTuple() {}, ptr);
if (put == null) {
- put = new Put(indexRowKey);
- put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
+ put = new Put(destRowKey);
+ put.setDurability(!destWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
}
ImmutableBytesPtr colFamilyPtr = new ImmutableBytesPtr(columnFamily);
//this is a little bit of extra work for installations that are running <0.94.14, but that should be rare and is a short-term set of wrappers - it shouldn't kill GC
put.add(kvBuilder.buildPut(rowKey, colFamilyPtr, QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES_PTR, ts, ptr));
}
} else {
- for (ColumnReference ref : this.getCoveredColumns()) {
- ColumnReference indexColRef = this.coveredColumnsMap.get(ref);
+ for (ColumnReference ref : coveredColumns) {
+ ColumnReference indexColRef = coveredColumnsMap.get(ref);
ImmutableBytesPtr cq = indexColRef.getQualifierWritable();
ImmutableBytesPtr cf = indexColRef.getFamilyWritable();
ImmutableBytesWritable value = valueGetter.getLatestValue(ref, ts);
if (value != null && value != ValueGetter.HIDDEN_BY_DELETE) {
if (put == null) {
- put = new Put(indexRowKey);
- put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
+ put = new Put(destRowKey);
+ put.setDurability(!destWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
}
put.add(kvBuilder.buildPut(rowKey, cf, cq, ts, value));
}
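
The hunk above replaces the single encoding/storage-scheme fields with explicit src/dest variants so the same cell-building path can serve both index maintenance and table transform. A small sketch of the index-array position arithmetic it relies on, assuming QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE is 11 (qualifiers below 11 are reserved):

    // Illustrative values only; 13 stands in for destEncodingScheme.decode(qualifier).
    int decodedQualifier = 13; // e.g. the third encoded column (qualifiers 11, 12, 13, ...)
    int initialValue = 11;     // assumed ENCODED_CQ_COUNTER_INITIAL_VALUE
    int indexArrayPos = decodedQualifier - initialValue + 1; // -> 3, the slot in colValues
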
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
index 1052b30..1114689 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.phoenix.compile.MutationPlan;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.ServerBuildIndexCompiler;
+import org.apache.phoenix.compile.ServerBuildTransformingTableCompiler;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
@@ -63,6 +64,7 @@ import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getInde
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolSourceTable;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexVerifyType;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolStartTime;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIsTransforming;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.setCurrentScnValue;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
@@ -87,6 +89,17 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
String indexTableFullName) throws SQLException;
}
+ private class TransformingDataTableQueryPlanBuilder implements QueryPlanBuilder {
+ @Override
+ public QueryPlan getQueryPlan(PhoenixConnection phoenixConnection, String oldTableFullName,
+ String newTableFullName) throws SQLException {
+ PTable newTable = PhoenixRuntime.getTableNoCache(phoenixConnection, newTableFullName);
+ ServerBuildTransformingTableCompiler compiler = new ServerBuildTransformingTableCompiler(phoenixConnection, oldTableFullName);
+ MutationPlan plan = compiler.compile(newTable);
+ return plan.getQueryPlan();
+ }
+ }
+
private class DataTableQueryPlanBuilder implements QueryPlanBuilder {
@Override
public QueryPlan getQueryPlan(PhoenixConnection phoenixConnection, String dataTableFullName,
@@ -145,8 +158,13 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
String dataTableFullName = getIndexToolDataTableName(configuration);
String indexTableFullName = getIndexToolIndexTableName(configuration);
SourceTable sourceTable = getIndexToolSourceTable(configuration);
- queryPlanBuilder = sourceTable.equals(SourceTable.DATA_TABLE_SOURCE) ?
- new DataTableQueryPlanBuilder() : new IndexTableQueryPlanBuilder();
+ if (getIsTransforming(configuration) &&
+ PhoenixConfigurationUtil.getTransformingTableType(configuration) == SourceTable.DATA_TABLE_SOURCE) {
+ queryPlanBuilder = new TransformingDataTableQueryPlanBuilder();
+ } else {
+ queryPlanBuilder = sourceTable.equals(SourceTable.DATA_TABLE_SOURCE) ?
+ new DataTableQueryPlanBuilder() : new IndexTableQueryPlanBuilder();
+ }
try (final Connection connection = ConnectionUtil.getInputConnection(configuration, overridingProps)) {
PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 08e3f4b..36522dd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -883,7 +883,7 @@ public class IndexTool extends Configured implements Tool {
validateLastVerifyTime();
}
if(isTimeRangeSet(startTime, endTime)) {
- validateTimeRange();
+ validateTimeRange(startTime, endTime);
}
if (verify) {
String value = cmdLine.getOptionValue(VERIFY_OPTION.getOpt());
@@ -939,10 +939,10 @@ public class IndexTool extends Configured implements Tool {
}
}
- private void validateTimeRange() {
+ public static void validateTimeRange(Long sTime, Long eTime) {
Long currentTime = EnvironmentEdgeManager.currentTimeMillis();
- Long st = (startTime == null) ? 0 : startTime;
- Long et = (endTime == null) ? currentTime : endTime;
+ Long st = (sTime == null) ? 0 : sTime;
+ Long et = (eTime == null) ? currentTime : eTime;
if (st.compareTo(currentTime) > 0 || et.compareTo(currentTime) > 0 || st.compareTo(et) >= 0) {
throw new RuntimeException(INVALID_TIME_RANGE_EXCEPTION_MESSAGE);
}
@@ -975,7 +975,7 @@ public class IndexTool extends Configured implements Tool {
changeDisabledIndexStateToBuiding(connection);
}
- private static boolean isTimeRangeSet(Long startTime, Long endTime) {
+ public static boolean isTimeRangeSet(Long startTime, Long endTime) {
return startTime != null || endTime != null;
}
@@ -1089,7 +1089,7 @@ public class IndexTool extends Configured implements Tool {
}
// setup a ValueGetter to get index values from the ResultSet
- private ValueGetter getIndexValueGetter(final PhoenixResultSet rs, List<String> dataColNames) {
+ public static ValueGetter getIndexValueGetter(final PhoenixResultSet rs, List<String> dataColNames) {
// map from data col name to index in ResultSet
final Map<String, Integer> rsIndex = new HashMap<>(dataColNames.size());
int i = 1;
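
validateTimeRange and isTimeRangeSet are now public static so that TransformTool (below) can reuse them; a minimal caller sketch, with illustrative timestamps:

    Long startTime = 1600000000000L; // an epoch-millis value in the past (illustrative)
    Long endTime = null;             // a null end-time is treated as "now" inside validateTimeRange
    if (IndexTool.isTimeRangeSet(startTime, endTime)) {
        // Throws RuntimeException if either bound is in the future or start >= end.
        IndexTool.validateTimeRange(startTime, endTime);
    }
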
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
index 2724990..a341d2b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
@@ -33,11 +33,13 @@ import org.apache.hadoop.mapreduce.Reducer;
import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
import org.apache.phoenix.coprocessor.TaskRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.transform.TransformTool;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.schema.transform.Transform;
import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -159,6 +161,16 @@ public class PhoenixIndexImportDirectReducer extends
throw new RuntimeException(e.getMessage());
}
}
+
+ if (PhoenixConfigurationUtil.getIsTransforming(context.getConfiguration())) {
+ try {
+ Transform.completeTransform(ConnectionUtil
+ .getInputConnection(context.getConfiguration()), context.getConfiguration());
+ } catch (Exception e) {
+ LOGGER.error(" Failed to complete transform", e);
+ throw new RuntimeException(e.getMessage());
+ }
+ }
}
@Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformReducer.java
new file mode 100644
index 0000000..4a1c4fe
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformReducer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.transform;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.schema.transform.Transform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Reducer class whose single task is to complete the transform.
+ */
+public class PhoenixTransformReducer extends
+ Reducer<ImmutableBytesWritable, IntWritable, NullWritable, NullWritable> {
+ private final AtomicBoolean calledOnce = new AtomicBoolean(false);
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(PhoenixTransformReducer.class);
+
+ @Override
+ protected void setup(Context context) throws IOException {
+ }
+
+ @Override
+ protected void reduce(ImmutableBytesWritable arg0, Iterable<IntWritable> arg1,
+ Context context)
+ throws IOException, InterruptedException {
+ if (!calledOnce.compareAndSet(false, true)) {
+ return;
+ }
+
+ try (final Connection
+ connection = ConnectionUtil.getInputConnection(context.getConfiguration())){
+ // Complete the full transform and add a partial transform
+ Transform.completeTransform(connection, context.getConfiguration());
+ } catch (Exception e) {
+ LOGGER.error(" Failed to complete transform", e);
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ }
+}
\ No newline at end of file
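
The reducer above uses AtomicBoolean.compareAndSet so that completeTransform runs at most once even if reduce is invoked for multiple keys. A self-contained sketch of that run-once gate:

    import java.util.concurrent.atomic.AtomicBoolean;

    public class RunOnceExample {
        private final AtomicBoolean calledOnce = new AtomicBoolean(false);

        void work() {
            // Only the first call flips false -> true; later calls return early.
            if (!calledOnce.compareAndSet(false, true)) {
                return;
            }
            System.out.println("completing transform exactly once");
        }

        public static void main(String[] args) {
            RunOnceExample r = new RunOnceExample();
            r.work(); // prints
            r.work(); // no-op
        }
    }
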
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
new file mode 100644
index 0000000..5e5cdcf
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
@@ -0,0 +1,762 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.transform;
+
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.compile.PostIndexDDLCompiler;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
+import org.apache.phoenix.mapreduce.PhoenixServerBuildIndexInputFormat;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexDBWritable;
+import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexMapper;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.ParseException;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.PosixParser;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
+import static org.apache.phoenix.mapreduce.index.IndexTool.isTimeRangeSet;
+import static org.apache.phoenix.mapreduce.index.IndexTool.validateTimeRange;
+import static org.apache.phoenix.util.QueryUtil.getConnection;
+
+public class TransformTool extends Configured implements Tool {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TransformTool.class);
+
+ public enum MR_COUNTER_METRICS {
+ TRANSFORM_FAILED,
+ TRANSFORM_SUCCEED
+ }
+
+ private static final Option OUTPUT_PATH_OPTION = new Option("op", "output-path", true,
+ "Output path where the files are written");
+ private static final Option SCHEMA_NAME_OPTION = new Option("s", "schema", true,
+ "Phoenix schema name (optional)");
+ private static final Option DATA_TABLE_OPTION = new Option("dt", "data-table", true,
+ "Data table name (mandatory)");
+ private static final Option INDEX_TABLE_OPTION = new Option("it", "index-table", true,
+ "Index table name(not required in case of partial rebuilding)");
+
+ private static final Option PARTIAL_TRANSFORM_OPTION = new Option("pt", "partial-transform", false,
+ "To transform a data table from a start timestamp");
+
+ private static final Option ABORT_TRANSFORM_OPTION = new Option("abort", "abort", false,
+ "Aborts the ongoing transform");
+
+ private static final Option PAUSE_TRANSFORM_OPTION = new Option("pause", "pause", false,
+ "Pauses the ongoing transform. If the ongoing transform fails, it will not be retried");
+
+ private static final Option RESUME_TRANSFORM_OPTION = new Option("resume", "resume", false,
+ "Resumes the ongoing transform");
+
+ private static final Option JOB_PRIORITY_OPTION = new Option("p", "job-priority", true,
+ "Define job priority from 0(highest) to 4. Default is 2(normal)");
+
+ private static final int DEFAULT_AUTOSPLIT_NUM_REGIONS = 20;
+
+ private static final Option AUTO_SPLIT_OPTION =
+ new Option("spa", "autosplit", true,
+ "Automatically split the new table if the # of data table regions is greater than N. "
+ + "Takes an optional argument specifying N, otherwise defaults to " + DEFAULT_AUTOSPLIT_NUM_REGIONS
+ );
+
+ private static final Option RUN_FOREGROUND_OPTION =
+ new Option(
+ "runfg",
+ "run-foreground",
+ false,
+ "If specified, runs transform in Foreground. Default - Runs the transform in background.");
+
+ private static final Option TENANT_ID_OPTION = new Option("tenant", "tenant-id", true,
+ "If specified, uses Tenant connection for tenant index transform (optional)");
+
+ private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
+ private static final Option START_TIME_OPTION = new Option("st", "start-time",
+ true, "Start time for transform");
+
+ private static final Option END_TIME_OPTION = new Option("et", "end-time",
+ true, "End time for transform");
+
+ public static final String TRANSFORM_JOB_NAME_TEMPLATE = "PHOENIX_TRANS_%s.%s";
+
+ public static final String PARTIAL_TRANSFORM_NOT_APPLICABLE = "Partial transform requires a "
+ + "non-zero timestamp in the past as the start-time (st) option, and that timestamp must be present in the SYSTEM.TRANSFORM table";
+
+ public static final String TRANSFORM_NOT_APPLICABLE = "Transform is not applicable for local indexes, views, or transactional tables";
+
+ public static final String PARTIAL_TRANSFORM_NOT_COMPATIBLE = "Cannot abort/pause/resume/split during a partial transform";
+
+ private Configuration configuration;
+ private Connection connection;
+ private String tenantId;
+ private String dataTable;
+ private String logicalParentName;
+ private String basePath;
+ // logicalTableName is index table and logicalParentName is the data table if this is an index transform
+ // If this is a data table transform, logicalParentName is null and logicalTableName is dataTable
+ private String logicalTableName;
+ private String schemaName;
+ private String indexTable;
+ private String qDataTable; // normalized with schema name
+ private PTable pIndexTable = null;
+ private PTable pDataTable;
+ private PTable pOldTable;
+ private PTable pNewTable;
+
+ private String oldTableWithSchema;
+ private String newTableWithSchema;
+ private JobPriority jobPriority;
+ private String jobName;
+ private boolean isForeground;
+ private Long startTime, endTime, lastTransformTime;
+ private boolean isPartialTransform;
+ private Job job;
+
+ public Long getStartTime() {
+ return startTime;
+ }
+
+ public Long getEndTime() { return endTime; }
+
+ public CommandLine parseOptions(String[] args) {
+ final Options options = getOptions();
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmdLine = null;
+ try {
+ cmdLine = parser.parse(options, args);
+ } catch (ParseException e) {
+ printHelpAndExit("Error parsing command line options: " + e.getMessage(),
+ options);
+ }
+
+ if (cmdLine.hasOption(HELP_OPTION.getOpt())) {
+ printHelpAndExit(options, 0);
+ }
+
+ this.jobPriority = getJobPriority(cmdLine);
+
+ boolean dataTableProvided = (cmdLine.hasOption(DATA_TABLE_OPTION.getOpt()));
+ if (!dataTableProvided) {
+ throw new IllegalStateException(DATA_TABLE_OPTION.getLongOpt() + " is a mandatory parameter");
+ }
+
+ return cmdLine;
+ }
+
+ private Options getOptions() {
+ final Options options = new Options();
+ options.addOption(OUTPUT_PATH_OPTION);
+ options.addOption(SCHEMA_NAME_OPTION);
+ options.addOption(DATA_TABLE_OPTION);
+ options.addOption(INDEX_TABLE_OPTION);
+ options.addOption(TENANT_ID_OPTION);
+ options.addOption(HELP_OPTION);
+ options.addOption(JOB_PRIORITY_OPTION);
+ options.addOption(RUN_FOREGROUND_OPTION);
+ options.addOption(PARTIAL_TRANSFORM_OPTION);
+ options.addOption(START_TIME_OPTION);
+ options.addOption(END_TIME_OPTION);
+ options.addOption(AUTO_SPLIT_OPTION);
+ options.addOption(ABORT_TRANSFORM_OPTION);
+ options.addOption(PAUSE_TRANSFORM_OPTION);
+ options.addOption(RESUME_TRANSFORM_OPTION);
+ START_TIME_OPTION.setOptionalArg(true);
+ END_TIME_OPTION.setOptionalArg(true);
+ return options;
+ }
+
+ private void printHelpAndExit(String errorMessage, Options options) {
+ System.err.println(errorMessage);
+ LOGGER.error(errorMessage);
+ printHelpAndExit(options, 1);
+ }
+
+ private void printHelpAndExit(Options options, int exitCode) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("help", options);
+ System.exit(exitCode);
+ }
+
+ public CommandLine parseArgs(String[] args) throws Exception {
+ CommandLine cmdLine;
+ try {
+ cmdLine = parseOptions(args);
+ } catch (IllegalStateException e) {
+ printHelpAndExit(e.getMessage(), getOptions());
+ throw e;
+ }
+
+ if (getConf() == null) {
+ setConf(HBaseConfiguration.create());
+ }
+
+ return cmdLine;
+ }
+
+ @VisibleForTesting
+ public int populateTransformToolAttributesAndValidate(CommandLine cmdLine) throws Exception {
+ boolean useStartTime = cmdLine.hasOption(START_TIME_OPTION.getOpt());
+ boolean useEndTime = cmdLine.hasOption(END_TIME_OPTION.getOpt());
+ basePath = cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
+ isPartialTransform = cmdLine.hasOption(PARTIAL_TRANSFORM_OPTION.getOpt());
+ if (useStartTime) {
+ startTime = Long.valueOf(cmdLine.getOptionValue(START_TIME_OPTION.getOpt()));
+ }
+
+ if (useEndTime) {
+ endTime = Long.valueOf(cmdLine.getOptionValue(END_TIME_OPTION.getOpt()));
+ }
+
+ if (isTimeRangeSet(startTime, endTime)) {
+ validateTimeRange(startTime, endTime);
+ }
+
+ if (isPartialTransform &&
+ (cmdLine.hasOption(AUTO_SPLIT_OPTION.getOpt()))) {
+ throw new IllegalArgumentException(PARTIAL_TRANSFORM_NOT_COMPATIBLE);
+ }
+ if (isPartialTransform &&
+ (cmdLine.hasOption(ABORT_TRANSFORM_OPTION.getOpt()) || cmdLine.hasOption(PAUSE_TRANSFORM_OPTION.getOpt())
+ || cmdLine.hasOption(RESUME_TRANSFORM_OPTION.getOpt()))) {
+ throw new IllegalArgumentException(PARTIAL_TRANSFORM_NOT_COMPATIBLE);
+ }
+
+ if (isPartialTransform) {
+ if (!cmdLine.hasOption(START_TIME_OPTION.getOpt())) {
+ throw new IllegalArgumentException(PARTIAL_TRANSFORM_NOT_APPLICABLE);
+ }
+ // START_TIME_OPTION takes an optional argument, so its value may be null here
+ String lastTransformTimeValue = cmdLine.getOptionValue(START_TIME_OPTION.getOpt());
+ lastTransformTime = lastTransformTimeValue == null ? null : Long.valueOf(lastTransformTimeValue);
+ SystemTransformRecord transformRecord = getTransformRecord(null);
+ if (transformRecord == null) {
+ throw new IllegalArgumentException(PARTIAL_TRANSFORM_NOT_APPLICABLE);
+ }
+ if (lastTransformTime == null) {
+ lastTransformTime = transformRecord.getTransformEndTs().getTime();
+ } else {
+ validateLastTransformTime();
+ }
+ }
+
+ schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
+ dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
+ indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
+ qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
+ isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
+ logicalTableName = dataTable;
+ logicalParentName = null;
+ if (!Strings.isNullOrEmpty(indexTable)) {
+ logicalTableName = indexTable;
+ logicalParentName = SchemaUtil.getTableName(schemaName, dataTable);
+ }
+
+ pDataTable = PhoenixRuntime.getTable(
+ connection, SchemaUtil.getQualifiedTableName(schemaName, dataTable));
+ if (indexTable != null) {
+ pIndexTable = PhoenixRuntime.getTable(
+ connection, SchemaUtil.getQualifiedTableName(schemaName, indexTable));
+ pOldTable = pIndexTable;
+ } else {
+ pOldTable = pDataTable;
+ }
+
+ SystemTransformRecord transformRecord = getTransformRecord(connection.unwrap(PhoenixConnection.class));
+
+ validateTransform(pDataTable, pIndexTable, transformRecord);
+ String newTableName = SchemaUtil.getTableNameFromFullName(transformRecord.getNewPhysicalTableName());
+ pNewTable = PhoenixRuntime.getTableNoCache(
+ connection, SchemaUtil.getQualifiedTableName(schemaName, newTableName));
+
+ oldTableWithSchema = SchemaUtil.getQualifiedPhoenixTableName(schemaName, SchemaUtil.getTableNameFromFullName(pOldTable.getName().getString()));
+ newTableWithSchema = SchemaUtil.getQualifiedPhoenixTableName(schemaName, SchemaUtil.getTableNameFromFullName(pNewTable.getName().getString()));
+ return 0;
+ }
+
+ public void validateTransform(PTable argPDataTable, PTable argIndexTable, SystemTransformRecord transformRecord) throws Exception {
+
+ if (argPDataTable.getType() != PTableType.TABLE) {
+ throw new IllegalArgumentException(TRANSFORM_NOT_APPLICABLE);
+ }
+
+ if (argIndexTable != null && argIndexTable.getType() != PTableType.INDEX) {
+ throw new IllegalArgumentException(TRANSFORM_NOT_APPLICABLE);
+ }
+
+ if (argPDataTable.isTransactional()) {
+ throw new IllegalArgumentException(TRANSFORM_NOT_APPLICABLE);
+ }
+
+ if (transformRecord == null){
+ throw new IllegalStateException("ALTER statement has not been run and the transform has not been created for this table");
+ }
+
+ if (argPDataTable != null && argIndexTable != null) {
+ if (!IndexTool.isValidIndexTable(connection, qDataTable, indexTable, tenantId)) {
+ throw new IllegalArgumentException(
+ String.format(" %s is not an index table for %s for this connection",
+ indexTable, qDataTable));
+ }
+
+ PTable.IndexType indexType = argIndexTable.getIndexType();
+ if (PTable.IndexType.LOCAL.equals(indexType)) {
+ throw new IllegalArgumentException(TRANSFORM_NOT_APPLICABLE);
+ }
+ }
+ }
+
+ public int validateLastTransformTime() throws Exception {
+ Long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+ if (lastTransformTime.compareTo(currentTime) > 0 || lastTransformTime == 0L) {
+ throw new RuntimeException(PARTIAL_TRANSFORM_NOT_APPLICABLE);
+ }
+ return 0;
+ }
+
+ public SystemTransformRecord getTransformRecord(PhoenixConnection connection) throws Exception {
+ if (connection == null) {
+ try (Connection conn = getConnection(configuration)) {
+ SystemTransformRecord transformRecord = Transform.getTransformRecord(schemaName, logicalTableName, logicalParentName, tenantId, conn.unwrap(PhoenixConnection.class));
+ return transformRecord;
+ }
+ } else {
+ return Transform.getTransformRecord(schemaName, logicalTableName, logicalParentName, tenantId, connection);
+ }
+ }
+
+ public String getJobPriority() {
+ return this.jobPriority.toString();
+ }
+
+ private JobPriority getJobPriority(CommandLine cmdLine) {
+ String jobPriorityOption = cmdLine.getOptionValue(JOB_PRIORITY_OPTION.getOpt());
+ if (jobPriorityOption == null) {
+ return JobPriority.NORMAL;
+ }
+
+ switch (jobPriorityOption) {
+ case "0" : return JobPriority.VERY_HIGH;
+ case "1" : return JobPriority.HIGH;
+ case "2" : return JobPriority.NORMAL;
+ case "3" : return JobPriority.LOW;
+ case "4" : return JobPriority.VERY_LOW;
+ default:
+ return JobPriority.NORMAL;
+ }
+ }
+
+ public Job getJob() {
+ return this.job;
+ }
+
+ public String getTenantId() {
+ return this.tenantId;
+ }
+
+ public void setJobName(String jobName) {
+ this.jobName = jobName;
+ }
+
+ public Job configureJob() throws Exception {
+ final String jobName = String.format(TRANSFORM_JOB_NAME_TEMPLATE, schemaName, logicalTableName);
+ if (lastTransformTime != null) {
+ PhoenixConfigurationUtil.setCurrentScnValue(configuration, lastTransformTime);
+ }
+
+ final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class);
+ final PostIndexDDLCompiler ddlCompiler =
+ new PostIndexDDLCompiler(pConnection, new TableRef(pOldTable), true);
+ ddlCompiler.compile(pNewTable);
+ final List<String> newColumns = ddlCompiler.getDataColumnNames();
+ final String upsertQuery =
+ QueryUtil.constructUpsertStatement(newTableWithSchema, newColumns, HintNode.Hint.NO_INDEX);
+
+ configuration.set(PhoenixConfigurationUtil.UPSERT_STATEMENT, upsertQuery);
+
+ PhoenixConfigurationUtil.setUpsertColumnNames(configuration,
+ ddlCompiler.getIndexColumnNames().toArray(new String[ddlCompiler.getIndexColumnNames().size()]));
+ if (tenantId != null) {
+ PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
+ }
+
+ long indexRebuildQueryTimeoutMs =
+ configuration.getLong(QueryServices.INDEX_REBUILD_QUERY_TIMEOUT_ATTRIB,
+ QueryServicesOptions.DEFAULT_INDEX_REBUILD_QUERY_TIMEOUT);
+ long indexRebuildRPCTimeoutMs =
+ configuration.getLong(QueryServices.INDEX_REBUILD_RPC_TIMEOUT_ATTRIB,
+ QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_TIMEOUT);
+ long indexRebuildClientScannerTimeOutMs =
+ configuration.getLong(QueryServices.INDEX_REBUILD_CLIENT_SCANNER_TIMEOUT_ATTRIB,
+ QueryServicesOptions.DEFAULT_INDEX_REBUILD_CLIENT_SCANNER_TIMEOUT);
+ int indexRebuildRpcRetriesCounter =
+ configuration.getInt(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER,
+ QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER);
+ // Set various phoenix and hbase level timeouts and rpc retries
+ configuration.set(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
+ Long.toString(indexRebuildQueryTimeoutMs));
+ configuration.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+ Long.toString(indexRebuildClientScannerTimeOutMs));
+ configuration.set(HConstants.HBASE_RPC_TIMEOUT_KEY,
+ Long.toString(indexRebuildRPCTimeoutMs));
+ configuration.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ Long.toString(indexRebuildRpcRetriesCounter));
+ configuration.set("mapreduce.task.timeout", Long.toString(indexRebuildQueryTimeoutMs));
+
+ PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, oldTableWithSchema);
+ PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, newTableWithSchema);
+ PhoenixConfigurationUtil.setIndexToolSourceTable(configuration, IndexScrutinyTool.SourceTable.DATA_TABLE_SOURCE);
+ if (startTime != null) {
+ PhoenixConfigurationUtil.setIndexToolStartTime(configuration, startTime);
+ }
+
+ PhoenixConfigurationUtil.setPhysicalTableName(configuration, pNewTable.getPhysicalName().getString());
+ PhoenixConfigurationUtil.setIsTransforming(configuration, true);
+ Path outputPath = null;
+ if (basePath != null) {
+ outputPath = CsvBulkImportUtil.getOutputPath(new Path(basePath),
+ pIndexTable == null ?
+ pDataTable.getPhysicalName().getString() :
+ pIndexTable.getPhysicalName().getString());
+ // Delete any leftover output from a previous run
+ org.apache.hadoop.fs.FileSystem fs = outputPath.getFileSystem(configuration);
+ fs.delete(outputPath, true);
+ }
+ this.job = Job.getInstance(getConf(), jobName);
+ job.setJarByClass(TransformTool.class);
+ job.setPriority(this.jobPriority);
+ PhoenixMapReduceUtil.setInput(job, PhoenixServerBuildIndexDBWritable.class, PhoenixServerBuildIndexInputFormat.class,
+ oldTableWithSchema, "");
+ if (outputPath != null) {
+ FileOutputFormat.setOutputPath(job, outputPath);
+ }
+ job.setReducerClass(PhoenixTransformReducer.class);
+ job.setNumReduceTasks(1);
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+
+ //Set the Output classes
+ job.setMapOutputValueClass(IntWritable.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(NullWritable.class);
+ TableMapReduceUtil.addDependencyJars(job);
+ job.setMapperClass(PhoenixServerBuildIndexMapper.class);
+
+ TableMapReduceUtil.initCredentials(job);
+ LOGGER.info("TransformTool is running for " + job.getJobName());
+
+ return job;
+ }
+
+ public int runJob() throws IOException {
+ try {
+ if (isForeground) {
+ LOGGER.info("Running TransformTool in foreground. " +
+ "Runs full table scans. This may take a long time!");
+ return (job.waitForCompletion(true)) ? 0 : 1;
+ } else {
+ LOGGER.info("Running TransformTool in Background - Submit async and exit");
+ job.submit();
+ return 0;
+ }
+ } catch (Exception e) {
+ LOGGER.error("Caught exception " + e + " trying to run TransformTool.");
+ return 1;
+ }
+ }
+
+ private void preSplitTable(CommandLine cmdLine, Connection connection,
+ Configuration configuration, PTable newTable, PTable oldTable)
+ throws SQLException, IOException {
+ boolean autosplit = cmdLine.hasOption(AUTO_SPLIT_OPTION.getOpt());
+
+ if (autosplit) {
+ String nOpt = cmdLine.getOptionValue(AUTO_SPLIT_OPTION.getOpt());
+ int autosplitNumRegions = nOpt == null ? DEFAULT_AUTOSPLIT_NUM_REGIONS : Integer.parseInt(nOpt);
+ LOGGER.info(String.format("Will split table %s , autosplit=%s ," +
+ " autoSplitNumRegions=%s", newTable.getPhysicalName(),
+ autosplit, autosplitNumRegions));
+
+ splitTable(connection.unwrap(PhoenixConnection.class), autosplit,
+ autosplitNumRegions, newTable, oldTable);
+ }
+ }
+
+ private void splitTable(PhoenixConnection pConnection, boolean autosplit,
+ int autosplitNumRegions, PTable newTable, PTable oldTable)
+ throws SQLException, IOException, IllegalArgumentException {
+ int numRegions;
+ byte[][] oldSplitPoints = null;
+ byte[][] newSplitPoints = null;
+ // TODO : if the rowkey changes via transform, we need to create new split points
+ try (Table hDataTable =
+ (Table) pConnection.getQueryServices()
+ .getTable(oldTable.getPhysicalName().getBytes());
+ org.apache.hadoop.hbase.client.Connection connection =
+ HBaseFactoryProvider.getHConnectionFactory().createConnection(configuration)) {
+ // Avoid duplicate split keys and remove the empty key
+ oldSplitPoints = connection.getRegionLocator(hDataTable.getName()).getStartKeys();
+ Arrays.sort(oldSplitPoints, Bytes.BYTES_COMPARATOR);
+ int numSplits = oldSplitPoints.length;
+ ArrayList<byte[]> splitList = new ArrayList<>();
+ byte[] lastKey = null;
+ for (byte[] keyBytes : oldSplitPoints) {
+ if (Bytes.compareTo(keyBytes, EMPTY_BYTE_ARRAY)!=0) {
+ if (lastKey != null && !Bytes.equals(keyBytes, lastKey)) {
+ splitList.add(keyBytes);
+ }
+ }
+ lastKey = keyBytes;
+ }
+ newSplitPoints = new byte[splitList.size()][];
+ for (int i=0; i < splitList.size(); i++) {
+ newSplitPoints[i] = splitList.get(i);
+ }
+ numRegions = newSplitPoints.length;
+ if (autosplit && (numRegions <= autosplitNumRegions)) {
+ LOGGER.info(String.format(
+ "Will not split %s because the data table only has %s regions, autoSplitNumRegions=%s",
+ newTable.getPhysicalName(), numRegions, autosplitNumRegions));
+ return; // do nothing if # of regions is too low
+ }
+ }
+
+ try (HBaseAdmin admin = pConnection.getQueryServices().getAdmin()) {
+ // do the split
+ // drop table and recreate with appropriate splits
+ TableName newTableSplitted = TableName.valueOf(newTable.getPhysicalName().getBytes());
+ HTableDescriptor descriptor = admin.getTableDescriptor(newTableSplitted);
+ admin.disableTable(newTableSplitted);
+ admin.deleteTable(newTableSplitted);
+ admin.createTable(descriptor, newSplitPoints);
+ }
+ }
+
+ public void updateTransformRecord(PhoenixConnection connection, PTable.TransformStatus newStatus) throws Exception {
+ SystemTransformRecord transformRecord = getTransformRecord(connection);
+ updateTransformRecord(connection, transformRecord, newStatus);
+ }
+
+ public static void updateTransformRecord(PhoenixConnection connection, SystemTransformRecord transformRecord, PTable.TransformStatus newStatus) throws Exception {
+ SystemTransformRecord.SystemTransformBuilder builder = new SystemTransformRecord.SystemTransformBuilder(transformRecord);
+ builder.setTransformStatus(newStatus.name());
+ if (newStatus == PTable.TransformStatus.COMPLETED || newStatus == PTable.TransformStatus.FAILED) {
+ builder.setEndTs(new Timestamp(EnvironmentEdgeManager.currentTimeMillis()));
+ }
+ Transform.upsertTransform(builder.build(), connection.unwrap(PhoenixConnection.class));
+ }
+
+ protected void updateTransformRecord(Job job) throws Exception {
+ if (job == null) {
+ return;
+ }
+ SystemTransformRecord transformRecord = getTransformRecord(connection.unwrap(PhoenixConnection.class));
+ SystemTransformRecord.SystemTransformBuilder builder = new SystemTransformRecord.SystemTransformBuilder(transformRecord);
+ builder.setTransformJobId(job.getJobID().toString());
+ builder.setStartTs(new Timestamp(EnvironmentEdgeManager.currentTimeMillis()));
+ Transform.upsertTransform(builder.build(), connection.unwrap(PhoenixConnection.class));
+ }
+
+ public void killJob(SystemTransformRecord transformRecord) throws Exception{
+ String jobId = transformRecord.getTransformJobId();
+ if (!Strings.isNullOrEmpty(jobId)) {
+ JobClient jobClient = new JobClient();
+ RunningJob runningJob = jobClient.getJob(jobId);
+ if (runningJob != null) {
+ try {
+ runningJob.killJob();
+ } catch (IOException ex) {
+ LOGGER.warn("Transform abort could not kill the job. ", ex);
+ }
+ }
+ }
+ }
+
+ public void abortTransform() throws Exception {
+ SystemTransformRecord transformRecord = getTransformRecord(connection.unwrap(PhoenixConnection.class));
+ if (transformRecord.getTransformStatus().equals(PTable.TransformStatus.COMPLETED.name())) {
+ throw new IllegalStateException("A completed transform cannot be aborted");
+ }
+
+ killJob(transformRecord);
+ Transform.removeTransformRecord(transformRecord, connection.unwrap(PhoenixConnection.class));
+
+ // TODO: disable transform on the old table
+
+ // Cleanup syscat
+ try (Statement stmt = connection.createStatement()) {
+ if (pIndexTable != null) {
+ stmt.execute("DROP INDEX " + transformRecord.getNewPhysicalTableName());
+ } else {
+ stmt.execute("DROP TABLE " + transformRecord.getNewPhysicalTableName());
+ }
+ } catch (SQLException ex) {
+ LOGGER.warn("Transform abort could not drop the table " + transformRecord.getNewPhysicalTableName());
+ }
+ }
+
+ public void pauseTransform() throws Exception {
+ SystemTransformRecord transformRecord = getTransformRecord(connection.unwrap(PhoenixConnection.class));
+ if (transformRecord.getTransformStatus().equals(PTable.TransformStatus.COMPLETED.name())) {
+ throw new IllegalStateException("A completed transform cannot be paused");
+ }
+
+ updateTransformRecord(connection.unwrap(PhoenixConnection.class), PTable.TransformStatus.PAUSED);
+ killJob(transformRecord);
+ }
+
+ public void resumeTransform(String[] args, CommandLine cmdLine) throws Exception {
+ SystemTransformRecord transformRecord = getTransformRecord(connection.unwrap(PhoenixConnection.class));
+ if (!transformRecord.getTransformStatus().equals(PTable.TransformStatus.PAUSED.name())) {
+ throw new IllegalStateException("Only a paused transform can be resumed");
+ }
+
+ runTransform(args, cmdLine);
+ }
+
+ public int runTransform(String[] args, CommandLine cmdLine) throws Exception {
+ int status = 0;
+ updateTransformRecord(connection.unwrap(PhoenixConnection.class), PTable.TransformStatus.STARTED);
+ PhoenixConfigurationUtil.setIsPartialTransform(configuration, isPartialTransform);
+ PhoenixConfigurationUtil.setIsTransforming(configuration, true);
+
+ if (!Strings.isNullOrEmpty(indexTable)) {
+ PhoenixConfigurationUtil.setTransformingTableType(configuration, IndexScrutinyTool.SourceTable.INDEX_TABLE_SOURCE);
+ // Index table transform. Build the index
+ IndexTool indexTool = new IndexTool();
+ indexTool.setConf(configuration);
+ status = indexTool.run(args);
+ Job job = indexTool.getJob();
+ updateTransformRecord(job);
+ } else {
+ PhoenixConfigurationUtil.setTransformingTableType(configuration, IndexScrutinyTool.SourceTable.DATA_TABLE_SOURCE);
+ if (!isPartialTransform) {
+ preSplitTable(cmdLine, connection, configuration, pNewTable, pOldTable);
+ }
+ configureJob();
+ status = runJob();
+ updateTransformRecord(this.job);
+ }
+
+ // Record status
+ if (status != 0) {
+ LOGGER.error("TransformTool/IndexTool job failed! Check logs for errors..");
+ updateTransformRecord(connection.unwrap(PhoenixConnection.class), PTable.TransformStatus.FAILED);
+ return -1;
+ }
+
+ return status;
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ connection = null;
+ int ret = 0;
+ CommandLine cmdLine = null;
+ configuration = HBaseConfiguration.addHbaseResources(getConf());
+ try {
+ cmdLine = parseArgs(args);
+ if (cmdLine.hasOption(TENANT_ID_OPTION.getOpt())) {
+ tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
+ if (!Strings.isNullOrEmpty(tenantId)) {
+ configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+ }
+ }
+ try (Connection conn = getConnection(configuration)) {
+ this.connection = conn;
+ this.connection.setAutoCommit(true);
+ populateTransformToolAttributesAndValidate(cmdLine);
+ if (cmdLine.hasOption(ABORT_TRANSFORM_OPTION.getOpt())) {
+ abortTransform();
+ } else if (cmdLine.hasOption(PAUSE_TRANSFORM_OPTION.getOpt())) {
+ pauseTransform();
+ } else if (cmdLine.hasOption(RESUME_TRANSFORM_OPTION.getOpt())) {
+ resumeTransform(args, cmdLine);
+ } else {
+ ret = runTransform(args, cmdLine);
+ }
+ return ret;
+ } catch (Exception ex) {
+ LOGGER.error("An error occurred while transforming " + ExceptionUtils.getMessage(ex) + " at:\n" + ExceptionUtils.getStackTrace(ex));
+ return -1;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ printHelpAndExit(e.toString(), getOptions());
+ return -1;
+ }
+ }
+
+ public static void main(final String[] args) throws Exception {
+ int result = ToolRunner.run(new TransformTool(), args);
+ System.exit(result);
+ }
+}
\ No newline at end of file
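
For reference, a hypothetical programmatic launch of the tool; the schema, table, and path values are illustrative, and only option names defined above are used:

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.phoenix.mapreduce.transform.TransformTool;

    public class LaunchTransformExample {
        public static void main(String[] args) throws Exception {
            // Transform MY_SCHEMA.MY_TABLE and wait for the MR job (foreground mode).
            int rc = ToolRunner.run(new TransformTool(), new String[] {
                    "-s", "MY_SCHEMA",       // schema (optional)
                    "-dt", "MY_TABLE",       // data table (mandatory)
                    "-op", "/tmp/transform", // output path for the MR job
                    "-runfg"                 // run in the foreground
            });
            System.exit(rc);
        }
    }
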
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
index 05d10f4..2b93a86 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
@@ -133,7 +133,7 @@ public class IndexColumnNames {
}
}
- private String getDataColFullName(PColumn dCol) {
+ public static String getDataColFullName(PColumn dCol) {
String dColFullName = "";
if (dCol.getFamilyName() != null) {
dColFullName += dCol.getFamilyName().getString() + QueryConstants.NAME_SEPARATOR;
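
getDataColFullName builds the family-qualified column name; a pure-Java analogue of its behavior, assuming QueryConstants.NAME_SEPARATOR is ".":

    // Hypothetical analogue; PColumn is replaced by plain strings for illustration.
    static String dataColFullName(String familyName, String columnName) {
        return (familyName == null ? "" : familyName + ".") + columnName;
    }
    // dataColFullName("CF1", "NAME") -> "CF1.NAME"
    // dataColFullName(null, "PK_COL") -> "PK_COL" (primary-key columns have no family)
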
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 8cb3445..9d7df7e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -196,6 +196,15 @@ public final class PhoenixConfigurationUtil {
// by default MR snapshot restore is handled internally by phoenix
public static final boolean DEFAULT_MAPREDUCE_EXTERNAL_SNAPSHOT_RESTORE = false;
+ // Whether the MapReduce job is being used for a table/index transform
+ public static final String IS_TRANSFORMING_VALUE = "phoenix.mr.istransforming";
+
+ // The source table type (data table or index table) of the transform
+ public static final String TRANSFORMING_TABLE_TYPE = "phoenix.mr.transform.tabletype";
+
+ // Whether the transform is partial, i.e. runs from a given start timestamp
+ public static final String IS_PARTIAL_TRANSFORM = "phoenix.mr.transform.ispartial";
+
/**
* Determines type of Phoenix Map Reduce job.
* 1. QUERY allows running arbitrary queries without aggregates
@@ -602,6 +611,40 @@ public final class PhoenixConfigurationUtil {
return clientPortString==null ? null : Integer.parseInt(clientPortString);
}
+ public static void setIsTransforming(Configuration configuration, Boolean isTransforming) {
+ Preconditions.checkNotNull(configuration);
+ Preconditions.checkNotNull(isTransforming);
+ configuration.set(IS_TRANSFORMING_VALUE, Boolean.toString(isTransforming));
+ }
+
+ public static Boolean getIsTransforming(Configuration configuration) {
+ Preconditions.checkNotNull(configuration);
+ return Boolean.valueOf(configuration.get(IS_TRANSFORMING_VALUE, "false"));
+ }
+
+ public static void setTransformingTableType(Configuration configuration,
+ SourceTable sourceTable) {
+ Preconditions.checkNotNull(configuration);
+ Preconditions.checkNotNull(sourceTable);
+ configuration.set(TRANSFORMING_TABLE_TYPE, sourceTable.name());
+ }
+
+ public static SourceTable getTransformingTableType(Configuration configuration) {
+ Preconditions.checkNotNull(configuration);
+ return SourceTable.valueOf(configuration.get(TRANSFORMING_TABLE_TYPE));
+ }
+
+ public static void setIsPartialTransform(final Configuration configuration, Boolean partialTransform) {
+ Preconditions.checkNotNull(configuration);
+ Preconditions.checkNotNull(partialTransform);
+ configuration.set(IS_PARTIAL_TRANSFORM, String.valueOf(partialTransform));
+ }
+
+ public static boolean getIsPartialTransform(final Configuration configuration) {
+ Preconditions.checkNotNull(configuration);
+ return configuration.getBoolean(IS_PARTIAL_TRANSFORM, false);
+ }
+
/**
* Returns the HBase zookeeper znode parent
* @param configuration
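
A minimal sketch of an MR driver exercising the new transform settings; the SourceTable import location follows the imports used by TransformTool above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
    import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;

    public class TransformConfigExample {
        public static void main(String[] args) {
            Configuration conf = HBaseConfiguration.create();
            PhoenixConfigurationUtil.setIsTransforming(conf, true);
            PhoenixConfigurationUtil.setTransformingTableType(conf, SourceTable.DATA_TABLE_SOURCE);
            PhoenixConfigurationUtil.setIsPartialTransform(conf, false);

            System.out.println(PhoenixConfigurationUtil.getIsTransforming(conf));        // true
            System.out.println(PhoenixConfigurationUtil.getTransformingTableType(conf)); // DATA_TABLE_SOURCE
            System.out.println(PhoenixConfigurationUtil.getIsPartialTransform(conf));    // false (also the default)
        }
    }
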
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnMetaDataOps.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnMetaDataOps.java
new file mode 100644
index 0000000..90451af
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnMetaDataOps.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.parse.ColumnDef;
+import org.apache.phoenix.parse.ColumnName;
+import org.apache.phoenix.parse.PrimaryKeyConstraint;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.SchemaUtil;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NULLABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
+
+public class ColumnMetaDataOps {
+ public static final String UPSERT_COLUMN =
+ "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
+ TENANT_ID + "," +
+ TABLE_SCHEM + "," +
+ TABLE_NAME + "," +
+ COLUMN_NAME + "," +
+ COLUMN_FAMILY + "," +
+ DATA_TYPE + "," +
+ NULLABLE + "," +
+ COLUMN_SIZE + "," +
+ DECIMAL_DIGITS + "," +
+ ORDINAL_POSITION + "," +
+ SORT_ORDER + "," +
+ DATA_TABLE_NAME + "," + // write this both in the column and table rows for access by metadata APIs
+ ARRAY_SIZE + "," +
+ VIEW_CONSTANT + "," +
+ IS_VIEW_REFERENCED + "," +
+ PK_NAME + "," + // write this both in the column and table rows for access by metadata APIs
+ KEY_SEQ + "," +
+ COLUMN_DEF + "," +
+ COLUMN_QUALIFIER + ", " +
+ IS_ROW_TIMESTAMP +
+ ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+ public static void addColumnMutation(PhoenixConnection connection, String schemaName, String tableName, PColumn column, String parentTableName, String pkName, Short keySeq, boolean isSalted) throws SQLException {
+ try (PreparedStatement colUpsert = connection.prepareStatement(UPSERT_COLUMN)) {
+ colUpsert.setString(1, connection.getTenantId() == null ? null : connection.getTenantId().getString());
+ colUpsert.setString(2, schemaName);
+ colUpsert.setString(3, tableName);
+ colUpsert.setString(4, column.getName().getString());
+ colUpsert.setString(5, column.getFamilyName() == null ? null : column.getFamilyName().getString());
+ colUpsert.setInt(6, column.getDataType().getSqlType());
+ colUpsert.setInt(7, SchemaUtil.getIsNullableInt(column.isNullable()));
+ if (column.getMaxLength() == null) {
+ colUpsert.setNull(8, Types.INTEGER);
+ } else {
+ colUpsert.setInt(8, column.getMaxLength());
+ }
+ if (column.getScale() == null) {
+ colUpsert.setNull(9, Types.INTEGER);
+ } else {
+ colUpsert.setInt(9, column.getScale());
+ }
+ colUpsert.setInt(10, column.getPosition() + (isSalted ? 0 : 1));
+ colUpsert.setInt(11, column.getSortOrder().getSystemValue());
+ colUpsert.setString(12, parentTableName);
+ if (column.getArraySize() == null) {
+ colUpsert.setNull(13, Types.INTEGER);
+ } else {
+ colUpsert.setInt(13, column.getArraySize());
+ }
+ colUpsert.setBytes(14, column.getViewConstant());
+ colUpsert.setBoolean(15, column.isViewReferenced());
+ colUpsert.setString(16, pkName);
+ if (keySeq == null) {
+ colUpsert.setNull(17, Types.SMALLINT);
+ } else {
+ colUpsert.setShort(17, keySeq);
+ }
+ if (column.getExpressionStr() == null) {
+ colUpsert.setNull(18, Types.VARCHAR);
+ } else {
+ colUpsert.setString(18, column.getExpressionStr());
+ }
+ if (column.getColumnQualifierBytes() == null) {
+ colUpsert.setNull(19, Types.VARBINARY);
+ } else {
+ colUpsert.setBytes(19, column.getColumnQualifierBytes());
+ }
+ colUpsert.setBoolean(20, column.isRowTimestamp());
+ colUpsert.execute();
+ }
+ }
+
+ public static PColumn newColumn(int position, ColumnDef def, PrimaryKeyConstraint pkConstraint, String defaultColumnFamily,
+ boolean addingToPK, byte[] columnQualifierBytes, boolean isImmutableRows) throws SQLException {
+ try {
+ ColumnName columnDefName = def.getColumnDefName();
+ SortOrder sortOrder = def.getSortOrder();
+ boolean isPK = def.isPK();
+ boolean isRowTimestamp = def.isRowTimestamp();
+ if (pkConstraint != null) {
+ Pair<ColumnName, SortOrder> pkSortOrder = pkConstraint.getColumnWithSortOrder(columnDefName);
+ if (pkSortOrder != null) {
+ isPK = true;
+ sortOrder = pkSortOrder.getSecond();
+ isRowTimestamp = pkConstraint.isColumnRowTimestamp(columnDefName);
+ }
+ }
+ String columnName = columnDefName.getColumnName();
+ if (isPK && sortOrder == SortOrder.DESC && def.getDataType() == PVarbinary.INSTANCE) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.DESC_VARBINARY_NOT_SUPPORTED)
+ .setColumnName(columnName)
+ .build().buildException();
+ }
+
+ PName familyName = null;
+ if (def.isPK() && !pkConstraint.getColumnNames().isEmpty()) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.PRIMARY_KEY_ALREADY_EXISTS)
+ .setColumnName(columnName).build().buildException();
+ }
+ boolean isNull = def.isNull();
+ if (def.getColumnDefName().getFamilyName() != null) {
+ String family = def.getColumnDefName().getFamilyName();
+ if (isPK) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.PRIMARY_KEY_WITH_FAMILY_NAME)
+ .setColumnName(columnName).setFamilyName(family).build().buildException();
+ } else if (!def.isNull() && !isImmutableRows) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.KEY_VALUE_NOT_NULL)
+ .setColumnName(columnName).setFamilyName(family).build().buildException();
+ }
+ familyName = PNameFactory.newName(family);
+ } else if (!isPK) {
+ familyName = PNameFactory.newName(defaultColumnFamily == null ? QueryConstants.DEFAULT_COLUMN_FAMILY : defaultColumnFamily);
+ }
+
+ if (isPK && !addingToPK && pkConstraint.getColumnNames().size() <= 1) {
+ if (def.isNull() && def.isNullSet()) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.SINGLE_PK_MAY_NOT_BE_NULL)
+ .setColumnName(columnName).build().buildException();
+ }
+ isNull = false;
+ }
+ PColumn column = new PColumnImpl(PNameFactory.newName(columnName), familyName, def.getDataType(),
+ def.getMaxLength(), def.getScale(), isNull, position, sortOrder, def.getArraySize(),
+ null, false, def.getExpression(), isRowTimestamp,
+ false, columnQualifierBytes, EnvironmentEdgeManager.currentTimeMillis());
+ return column;
+ } catch (IllegalArgumentException e) { // Based on precondition check in constructor
+ throw new SQLException(e);
+ }
+ }
+}
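
addColumnMutation repeats a null-guard when binding optional numeric columns; distilled into a hypothetical helper for readability:

    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.sql.Types;

    final class UpsertHelpers {
        // Hypothetical helper; addColumnMutation inlines this pattern for
        // COLUMN_SIZE, DECIMAL_DIGITS, and ARRAY_SIZE.
        static void setNullableInt(PreparedStatement ps, int index, Integer value)
                throws SQLException {
            if (value == null) {
                ps.setNull(index, Types.INTEGER);
            } else {
                ps.setInt(index, value);
            }
        }
    }
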
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index 10f8287..292a0ba 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.transform.TransformMaintainer;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.transaction.TransactionFactory;
@@ -192,6 +193,11 @@ public class DelegateTable implements PTable {
}
@Override
+ public TransformMaintainer getTransformMaintainer(PTable oldTable, PhoenixConnection connection) {
+ return delegate.getTransformMaintainer(oldTable, connection);
+ }
+
+ @Override
public PName getDefaultFamilyName() {
return delegate.getDefaultFamilyName();
}
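
DelegateTable forwards every PTable call to a wrapped instance, so each method added to the interface, like getTransformMaintainer, needs a matching forwarder. A generic stand-alone sketch of the pattern:

    interface Greeter {
        String greet(String name);
    }

    // Mirrors DelegateTable's shape: every call is forwarded to the wrapped instance.
    class DelegatingGreeter implements Greeter {
        private final Greeter delegate;

        DelegatingGreeter(Greeter delegate) {
            this.delegate = delegate;
        }

        @Override
        public String greet(String name) {
            return delegate.greet(name);
        }
    }
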
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 50d1904..732c718 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -109,6 +109,8 @@ import static org.apache.phoenix.query.QueryConstants.SYSTEM_SCHEMA_NAME;
import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_UPDATE_STATS_ASYNC;
+import static org.apache.phoenix.schema.ColumnMetaDataOps.addColumnMutation;
+import static org.apache.phoenix.schema.ColumnMetaDataOps.newColumn;
import static org.apache.phoenix.schema.PTable.EncodedCQCounter.NULL_COUNTER;
import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN;
import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS;
@@ -122,6 +124,7 @@ import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -437,29 +440,6 @@ public class MetaDataClient {
INDEX_DISABLE_TIMESTAMP +","+
ASYNC_REBUILD_TIMESTAMP + " " + PLong.INSTANCE.getSqlTypeName() +
") VALUES (?, ?, ?, ?, ?, ?)";
- private static final String INSERT_COLUMN_CREATE_TABLE =
- "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
- TENANT_ID + "," +
- TABLE_SCHEM + "," +
- TABLE_NAME + "," +
- COLUMN_NAME + "," +
- COLUMN_FAMILY + "," +
- DATA_TYPE + "," +
- NULLABLE + "," +
- COLUMN_SIZE + "," +
- DECIMAL_DIGITS + "," +
- ORDINAL_POSITION + "," +
- SORT_ORDER + "," +
- DATA_TABLE_NAME + "," + // write this both in the column and table rows for access by metadata APIs
- ARRAY_SIZE + "," +
- VIEW_CONSTANT + "," +
- IS_VIEW_REFERENCED + "," +
- PK_NAME + "," + // write this both in the column and table rows for access by metadata APIs
- KEY_SEQ + "," +
- COLUMN_DEF + "," +
- COLUMN_QUALIFIER + ", " +
- IS_ROW_TIMESTAMP +
- ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
/*
* Custom sql to add a column to SYSTEM.CATALOG table during upgrade.
@@ -941,57 +921,6 @@ public class MetaDataClient {
return false;
}
- private void addColumnMutation(String schemaName, String tableName, PColumn column, PreparedStatement colUpsert, String parentTableName, String pkName, Short keySeq, boolean isSalted) throws SQLException {
- colUpsert.setString(1, connection.getTenantId() == null ? null : connection.getTenantId().getString());
- colUpsert.setString(2, schemaName);
- colUpsert.setString(3, tableName);
- colUpsert.setString(4, column.getName().getString());
- colUpsert.setString(5, column.getFamilyName() == null ? null : column.getFamilyName().getString());
- colUpsert.setInt(6, column.getDataType().getSqlType());
- colUpsert.setInt(7, SchemaUtil.getIsNullableInt(column.isNullable()));
- if (column.getMaxLength() == null) {
- colUpsert.setNull(8, Types.INTEGER);
- } else {
- colUpsert.setInt(8, column.getMaxLength());
- }
- if (column.getScale() == null) {
- colUpsert.setNull(9, Types.INTEGER);
- } else {
- colUpsert.setInt(9, column.getScale());
- }
- colUpsert.setInt(10, column.getPosition() + (isSalted ? 0 : 1));
- colUpsert.setInt(11, column.getSortOrder().getSystemValue());
- colUpsert.setString(12, parentTableName);
- if (column.getArraySize() == null) {
- colUpsert.setNull(13, Types.INTEGER);
- } else {
- colUpsert.setInt(13, column.getArraySize());
- }
- colUpsert.setBytes(14, column.getViewConstant());
- colUpsert.setBoolean(15, column.isViewReferenced());
- colUpsert.setString(16, pkName);
- if (keySeq == null) {
- colUpsert.setNull(17, Types.SMALLINT);
- } else {
- colUpsert.setShort(17, keySeq);
- }
- if (column.getExpressionStr() == null) {
- colUpsert.setNull(18, Types.VARCHAR);
- } else {
- colUpsert.setString(18, column.getExpressionStr());
- }
- //Do not try to set extra columns when using ALTER_SYSCATALOG_TABLE_UPGRADE
- if (colUpsert.getParameterMetaData().getParameterCount() > 18) {
- if (column.getColumnQualifierBytes() == null) {
- colUpsert.setNull(19, Types.VARBINARY);
- } else {
- colUpsert.setBytes(19, column.getColumnQualifierBytes());
- }
- colUpsert.setBoolean(20, column.isRowTimestamp());
- }
- colUpsert.execute();
- }
-
private void addFunctionArgMutation(String functionName, FunctionArgument arg, PreparedStatement argUpsert, int position) throws SQLException {
argUpsert.setString(1, connection.getTenantId() == null ? null : connection.getTenantId().getString());
argUpsert.setString(2, functionName);
@@ -1006,65 +935,6 @@ public class MetaDataClient {
argUpsert.execute();
}
- private PColumn newColumn(int position, ColumnDef def, PrimaryKeyConstraint pkConstraint, String defaultColumnFamily,
- boolean addingToPK, byte[] columnQualifierBytes, boolean isImmutableRows) throws SQLException {
- try {
- ColumnName columnDefName = def.getColumnDefName();
- SortOrder sortOrder = def.getSortOrder();
- boolean isPK = def.isPK();
- boolean isRowTimestamp = def.isRowTimestamp();
- if (pkConstraint != null) {
- Pair<ColumnName, SortOrder> pkSortOrder = pkConstraint.getColumnWithSortOrder(columnDefName);
- if (pkSortOrder != null) {
- isPK = true;
- sortOrder = pkSortOrder.getSecond();
- isRowTimestamp = pkConstraint.isColumnRowTimestamp(columnDefName);
- }
- }
- String columnName = columnDefName.getColumnName();
- if (isPK && sortOrder == SortOrder.DESC && def.getDataType() == PVarbinary.INSTANCE) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.DESC_VARBINARY_NOT_SUPPORTED)
- .setColumnName(columnName)
- .build().buildException();
- }
-
- PName familyName = null;
- if (def.isPK() && !pkConstraint.getColumnNames().isEmpty() ) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.PRIMARY_KEY_ALREADY_EXISTS)
- .setColumnName(columnName).build().buildException();
- }
- boolean isNull = def.isNull();
- if (def.getColumnDefName().getFamilyName() != null) {
- String family = def.getColumnDefName().getFamilyName();
- if (isPK) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.PRIMARY_KEY_WITH_FAMILY_NAME)
- .setColumnName(columnName).setFamilyName(family).build().buildException();
- } else if (!def.isNull() && !isImmutableRows) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.KEY_VALUE_NOT_NULL)
- .setColumnName(columnName).setFamilyName(family).build().buildException();
- }
- familyName = PNameFactory.newName(family);
- } else if (!isPK) {
- familyName = PNameFactory.newName(defaultColumnFamily == null ? QueryConstants.DEFAULT_COLUMN_FAMILY : defaultColumnFamily);
- }
-
- if (isPK && !addingToPK && pkConstraint.getColumnNames().size() <= 1) {
- if (def.isNull() && def.isNullSet()) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.SINGLE_PK_MAY_NOT_BE_NULL)
- .setColumnName(columnName).build().buildException();
- }
- isNull = false;
- }
- PColumn column = new PColumnImpl(PNameFactory.newName(columnName), familyName, def.getDataType(),
- def.getMaxLength(), def.getScale(), isNull, position, sortOrder, def.getArraySize(),
- null, false, def.getExpression(), isRowTimestamp,
- false, columnQualifierBytes, EnvironmentEdgeManager.currentTimeMillis());
- return column;
- } catch (IllegalArgumentException e) { // Based on precondition check in constructor
- throw new SQLException(e);
- }
- }
-
public MutationState createTable(CreateTableStatement statement, byte[][] splits, PTable parent, String viewStatement, ViewType viewType, PDataType viewIndexIdType, byte[][] viewColumnConstants, BitSet isViewColumnReferenced) throws SQLException {
TableName tableName = statement.getTableName();
Map<String,Object> tableProps = Maps.newHashMapWithExpectedSize(statement.getProps().size());
@@ -2632,46 +2502,49 @@ public class MetaDataClient {
} else {
encodingScheme = getEncodingScheme(tableProps, schemaName, tableName, transactionProvider);
- if (isImmutableRows) {
- ImmutableStorageScheme immutableStorageSchemeProp =
- (ImmutableStorageScheme) TableProperty.IMMUTABLE_STORAGE_SCHEME
- .getValue(tableProps);
- if (immutableStorageSchemeProp == null) {
- // Ignore default if transactional and column encoding is not supported
- if (transactionProvider == null ||
- !transactionProvider.getTransactionProvider().isUnsupported(
- PhoenixTransactionProvider.Feature.COLUMN_ENCODING) ) {
- if (multiTenant) {
+ ImmutableStorageScheme immutableStorageSchemeProp =
+ (ImmutableStorageScheme) TableProperty.IMMUTABLE_STORAGE_SCHEME
+ .getValue(tableProps);
+ if (immutableStorageSchemeProp == null) {
+ // Ignore default if transactional and column encoding is not supported
+ if (transactionProvider == null ||
+ !transactionProvider.getTransactionProvider().isUnsupported(
+ PhoenixTransactionProvider.Feature.COLUMN_ENCODING)) {
+ if (multiTenant) {
+ immutableStorageScheme =
+ ImmutableStorageScheme
+ .valueOf(connection
+ .getQueryServices()
+ .getProps()
+ .get(
+ QueryServices.DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME_ATTRIB,
+ QueryServicesOptions.DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME));
+ } else {
+ if (isImmutableRows) {
immutableStorageScheme =
ImmutableStorageScheme
- .valueOf(connection
- .getQueryServices()
- .getProps()
- .get(
- QueryServices.DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME_ATTRIB,
- QueryServicesOptions.DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME));
+ .valueOf(connection
+ .getQueryServices()
+ .getProps()
+ .get(
+ QueryServices.DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB,
+ QueryServicesOptions.DEFAULT_IMMUTABLE_STORAGE_SCHEME));
} else {
- immutableStorageScheme =
- ImmutableStorageScheme
- .valueOf(connection
- .getQueryServices()
- .getProps()
- .get(
- QueryServices.DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB,
- QueryServicesOptions.DEFAULT_IMMUTABLE_STORAGE_SCHEME));
+ immutableStorageScheme = ONE_CELL_PER_COLUMN;
}
}
- } else {
- immutableStorageScheme = getImmutableStorageSchemeForIndex(immutableStorageSchemeProp, schemaName, tableName, transactionProvider);
}
- if (immutableStorageScheme != ONE_CELL_PER_COLUMN
- && encodingScheme == NON_ENCODED_QUALIFIERS) {
- throw new SQLExceptionInfo.Builder(
- SQLExceptionCode.INVALID_IMMUTABLE_STORAGE_SCHEME_AND_COLUMN_QUALIFIER_BYTES)
- .setSchemaName(schemaName).setTableName(tableName).build()
- .buildException();
- }
- }
+ } else {
+ immutableStorageScheme = getImmutableStorageSchemeForIndex(immutableStorageSchemeProp, schemaName, tableName, transactionProvider);
+ }
+ if (immutableStorageScheme != ONE_CELL_PER_COLUMN
+ && encodingScheme == NON_ENCODED_QUALIFIERS) {
+ throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.INVALID_IMMUTABLE_STORAGE_SCHEME_AND_COLUMN_QUALIFIER_BYTES)
+ .setSchemaName(schemaName).setTableName(tableName).build()
+ .buildException();
+ }
}
cqCounter = encodingScheme != NON_ENCODED_QUALIFIERS ? new EncodedCQCounter() : NULL_COUNTER;
}
@@ -2904,88 +2777,87 @@ public class MetaDataClient {
List<Mutation> columnMetadata = Lists.newArrayListWithExpectedSize(columns.size());
boolean isRegularView = (tableType == PTableType.VIEW && viewType!=ViewType.MAPPED);
- try (PreparedStatement colUpsert = connection.prepareStatement(INSERT_COLUMN_CREATE_TABLE)) {
- for (Map.Entry<PColumn, PColumn> entry : columns.entrySet()) {
- PColumn column = entry.getValue();
- final int columnPosition = column.getPosition();
- // For client-side cache, we need to update the column
- // set the autoPartition column attributes
- if (parent != null && parent.getAutoPartitionSeqName() != null
- && parent.getPKColumns().get(MetaDataUtil.getAutoPartitionColIndex(parent)).equals(column)) {
+ for (Map.Entry<PColumn, PColumn> entry : columns.entrySet()) {
+ PColumn column = entry.getValue();
+ final int columnPosition = column.getPosition();
+ // For client-side cache, we need to update the column
+ // set the autoPartition column attributes
+ if (parent != null && parent.getAutoPartitionSeqName() != null
+ && parent.getPKColumns().get(MetaDataUtil.getAutoPartitionColIndex(parent)).equals(column)) {
+ entry.setValue(column = new DelegateColumn(column) {
+ @Override
+ public byte[] getViewConstant() {
+ // set to non-null value so that we will generate a Put that
+ // will be set correctly on the server
+ return QueryConstants.EMPTY_COLUMN_VALUE_BYTES;
+ }
+
+ @Override
+ public boolean isViewReferenced() {
+ return true;
+ }
+ });
+ } else if (isViewColumnReferenced != null) {
+ if (viewColumnConstants != null && columnPosition < viewColumnConstants.length) {
entry.setValue(column = new DelegateColumn(column) {
@Override
public byte[] getViewConstant() {
- // set to non-null value so that we will generate a Put that
- // will be set correctly on the server
- return QueryConstants.EMPTY_COLUMN_VALUE_BYTES;
+ return viewColumnConstants[columnPosition];
}
@Override
public boolean isViewReferenced() {
- return true;
+ return isViewColumnReferenced.get(columnPosition);
}
});
- } else if (isViewColumnReferenced != null) {
- if (viewColumnConstants != null && columnPosition < viewColumnConstants.length) {
- entry.setValue(column = new DelegateColumn(column) {
- @Override
- public byte[] getViewConstant() {
- return viewColumnConstants[columnPosition];
- }
-
- @Override
- public boolean isViewReferenced() {
- return isViewColumnReferenced.get(columnPosition);
- }
- });
- } else {
- entry.setValue(column = new DelegateColumn(column) {
- @Override
- public boolean isViewReferenced() {
- return isViewColumnReferenced.get(columnPosition);
- }
- });
- }
+ } else {
+ entry.setValue(column = new DelegateColumn(column) {
+ @Override
+ public boolean isViewReferenced() {
+ return isViewColumnReferenced.get(columnPosition);
+ }
+ });
+ }
- // if the base table column is referenced in the view
- // or if we are adding a new column during view creation
- if (isViewColumnReferenced.get(columnPosition) ||
- viewNewColumnPositions.contains(
- columnPosition)) {
- // acquire the mutex using the global physical table
- // name to prevent this column from being dropped
- // while the view is being created or to prevent
- // a conflicting column from being added to a parent
- // in case the view creation adds new columns
- boolean acquiredMutex = writeCell(
- null,
+ // if the base table column is referenced in the view
+ // or if we are adding a new column during view creation
+ if (isViewColumnReferenced.get(columnPosition) ||
+ viewNewColumnPositions.contains(
+ columnPosition)) {
+ // acquire the mutex using the global physical table
+ // name to prevent this column from being dropped
+ // while the view is being created or to prevent
+ // a conflicting column from being added to a parent
+ // in case the view creation adds new columns
+ boolean acquiredMutex = writeCell(
+ null,
+ parentPhysicalSchemaName,
+ parentPhysicalTableName,
+ column.toString());
+ if (!acquiredMutex) {
+ throw new ConcurrentTableMutationException(
parentPhysicalSchemaName,
- parentPhysicalTableName,
- column.toString());
- if (!acquiredMutex) {
- throw new ConcurrentTableMutationException(
- parentPhysicalSchemaName,
- parentPhysicalTableName);
- }
- acquiredColumnMutexSet.add(column.toString());
+ parentPhysicalTableName);
}
+ acquiredColumnMutexSet.add(column.toString());
}
- Short keySeq = SchemaUtil.isPKColumn(column) ? ++nextKeySeq : null;
- // Prior to PHOENIX-3534 we were sending the parent table column metadata while creating a
- // child view, now that we combine columns by resolving the parent table hierarchy we
- // don't need to include the parent table columns while creating a view
- // If QueryServices.ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK is true we continue
- // to store the parent table column metadata along with the child view metadata
- // so that we can rollback the upgrade if required.
- if (allowSystemCatalogRollback || !isRegularView
- || columnPosition >= baseTableColumnCount) {
- addColumnMutation(schemaName, tableName, column, colUpsert, parentTableName,
+ }
+ Short keySeq = SchemaUtil.isPKColumn(column) ? ++nextKeySeq : null;
+ // Prior to PHOENIX-3534 we were sending the parent table column metadata while creating a
+ // child view, now that we combine columns by resolving the parent table hierarchy we
+ // don't need to include the parent table columns while creating a view
+ // If QueryServices.ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK is true we continue
+ // to store the parent table column metadata along with the child view metadata
+ // so that we can rollback the upgrade if required.
+ if (allowSystemCatalogRollback || !isRegularView
+ || columnPosition >= baseTableColumnCount) {
+ addColumnMutation(connection, schemaName, tableName, column, parentTableName,
pkName, keySeq, saltBucketNum != null);
- columnMetadata.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
- connection.rollback();
- }
+ columnMetadata.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
+ connection.rollback();
}
}
+
// add the columns in reverse order since we reverse the list later
Collections.reverse(columnMetadata);
tableMetaData.addAll(columnMetadata);
@@ -3730,57 +3602,72 @@ public class MetaDataClient {
tableUpsert.close();
}
if (isImmutableRows != null) {
- mutateBooleanProperty(tenantId, schemaName, tableName, IMMUTABLE_ROWS, isImmutableRows);
+ mutateBooleanProperty(connection, tenantId, schemaName, tableName, IMMUTABLE_ROWS, isImmutableRows);
}
if (disableWAL != null) {
- mutateBooleanProperty(tenantId, schemaName, tableName, DISABLE_WAL, disableWAL);
+            mutateBooleanProperty(connection, tenantId, schemaName, tableName, DISABLE_WAL, disableWAL);
}
if (isMultiTenant != null) {
- mutateBooleanProperty(tenantId, schemaName, tableName, MULTI_TENANT, isMultiTenant);
+            mutateBooleanProperty(connection, tenantId, schemaName, tableName, MULTI_TENANT, isMultiTenant);
}
if (storeNulls != null) {
- mutateBooleanProperty(tenantId, schemaName, tableName, STORE_NULLS, storeNulls);
+            mutateBooleanProperty(connection, tenantId, schemaName, tableName, STORE_NULLS, storeNulls);
}
if (isTransactional != null) {
- mutateBooleanProperty(tenantId, schemaName, tableName, TRANSACTIONAL, isTransactional);
+            mutateBooleanProperty(connection, tenantId, schemaName, tableName, TRANSACTIONAL, isTransactional);
}
if (transactionProvider !=null) {
- mutateByteProperty(tenantId, schemaName, tableName, TRANSACTION_PROVIDER, transactionProvider.getCode());
+ mutateByteProperty(connection, tenantId, schemaName, tableName, TRANSACTION_PROVIDER, transactionProvider.getCode());
}
if (updateCacheFrequency != null) {
- mutateLongProperty(tenantId, schemaName, tableName, UPDATE_CACHE_FREQUENCY, updateCacheFrequency);
+            mutateLongProperty(connection, tenantId, schemaName, tableName, UPDATE_CACHE_FREQUENCY, updateCacheFrequency);
}
if (guidePostWidth == null || guidePostWidth >= 0) {
- mutateLongProperty(tenantId, schemaName, tableName, GUIDE_POSTS_WIDTH, guidePostWidth);
+ mutateLongProperty(connection, tenantId, schemaName, tableName, GUIDE_POSTS_WIDTH, guidePostWidth);
}
if (appendOnlySchema !=null) {
- mutateBooleanProperty(tenantId, schemaName, tableName, APPEND_ONLY_SCHEMA, appendOnlySchema);
+ mutateBooleanProperty(connection, tenantId, schemaName, tableName, APPEND_ONLY_SCHEMA, appendOnlySchema);
}
if (columnEncodedBytes !=null) {
- mutateByteProperty(tenantId, schemaName, tableName, ENCODING_SCHEME, columnEncodedBytes.getSerializedMetadataValue());
+ mutateByteProperty(connection, tenantId, schemaName, tableName, ENCODING_SCHEME, columnEncodedBytes.getSerializedMetadataValue());
}
if (immutableStorageScheme !=null) {
- mutateStringProperty(tenantId, schemaName, tableName, IMMUTABLE_STORAGE_SCHEME, immutableStorageScheme.name());
+ mutateStringProperty(connection, tenantId, schemaName, tableName, IMMUTABLE_STORAGE_SCHEME, immutableStorageScheme.name());
}
if (useStatsForParallelization != null) {
- mutateBooleanProperty(tenantId, schemaName, tableName, USE_STATS_FOR_PARALLELIZATION, useStatsForParallelization);
+ mutateBooleanProperty(connection, tenantId, schemaName, tableName, USE_STATS_FOR_PARALLELIZATION, useStatsForParallelization);
}
if (phoenixTTL != null) {
- mutateLongProperty(tenantId, schemaName, tableName, PHOENIX_TTL, phoenixTTL);
+ mutateLongProperty(connection, tenantId, schemaName, tableName, PHOENIX_TTL, phoenixTTL);
}
if (isChangeDetectionEnabled != null) {
- mutateBooleanProperty(tenantId, schemaName, tableName, CHANGE_DETECTION_ENABLED, isChangeDetectionEnabled);
+ mutateBooleanProperty(connection, tenantId, schemaName, tableName, CHANGE_DETECTION_ENABLED, isChangeDetectionEnabled);
}
if (!Strings.isNullOrEmpty(physicalTableName)) {
- mutateStringProperty(tenantId, schemaName, tableName, PHYSICAL_TABLE_NAME, physicalTableName);
+ mutateStringProperty(connection, tenantId, schemaName, tableName, PHYSICAL_TABLE_NAME, physicalTableName);
}
if (!Strings.isNullOrEmpty(schemaVersion)) {
- mutateStringProperty(tenantId, schemaName, tableName, SCHEMA_VERSION, schemaVersion);
+ mutateStringProperty(connection, tenantId, schemaName, tableName, SCHEMA_VERSION, schemaVersion);
}
return seqNum;
}
- private void mutateBooleanProperty(String tenantId, String schemaName, String tableName,
+ public static void mutateTransformProperties(Connection connection, String tenantId, String schemaName, String tableName,
+ String physicalTableName,
+ ImmutableStorageScheme immutableStorageScheme,
+ QualifierEncodingScheme columnEncodedBytes) throws SQLException {
+        if (columnEncodedBytes != null) {
+            mutateByteProperty(connection, tenantId, schemaName, tableName, ENCODING_SCHEME, columnEncodedBytes.getSerializedMetadataValue());
+        }
+        if (immutableStorageScheme != null) {
+ mutateByteProperty(connection, tenantId, schemaName, tableName, IMMUTABLE_STORAGE_SCHEME, immutableStorageScheme.getSerializedMetadataValue());
+ }
+ if (!Strings.isNullOrEmpty(physicalTableName)) {
+ mutateStringProperty(connection, tenantId, schemaName, tableName, PHYSICAL_TABLE_NAME, physicalTableName);
+ }
+ }
+
+ private static void mutateBooleanProperty(Connection connection, String tenantId, String schemaName, String tableName,
String propertyName, boolean propertyValue) throws SQLException {
String updatePropertySql = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
TENANT_ID + "," +
@@ -3797,7 +3684,7 @@ public class MetaDataClient {
}
}
- private void mutateLongProperty(String tenantId, String schemaName, String tableName,
+ private static void mutateLongProperty(Connection connection, String tenantId, String schemaName, String tableName,
String propertyName, Long propertyValue) throws SQLException {
String updatePropertySql = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
TENANT_ID + "," +
@@ -3818,7 +3705,7 @@ public class MetaDataClient {
}
}
- private void mutateByteProperty(String tenantId, String schemaName, String tableName,
+ private static void mutateByteProperty(Connection connection, String tenantId, String schemaName, String tableName,
String propertyName, Byte propertyValue) throws SQLException {
String updatePropertySql = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
TENANT_ID + "," +
@@ -3839,7 +3726,7 @@ public class MetaDataClient {
}
}
- private void mutateStringProperty(String tenantId, String schemaName, String tableName,
+ private static void mutateStringProperty(Connection connection, String tenantId, String schemaName, String tableName,
String propertyName, String propertyValue) throws SQLException {
String updatePropertySql = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
TENANT_ID + "," +
@@ -3996,123 +3883,117 @@ public class MetaDataClient {
Map<String, Integer> changedCqCounters = new HashMap<>(numCols);
if (numCols > 0 ) {
StatementContext context = new StatementContext(new PhoenixStatement(connection), resolver);
- String addColumnSqlToUse = connection.isRunningUpgrade()
- && tableName.equals(PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE)
- && schemaName.equals(PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA) ? ALTER_SYSCATALOG_TABLE_UPGRADE
- : INSERT_COLUMN_CREATE_TABLE;
- try (PreparedStatement colUpsert = connection.prepareStatement(addColumnSqlToUse)) {
- short nextKeySeq = SchemaUtil.getMaxKeySeq(table);
- for ( ColumnDef colDef : columnDefs) {
- if (colDef != null && !colDef.isNull()) {
- if (colDef.isPK()) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.NOT_NULLABLE_COLUMN_IN_ROW_KEY)
- .setColumnName(colDef.getColumnDefName().getColumnName()).build().buildException();
- } else if (!willBeImmutableRows) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.KEY_VALUE_NOT_NULL)
- .setColumnName(colDef.getColumnDefName().getColumnName()).build().buildException();
- }
- }
- if (colDef != null && colDef.isPK() && table.getType() == VIEW && table.getViewType() != MAPPED) {
- throwIfLastPKOfParentIsVariableLength(getParentOfView(table), schemaName, tableName, colDef);
- }
- if (colDef != null && colDef.isRowTimestamp()) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.ROWTIMESTAMP_CREATE_ONLY)
+ short nextKeySeq = SchemaUtil.getMaxKeySeq(table);
+ for ( ColumnDef colDef : columnDefs) {
+ if (colDef != null && !colDef.isNull()) {
+ if (colDef.isPK()) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.NOT_NULLABLE_COLUMN_IN_ROW_KEY)
+ .setColumnName(colDef.getColumnDefName().getColumnName()).build().buildException();
+ } else if (!willBeImmutableRows) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.KEY_VALUE_NOT_NULL)
.setColumnName(colDef.getColumnDefName().getColumnName()).build().buildException();
}
- if (!colDef.validateDefault(context, null)) {
- colDef = new ColumnDef(colDef, null); // Remove DEFAULT as it's not necessary
- }
- String familyName = null;
- Integer encodedCQ = null;
- if (!colDef.isPK()) {
- String colDefFamily = colDef.getColumnDefName().getFamilyName();
- ImmutableStorageScheme storageScheme = table.getImmutableStorageScheme();
- String defaultColumnFamily = tableForCQCounters.getDefaultFamilyName() != null && !Strings.isNullOrEmpty(tableForCQCounters.getDefaultFamilyName().getString()) ?
- tableForCQCounters.getDefaultFamilyName().getString() : DEFAULT_COLUMN_FAMILY;
- if (table.getType() == PTableType.INDEX && table.getIndexType() == IndexType.LOCAL) {
- defaultColumnFamily = QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX + defaultColumnFamily;
- }
- if (storageScheme == SINGLE_CELL_ARRAY_WITH_OFFSETS) {
- familyName = colDefFamily != null ? colDefFamily : defaultColumnFamily;
- } else {
- familyName = defaultColumnFamily;
- }
- encodedCQ = table.isAppendOnlySchema() ? Integer.valueOf(ENCODED_CQ_COUNTER_INITIAL_VALUE + position) : cqCounterToUse.getNextQualifier(familyName);
- if (!table.isAppendOnlySchema() && cqCounterToUse.increment(familyName)) {
- changedCqCounters.put(familyName,
- cqCounterToUse.getNextQualifier(familyName));
+ }
+ if (colDef != null && colDef.isPK() && table.getType() == VIEW && table.getViewType() != MAPPED) {
+ throwIfLastPKOfParentIsVariableLength(getParentOfView(table), schemaName, tableName, colDef);
+ }
+ if (colDef != null && colDef.isRowTimestamp()) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.ROWTIMESTAMP_CREATE_ONLY)
+ .setColumnName(colDef.getColumnDefName().getColumnName()).build().buildException();
+ }
+ if (!colDef.validateDefault(context, null)) {
+ colDef = new ColumnDef(colDef, null); // Remove DEFAULT as it's not necessary
+ }
+ String familyName = null;
+ Integer encodedCQ = null;
+ if (!colDef.isPK()) {
+ String colDefFamily = colDef.getColumnDefName().getFamilyName();
+ ImmutableStorageScheme storageScheme = table.getImmutableStorageScheme();
+ String defaultColumnFamily = tableForCQCounters.getDefaultFamilyName() != null && !Strings.isNullOrEmpty(tableForCQCounters.getDefaultFamilyName().getString()) ?
+ tableForCQCounters.getDefaultFamilyName().getString() : DEFAULT_COLUMN_FAMILY;
+ if (table.getType() == PTableType.INDEX && table.getIndexType() == IndexType.LOCAL) {
+ defaultColumnFamily = QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX + defaultColumnFamily;
}
+ if (storageScheme == SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+ familyName = colDefFamily != null ? colDefFamily : defaultColumnFamily;
+ } else {
+ familyName = defaultColumnFamily;
}
- byte[] columnQualifierBytes = null;
- try {
- columnQualifierBytes = EncodedColumnsUtil.getColumnQualifierBytes(colDef.getColumnDefName().getColumnName(), encodedCQ, table, colDef.isPK());
- }
- catch (QualifierOutOfRangeException e) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_COLUMNS_EXCEEDED)
- .setSchemaName(schemaName)
- .setTableName(tableName).build().buildException();
- }
- PColumn column = newColumn(position++, colDef, PrimaryKeyConstraint.EMPTY, table.getDefaultFamilyName() == null ? null : table.getDefaultFamilyName().getString(), true, columnQualifierBytes, willBeImmutableRows);
- HashMap<PTable, PColumn> indexToIndexColumnMap = null;
- if (cascade) {
- indexToIndexColumnMap = getPTablePColumnHashMapForCascade(indexesPTable, willBeImmutableRows,
- colDef, familyName, indexToColumnSizeMap);
+ encodedCQ = table.isAppendOnlySchema() ? Integer.valueOf(ENCODED_CQ_COUNTER_INITIAL_VALUE + position) : cqCounterToUse.getNextQualifier(familyName);
+ if (!table.isAppendOnlySchema() && cqCounterToUse.increment(familyName)) {
+ changedCqCounters.put(familyName,
+ cqCounterToUse.getNextQualifier(familyName));
}
+ }
+ byte[] columnQualifierBytes = null;
+ try {
+ columnQualifierBytes = EncodedColumnsUtil.getColumnQualifierBytes(colDef.getColumnDefName().getColumnName(), encodedCQ, table, colDef.isPK());
+ }
+ catch (QualifierOutOfRangeException e) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_COLUMNS_EXCEEDED)
+ .setSchemaName(schemaName)
+ .setTableName(tableName).build().buildException();
+ }
+ PColumn column = newColumn(position++, colDef, PrimaryKeyConstraint.EMPTY, table.getDefaultFamilyName() == null ? null : table.getDefaultFamilyName().getString(), true, columnQualifierBytes, willBeImmutableRows);
+ HashMap<PTable, PColumn> indexToIndexColumnMap = null;
+ if (cascade) {
+ indexToIndexColumnMap = getPTablePColumnHashMapForCascade(indexesPTable, willBeImmutableRows,
+ colDef, familyName, indexToColumnSizeMap);
+ }
- columns.add(column);
- String pkName = null;
- Short keySeq = null;
-
- // TODO: support setting properties on other families?
- if (column.getFamilyName() == null) {
- ++numPkColumnsAdded;
- pkName = table.getPKName() == null ? null : table.getPKName().getString();
- keySeq = ++nextKeySeq;
- } else {
- families.add(column.getFamilyName().getString());
+ columns.add(column);
+ String pkName = null;
+ Short keySeq = null;
+
+ // TODO: support setting properties on other families?
+ if (column.getFamilyName() == null) {
+ ++numPkColumnsAdded;
+ pkName = table.getPKName() == null ? null : table.getPKName().getString();
+ keySeq = ++nextKeySeq;
+ } else {
+ families.add(column.getFamilyName().getString());
+ }
+ colFamiliesForPColumnsToBeAdded.add(column.getFamilyName() == null ? null : column.getFamilyName().getString());
+ addColumnMutation(connection, schemaName, tableName, column, null, pkName, keySeq, table.getBucketNum() != null);
+ // add new columns for given indexes one by one
+ if (cascade) {
+ for (PTable index: indexesPTable) {
+ LOGGER.info("Adding column "+column.getName().getString()+" to "+index.getTableName().toString());
+ addColumnMutation(connection, schemaName, index.getTableName().getString(), indexToIndexColumnMap.get(index), null, "", keySeq, index.getBucketNum() != null);
}
- colFamiliesForPColumnsToBeAdded.add(column.getFamilyName() == null ? null : column.getFamilyName().getString());
- addColumnMutation(schemaName, tableName, column, colUpsert, null, pkName, keySeq, table.getBucketNum() != null);
- // add new columns for given indexes one by one
- if (cascade) {
- for (PTable index: indexesPTable) {
- LOGGER.info("Adding column "+column.getName().getString()+" to "+index.getTableName().toString());
- addColumnMutation(schemaName, index.getTableName().getString(), indexToIndexColumnMap.get(index), colUpsert, null, "", keySeq, index.getBucketNum() != null);
- }
+ }
+ }
+
+ // Add any new PK columns to end of index PK
+ if (numPkColumnsAdded > 0) {
+ // create PK column list that includes the newly created columns
+ List<PColumn> pkColumns = Lists.newArrayListWithExpectedSize(table.getPKColumns().size()+numPkColumnsAdded);
+ pkColumns.addAll(table.getPKColumns());
+ for (int i=0; i<numCols; ++i) {
+ if (columnDefs.get(i).isPK()) {
+ pkColumns.add(columns.get(i));
}
}
-
- // Add any new PK columns to end of index PK
- if (numPkColumnsAdded > 0) {
- // create PK column list that includes the newly created columns
- List<PColumn> pkColumns = Lists.newArrayListWithExpectedSize(table.getPKColumns().size()+numPkColumnsAdded);
- pkColumns.addAll(table.getPKColumns());
+ int pkSlotPosition = table.getPKColumns().size()-1;
+ for (PTable index : table.getIndexes()) {
+ short nextIndexKeySeq = SchemaUtil.getMaxKeySeq(index);
+ int indexPosition = index.getColumns().size();
for (int i=0; i<numCols; ++i) {
- if (columnDefs.get(i).isPK()) {
- pkColumns.add(columns.get(i));
- }
- }
- int pkSlotPosition = table.getPKColumns().size()-1;
- for (PTable index : table.getIndexes()) {
- short nextIndexKeySeq = SchemaUtil.getMaxKeySeq(index);
- int indexPosition = index.getColumns().size();
- for (int i=0; i<numCols; ++i) {
- ColumnDef colDef = columnDefs.get(i);
- if (colDef.isPK()) {
- PDataType indexColDataType = IndexUtil.getIndexColumnDataType(colDef.isNull(), colDef.getDataType());
- ColumnName indexColName = ColumnName.caseSensitiveColumnName(IndexUtil.getIndexColumnName(null, colDef.getColumnDefName().getColumnName()));
- Expression expression = new RowKeyColumnExpression(columns.get(i), new RowKeyValueAccessor(pkColumns, pkSlotPosition));
- ColumnDef indexColDef = FACTORY.columnDef(indexColName, indexColDataType.getSqlTypeName(), colDef.isNull(), colDef.getMaxLength(), colDef.getScale(), true, colDef.getSortOrder(), expression.toString(), colDef.isRowTimestamp());
- PColumn indexColumn = newColumn(indexPosition++, indexColDef, PrimaryKeyConstraint.EMPTY, null, true, null, willBeImmutableRows);
- addColumnMutation(schemaName, index.getTableName().getString(), indexColumn, colUpsert, index.getParentTableName().getString(), index.getPKName() == null ? null : index.getPKName().getString(), ++nextIndexKeySeq, index.getBucketNum() != null);
- }
+ ColumnDef colDef = columnDefs.get(i);
+ if (colDef.isPK()) {
+ PDataType indexColDataType = IndexUtil.getIndexColumnDataType(colDef.isNull(), colDef.getDataType());
+ ColumnName indexColName = ColumnName.caseSensitiveColumnName(IndexUtil.getIndexColumnName(null, colDef.getColumnDefName().getColumnName()));
+ Expression expression = new RowKeyColumnExpression(columns.get(i), new RowKeyValueAccessor(pkColumns, pkSlotPosition));
+ ColumnDef indexColDef = FACTORY.columnDef(indexColName, indexColDataType.getSqlTypeName(), colDef.isNull(), colDef.getMaxLength(), colDef.getScale(), true, colDef.getSortOrder(), expression.toString(), colDef.isRowTimestamp());
+ PColumn indexColumn = newColumn(indexPosition++, indexColDef, PrimaryKeyConstraint.EMPTY, null, true, null, willBeImmutableRows);
+ addColumnMutation(connection, schemaName, index.getTableName().getString(), indexColumn, index.getParentTableName().getString(), index.getPKName() == null ? null : index.getPKName().getString(), ++nextIndexKeySeq, index.getBucketNum() != null);
}
}
- ++pkSlotPosition;
}
- columnMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
- connection.rollback();
+ ++pkSlotPosition;
}
+ columnMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
+ connection.rollback();
} else {
// Check that HBase configured properly for mutable secondary indexing
// if we're changing from an immutable table to a mutable table and we
@@ -5553,10 +5434,9 @@ public class MetaDataClient {
}
}
- if (!Strings.isNullOrEmpty(metaProperties.getPhysicalTableName())) {
- if (!metaProperties.getPhysicalTableName().
- equals(table.getPhysicalName(true).getString())) {
- metaPropertiesEvaluated.setPhysicalTableName(metaProperties.getPhysicalTableName());
+ if (!Strings.isNullOrEmpty(metaProperties.getPhysicalTableNameProp())) {
+            if (!metaProperties.getPhysicalTableNameProp().equals(table.getPhysicalName(true).getString())) {
+ metaPropertiesEvaluated.setPhysicalTableName(metaProperties.getPhysicalTableNameProp());
changingPhoenixTableProperty = true;
}
}
@@ -5639,6 +5519,10 @@ public class MetaDataClient {
this.isTransactionalProp = isTransactionalProp;
}
+ public String getPhysicalTableNameProp() {
+ return this.physicalTableNameProp;
+ }
+
public void setPhysicalTableNameProp(String physicalTableNameProp) {
this.physicalTableNameProp = physicalTableNameProp;
}
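Since the mutate*Property helpers are now static and take an explicit Connection, code outside MetaDataClient can reuse them. A minimal sketch (not part of this patch; the wrapper and its parameter names are assumptions) of syncing the logical table's stored metadata with the transformed physical table, as Transform.completeTransform() does later in this diff:

    import java.sql.Connection;
    import java.sql.SQLException;
    import org.apache.phoenix.schema.MetaDataClient;
    import org.apache.phoenix.schema.PTable;
    import org.apache.phoenix.util.PhoenixRuntime;

    class TransformPropertySyncSketch {
        // Copies the new physical table's storage scheme and encoding scheme
        // onto the logical table's SYSTEM.CATALOG row.
        static void syncProperties(Connection connection, String tenantId,
                String schemaName, String logicalTableName,
                String newPhysicalTableName, String newPhysicalTableFullName)
                throws SQLException {
            PTable pNewTable = PhoenixRuntime.getTable(connection, newPhysicalTableFullName);
            MetaDataClient.mutateTransformProperties(connection, tenantId,
                    schemaName, logicalTableName, newPhysicalTableName,
                    pNewTable.getImmutableStorageScheme(),
                    pNewTable.getEncodingScheme());
        }
    }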
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index ea272ce..0eb5d26 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.transform.TransformMaintainer;
import org.apache.phoenix.schema.types.PArrayDataType;
import org.apache.phoenix.schema.types.PArrayDataTypeDecoder;
import org.apache.phoenix.schema.types.PArrayDataTypeEncoder;
@@ -248,7 +249,8 @@ public interface PTable extends PMetaDataEntity {
}
public enum TransformType {
- METADATA_TRANSFORM((byte)1);
+ METADATA_TRANSFORM((byte)1),
+ METADATA_TRANSFORM_PARTIAL((byte)2);
private final byte[] byteValue;
private final int serializedValue;
@@ -297,6 +299,11 @@ public interface PTable extends PMetaDataEntity {
return "FAILED";
}
},
+ PAUSED {
+ public String toString() {
+ return "PAUSED";
+ }
+ },
}
public enum ImmutableStorageScheme implements ColumnValueEncoderDecoderSupplier {
@@ -818,6 +825,7 @@ public interface PTable extends PMetaDataEntity {
boolean getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection);
IndexMaintainer getIndexMaintainer(PTable dataTable, PhoenixConnection connection);
+ TransformMaintainer getTransformMaintainer(PTable oldTable, PhoenixConnection connection);
PName getDefaultFamilyName();
boolean isWALDisabled();
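The new enum values let a transform be re-run partially and let its status be paused. A hedged sketch of dispatching on the transform type (the dispatch itself is an assumption for illustration; TransformTool's actual handling lives in its own file in this commit):

    import org.apache.phoenix.schema.transform.SystemTransformRecord;

    class TransformDispatchSketch {
        static void dispatch(SystemTransformRecord transformRecord) {
            switch (transformRecord.getTransformType()) {
                case METADATA_TRANSFORM:
                    // full run: rewrite every row into the new physical table
                    break;
                case METADATA_TRANSFORM_PARTIAL:
                    // partial run: reprocess only the rows that failed before
                    break;
                default:
                    break;
            }
        }
    }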
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 70efc26..97605d7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -97,6 +97,7 @@ import org.apache.phoenix.parse.SQLParser;
import org.apache.phoenix.protobuf.ProtobufUtil;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.RowKeySchema.RowKeySchemaBuilder;
+import org.apache.phoenix.schema.transform.TransformMaintainer;
import org.apache.phoenix.schema.types.PBinary;
import org.apache.phoenix.schema.types.PChar;
import org.apache.phoenix.schema.types.PDataType;
@@ -138,6 +139,7 @@ public class PTableImpl implements PTable {
private static final int VIEW_MODIFIED_PHOENIX_TTL_BIT_SET_POS = 2;
private IndexMaintainer indexMaintainer;
+ private TransformMaintainer transformMaintainer;
private ImmutableBytesWritable indexMaintainersPtr;
private final PTableKey key;
@@ -1657,6 +1659,14 @@ public class PTableImpl implements PTable {
}
@Override
+ public synchronized TransformMaintainer getTransformMaintainer(PTable oldTable, PhoenixConnection connection) {
+ if (transformMaintainer == null) {
+ transformMaintainer = TransformMaintainer.create(oldTable, this, connection);
+ }
+ return transformMaintainer;
+ }
+
+ @Override
public synchronized IndexMaintainer getIndexMaintainer(PTable dataTable, PhoenixConnection connection) {
if (indexMaintainer == null) {
indexMaintainer = IndexMaintainer.create(dataTable, this, connection);
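The new getTransformMaintainer() mirrors getIndexMaintainer(): it lazily builds the maintainer once per PTable and serves the cached instance thereafter. A short usage sketch (not part of this patch; the wrapper and variable names are assumptions):

    import java.sql.Connection;
    import java.sql.SQLException;
    import org.apache.phoenix.jdbc.PhoenixConnection;
    import org.apache.phoenix.schema.PTable;
    import org.apache.phoenix.schema.transform.TransformMaintainer;

    class TransformMaintainerSketch {
        // Repeated calls while writing rows reuse the cached maintainer.
        static TransformMaintainer maintainerFor(Connection connection,
                PTable pOldTable, PTable pNewTable) throws SQLException {
            PhoenixConnection pconn = connection.unwrap(PhoenixConnection.class);
            return pNewTable.getTransformMaintainer(pOldTable, pconn);
        }
    }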
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tool/SchemaExtractionProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tool/SchemaExtractionProcessor.java
index 656bd31..8e400b1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tool/SchemaExtractionProcessor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tool/SchemaExtractionProcessor.java
@@ -47,6 +47,7 @@ import java.util.List;
import java.util.ArrayList;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY;
import static org.apache.phoenix.util.MetaDataUtil.SYNCED_DATA_TABLE_AND_INDEX_COL_FAM_PROPERTIES;
public class SchemaExtractionProcessor implements SchemaProcessor {
@@ -117,7 +118,8 @@ public class SchemaExtractionProcessor implements SchemaProcessor {
HTableDescriptor htd = getTableDescriptor(cqsi, table);
setHTableProperties(htd);
}
- String propertiesString = convertPropertiesToString();
+
+ String propertiesString = convertPropertiesToString(true);
return generateIndexDDLString(quotedBaseTableFullName, indexedColumnsString, coveredColumnsString,
indexPTable.getIndexType().equals(PTable.IndexType.LOCAL), quotedIndexTableName, propertiesString);
}
@@ -275,7 +277,7 @@ public class SchemaExtractionProcessor implements SchemaProcessor {
setHColumnFamilyProperties(hcds);
String columnInfoString = getColumnInfoStringForTable(table);
- String propertiesString = convertPropertiesToString();
+ String propertiesString = convertPropertiesToString(false);
return generateTableDDLString(columnInfoString, propertiesString, pSchemaName, pTableName);
}
@@ -367,7 +369,7 @@ public class SchemaExtractionProcessor implements SchemaProcessor {
}
}
- private String convertPropertiesToString() {
+ private String convertPropertiesToString(boolean forIndex) {
StringBuilder optionBuilder = new StringBuilder();
for(Map.Entry<String, String> entry : definedProps.entrySet()) {
String key = entry.getKey();
@@ -381,6 +383,12 @@ public class SchemaExtractionProcessor implements SchemaProcessor {
}
if(value!=null && (shouldGenerateWithDefaults || (defaultProps.get(key) != null && !value.equals(defaultProps.get(key))))) {
+ if (forIndex) {
+ // cannot set these for index
+ if (key.equals(UPDATE_CACHE_FREQUENCY)) {
+ continue;
+ }
+ }
if (optionBuilder.length() != 0) {
optionBuilder.append(", ");
}
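convertPropertiesToString() now knows whether it is generating DDL for an index, because UPDATE_CACHE_FREQUENCY cannot be set in CREATE INDEX. If more exclusions accumulate, the inline check could become table-driven; a hedged sketch of that variant (the set and helper are assumptions, not part of this patch):

    import java.util.Collections;
    import java.util.Set;
    import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY;

    class IndexPropertyFilterSketch {
        // Properties that are legal on a table but not on an index.
        private static final Set<String> NOT_ALLOWED_FOR_INDEX =
                Collections.singleton(UPDATE_CACHE_FREQUENCY);

        // Same effect as the inline forIndex check above, but extensible.
        static boolean skipForIndex(boolean forIndex, String key) {
            return forIndex && NOT_ALLOWED_FOR_INDEX.contains(key);
        }
    }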
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java
index c2cbf19..dd678ab 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java
@@ -152,6 +152,27 @@ public class SystemTransformRecord {
private String newMetadata;
private String transformFunction;
+        public SystemTransformBuilder() {
+        }
+
+ public SystemTransformBuilder(SystemTransformRecord systemTransformRecord) {
+ this.setTransformType(systemTransformRecord.getTransformType());
+ this.setTenantId(systemTransformRecord.getTenantId());
+ this.setSchemaName(systemTransformRecord.getSchemaName());
+ this.setLogicalTableName(systemTransformRecord.getLogicalTableName());
+ this.setNewPhysicalTableName(systemTransformRecord.getNewPhysicalTableName());
+ this.setLogicalParentName(systemTransformRecord.getLogicalParentName());
+ this.setTransformStatus(systemTransformRecord.getTransformStatus());
+ this.setTransformJobId(systemTransformRecord.getTransformJobId());
+ this.setTransformRetryCount(systemTransformRecord.getTransformRetryCount());
+ this.setStartTs(systemTransformRecord.getTransformStartTs());
+ this.setEndTs(systemTransformRecord.getTransformEndTs());
+ this.setOldMetadata(systemTransformRecord.getOldMetadata());
+ this.setNewMetadata(systemTransformRecord.getNewMetadata());
+ this.setTransformFunction(systemTransformRecord.getTransformFunction());
+ }
+
public SystemTransformBuilder setTransformType(PTable.TransformType transformType) {
this.transformType = transformType;
return this;
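The new copy constructor lets callers derive an updated record from an existing one instead of re-reading SYSTEM.TRANSFORM. A minimal sketch of marking a transform completed (not part of this patch; it assumes the remaining setters chain like setTransformType() above and that the status is stored as the enum's string name):

    import java.sql.SQLException;
    import java.sql.Timestamp;
    import org.apache.phoenix.jdbc.PhoenixConnection;
    import org.apache.phoenix.schema.PTable;
    import org.apache.phoenix.schema.transform.SystemTransformRecord;
    import org.apache.phoenix.schema.transform.Transform;
    import org.apache.phoenix.util.EnvironmentEdgeManager;

    class TransformRecordUpdateSketch {
        // Clone the current record and change only the fields that moved on.
        static void markCompleted(PhoenixConnection pconn,
                SystemTransformRecord transformRecord) throws SQLException {
            SystemTransformRecord updated =
                    new SystemTransformRecord.SystemTransformBuilder(transformRecord)
                            .setTransformStatus(PTable.TransformStatus.COMPLETED.name())
                            .setEndTs(new Timestamp(EnvironmentEdgeManager.currentTimeMillis()))
                            .build();
            Transform.upsertTransform(updated, pconn);
        }
    }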
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
index d3f016e..56f0545 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
@@ -19,22 +19,39 @@ package org.apache.phoenix.schema.transform;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
+import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
+import org.apache.phoenix.mapreduce.transform.TransformTool;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnImpl;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.PNameFactory;
import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.tool.SchemaExtractionProcessor;
+import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.JacksonUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Timestamp;
import java.sql.Types;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID;
+import static org.apache.phoenix.schema.ColumnMetaDataOps.addColumnMutation;
import static org.apache.phoenix.schema.PTableType.INDEX;
public class Transform {
@@ -46,9 +63,8 @@ public class Transform {
return newName;
}
- public static void addTransform(PhoenixConnection connection, String tenantId, PTable table,
- MetaDataClient.MetaProperties changingProperties, long sequenceNum,
- PTable.TransformType transformType) throws SQLException {
+ public static void addTransform(PhoenixConnection connection, String tenantId, PTable table, MetaDataClient.MetaProperties changingProperties,
+ long sequenceNum, PTable.TransformType transformType) throws SQLException {
try {
String newMetadata = JacksonUtil.getObjectWriter().writeValueAsString(changingProperties);
String oldMetadata = "";
@@ -71,14 +87,148 @@ public class Transform {
newPhysicalTableName = generateNewTableName(schema, logicalTableName, sequenceNum);
}
transformBuilder.setNewPhysicalTableName(newPhysicalTableName);
- Transform.addTransform(transformBuilder.build(), connection);
+ Transform.addTransform(table, changingProperties, transformBuilder.build(), connection);
} catch (JsonProcessingException ex) {
LOGGER.error("addTransform failed", ex);
throw new SQLException("Adding transform failed with JsonProcessingException");
+ } catch (SQLException ex) {
+ throw ex;
+        } catch (Exception ex) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_MUTATE_TABLE)
+                    .setSchemaName(table.getSchemaName() == null ? null : table.getSchemaName().getString())
+                    .setRootCause(ex)
+                    .setTableName(table.getName().getString()).build().buildException();
}
}
- public static void addTransform(
+ protected static void addTransform(
+ PTable table, MetaDataClient.MetaProperties changedProps, SystemTransformRecord systemTransformParams, PhoenixConnection connection) throws Exception {
+ PName newTableName = PNameFactory.newName(systemTransformParams.getNewPhysicalTableName());
+ PName newTableNameWithoutSchema = PNameFactory.newName(SchemaUtil.getTableNameFromFullName(systemTransformParams.getNewPhysicalTableName()));
+ PTable newTable = new PTableImpl.Builder()
+ .setTableName(newTableNameWithoutSchema)
+ .setParentTableName(table.getParentTableName())
+ .setBaseTableLogicalName(table.getBaseTableLogicalName())
+ .setPhysicalTableName(newTableNameWithoutSchema)
+ .setAllColumns(table.getColumns())
+ .setAppendOnlySchema(table.isAppendOnlySchema())
+ .setAutoPartitionSeqName(table.getAutoPartitionSeqName())
+ .setBaseColumnCount(table.getBaseColumnCount())
+ .setBucketNum(table.getBucketNum())
+ .setDefaultFamilyName(table.getDefaultFamilyName())
+ .setDisableWAL(table.isWALDisabled())
+ .setEstimatedSize(table.getEstimatedSize())
+ .setFamilies(table.getColumnFamilies())
+ .setImmutableRows(table.isImmutableRows())
+ .setIsChangeDetectionEnabled(table.isChangeDetectionEnabled())
+ .setIndexType(table.getIndexType())
+ .setName(newTableName)
+ .setMultiTenant(table.isMultiTenant())
+ .setParentName(table.getParentName())
+ .setParentSchemaName(table.getParentSchemaName())
+ .setPhoenixTTL(table.getPhoenixTTL())
+ .setNamespaceMapped(table.isNamespaceMapped())
+ .setSchemaName(table.getSchemaName())
+ .setPkColumns(table.getPKColumns())
+ .setPkName(table.getPKName())
+ .setPhoenixTTLHighWaterMark(table.getPhoenixTTLHighWaterMark())
+ .setRowKeySchema(table.getRowKeySchema())
+ .setStoreNulls(table.getStoreNulls())
+ .setTenantId(table.getTenantId())
+ .setType(table.getType())
+                // SchemaExtractionProcessor looks up the HBase table descriptor by physical name, so keep the existing table's physical names here
+ .setPhysicalNames(ImmutableList.copyOf(table.getPhysicalNames()))
+ .setUpdateCacheFrequency(table.getUpdateCacheFrequency())
+ .setTransactionProvider(table.getTransactionProvider())
+ .setUseStatsForParallelization(table.useStatsForParallelization())
+ .setSchemaVersion(table.getSchemaVersion())
+ .setIsChangeDetectionEnabled(table.isChangeDetectionEnabled())
+ // Transformables
+                .setImmutableStorageScheme(
+                        (changedProps.getImmutableStorageSchemeProp() != null ? changedProps.getImmutableStorageSchemeProp() : table.getImmutableStorageScheme()))
+                .setQualifierEncodingScheme(
+                        (changedProps.getColumnEncodedBytesProp() != null ? changedProps.getColumnEncodedBytesProp() : table.getEncodingScheme()))
+ .build();
+ SchemaExtractionProcessor schemaExtractionProcessor = new SchemaExtractionProcessor(systemTransformParams.getTenantId(),
+ connection.getQueryServices().getConfiguration(), newTable, true);
+ String ddl = schemaExtractionProcessor.process();
+ LOGGER.info("Creating transforming table via " + ddl);
+ connection.createStatement().execute(ddl);
+ upsertTransform(systemTransformParams, connection);
+ }
+
+ public static SystemTransformRecord getTransformRecord(
+ String schema, String logicalTableName, String logicalParentName, String tenantId, PhoenixConnection connection) throws SQLException {
+ try (ResultSet resultSet = connection.prepareStatement("SELECT " +
+ PhoenixDatabaseMetaData.TENANT_ID + ", " +
+ PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
+ PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + ", " +
+ PhoenixDatabaseMetaData.NEW_PHYS_TABLE_NAME + ", " +
+ PhoenixDatabaseMetaData.TRANSFORM_TYPE + ", " +
+ PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + ", " +
+ PhoenixDatabaseMetaData.TRANSFORM_STATUS + ", " +
+ PhoenixDatabaseMetaData.TRANSFORM_JOB_ID + ", " +
+ PhoenixDatabaseMetaData.TRANSFORM_RETRY_COUNT + ", " +
+ PhoenixDatabaseMetaData.TRANSFORM_START_TS + ", " +
+ PhoenixDatabaseMetaData.TRANSFORM_END_TS + ", " +
+ PhoenixDatabaseMetaData.OLD_METADATA + " , " +
+ PhoenixDatabaseMetaData.NEW_METADATA + " , " +
+ PhoenixDatabaseMetaData.TRANSFORM_FUNCTION +
+ " FROM " + PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME + " WHERE " +
+ (Strings.isNullOrEmpty(tenantId) ? "" : (PhoenixDatabaseMetaData.TENANT_ID + " ='" + tenantId + "' AND ")) +
+ (Strings.isNullOrEmpty(schema) ? "" : (PhoenixDatabaseMetaData.TABLE_SCHEM + " ='" + schema + "' AND ")) +
+ PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + " ='" + logicalTableName + "'" +
+ (Strings.isNullOrEmpty(logicalParentName) ? "": (" AND " + PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + "='" + logicalParentName + "'" ))
+ ).executeQuery()) {
+ if (resultSet.next()) {
+ return SystemTransformRecord.SystemTransformBuilder.build(resultSet);
+ }
+ return null;
+ }
+ }
+
+ public static boolean checkIsTransformNeeded(MetaDataClient.MetaProperties metaProperties, String schemaName,
+ PTable table, String logicalTableName, String parentTableName,
+ String tenantId, PhoenixConnection connection) throws SQLException {
+ boolean isTransformNeeded = isTransformNeeded(metaProperties, table);
+ if (isTransformNeeded) {
+            SystemTransformRecord existingTransform = Transform.getTransformRecord(schemaName, logicalTableName, parentTableName, tenantId, connection);
+ if (existingTransform != null && existingTransform.isActive()) {
+ throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.CANNOT_TRANSFORM_ALREADY_TRANSFORMING_TABLE)
+                        .setMessage("Only one transform at a time is allowed")
+ .setSchemaName(schemaName).setTableName(logicalTableName).build().buildException();
+ }
+ }
+ return isTransformNeeded;
+ }
+
+    private static boolean isTransformNeeded(MetaDataClient.MetaProperties metaProperties, PTable table) {
+        if (metaProperties.getImmutableStorageSchemeProp() != null
+                && metaProperties.getImmutableStorageSchemeProp() != table.getImmutableStorageScheme()) {
+            // Transform is needed
+            return true;
+        }
+        if (metaProperties.getColumnEncodedBytesProp() != null
+                && metaProperties.getColumnEncodedBytesProp() != table.getEncodingScheme()) {
+            return true;
+        }
+        return false;
+    }
+
+ public static void removeTransformRecord(
+ SystemTransformRecord transformRecord, PhoenixConnection connection) throws SQLException {
+ connection.prepareStatement("DELETE FROM "
+ + PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME + " WHERE " +
+ (Strings.isNullOrEmpty(transformRecord.getSchemaName()) ? "" :
+ (PhoenixDatabaseMetaData.TABLE_SCHEM + " ='" + transformRecord.getSchemaName() + "' AND ")) +
+ PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + " ='" + transformRecord.getLogicalTableName() + "' AND " +
+ PhoenixDatabaseMetaData.NEW_PHYS_TABLE_NAME + " ='" + transformRecord.getNewPhysicalTableName() + "' AND " +
+ PhoenixDatabaseMetaData.TRANSFORM_TYPE + " =" + transformRecord.getTransformType().getSerializedValue()
+ ).execute();
+ }
+
+ public static void upsertTransform(
SystemTransformRecord systemTransformParams, PhoenixConnection connection) throws SQLException {
try (PreparedStatement stmt = connection.prepareStatement("UPSERT INTO " +
PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME + " ( " +
@@ -157,64 +307,70 @@ public class Transform {
}
- public static SystemTransformRecord getTransformRecord(
- String schema, String logicalTableName, String logicalParentName, String tenantId, PhoenixConnection connection) throws SQLException {
- try (ResultSet resultSet = connection.prepareStatement("SELECT " +
- PhoenixDatabaseMetaData.TENANT_ID + ", " +
- PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
- PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + ", " +
- PhoenixDatabaseMetaData.NEW_PHYS_TABLE_NAME + ", " +
- PhoenixDatabaseMetaData.TRANSFORM_TYPE + ", " +
- PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + ", " +
- PhoenixDatabaseMetaData.TRANSFORM_STATUS + ", " +
- PhoenixDatabaseMetaData.TRANSFORM_JOB_ID + ", " +
- PhoenixDatabaseMetaData.TRANSFORM_RETRY_COUNT + ", " +
- PhoenixDatabaseMetaData.TRANSFORM_START_TS + ", " +
- PhoenixDatabaseMetaData.TRANSFORM_END_TS + ", " +
- PhoenixDatabaseMetaData.OLD_METADATA + " , " +
- PhoenixDatabaseMetaData.NEW_METADATA + " , " +
- PhoenixDatabaseMetaData.TRANSFORM_FUNCTION +
- " FROM " + PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME + " WHERE " +
- (Strings.isNullOrEmpty(tenantId) ? "" : (PhoenixDatabaseMetaData.TENANT_ID + " ='" + tenantId + "' AND ")) +
- (Strings.isNullOrEmpty(schema) ? "" : (PhoenixDatabaseMetaData.TABLE_SCHEM + " ='" + schema + "' AND ")) +
- PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + " ='" + logicalTableName + "'" +
- (Strings.isNullOrEmpty(logicalParentName) ? "": (" AND " + PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + "='" + logicalParentName + "'" ))
- ).executeQuery()) {
- if (resultSet.next()) {
- return SystemTransformRecord.SystemTransformBuilder.build(resultSet);
+    public static void completeTransform(Connection connection, Configuration configuration) throws Exception {
+        // Called from the transform job's reducer once the MR job finishes
+        long timestamp = EnvironmentEdgeManager.currentTimeMillis();
+ String tenantId = configuration.get(MAPREDUCE_TENANT_ID, null);
+ String fullOldTableName = PhoenixConfigurationUtil.getInputTableName(configuration);
+ String schemaName = SchemaUtil.getSchemaNameFromFullName(fullOldTableName);
+ String oldTableLogicalName = SchemaUtil.getTableNameFromFullName(fullOldTableName);
+ String indexTableName = SchemaUtil.getTableNameFromFullName(PhoenixConfigurationUtil.getIndexToolIndexTableName(configuration));
+        String logicalTableName = oldTableLogicalName;
+ String logicalParentName = null;
+        if (PhoenixConfigurationUtil.getTransformingTableType(configuration) == IndexScrutinyTool.SourceTable.INDEX_TABLE_SOURCE
+                && !Strings.isNullOrEmpty(indexTableName)) {
+            logicalTableName = indexTableName;
+            logicalParentName = SchemaUtil.getTableName(schemaName, oldTableLogicalName);
}
- return null;
- }
- }
-
- public static boolean checkIsTransformNeeded(MetaDataClient.MetaProperties metaProperties, String schemaName,
- PTable table, String logicalTableName, String parentTableName,
- String tenantId, PhoenixConnection connection) throws SQLException {
- boolean isTransformNeeded = isTransformNeeded(metaProperties, table);
- if (isTransformNeeded) {
- SystemTransformRecord existingTransform = Transform.getTransformRecord(schemaName, logicalTableName, parentTableName, tenantId,connection);
- if (existingTransform != null && existingTransform.isActive()) {
- throw new SQLExceptionInfo.Builder(
- SQLExceptionCode.CANNOT_TRANSFORM_ALREADY_TRANSFORMING_TABLE)
- .setMessage(" Only one transform at a time is allowed ")
- .setSchemaName(schemaName).setTableName(logicalTableName).build().buildException();
+ boolean isPartial = PhoenixConfigurationUtil.getIsPartialTransform(configuration);
+ SystemTransformRecord transformRecord = getTransformRecord(schemaName, logicalTableName, logicalParentName,
+ tenantId, connection.unwrap(PhoenixConnection.class));
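+ // A full run cuts the table metadata over to the new physical table and then
+ // schedules a partial (catch-up) transform; a partial run only marks completion.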
+ if (!isPartial) {
+ String newTableName = SchemaUtil.getTableNameFromFullName(transformRecord.getNewPhysicalTableName());
+ PTable pNewTable = PhoenixRuntime.getTable(connection, transformRecord.getNewPhysicalTableName());
+ PTable pOldTable = PhoenixRuntime.getTable(connection, SchemaUtil.getTableName(schemaName, logicalTableName));
+ if (pOldTable.getImmutableStorageScheme() != pNewTable.getImmutableStorageScheme() ||
+ pOldTable.getEncodingScheme() != pNewTable.getEncodingScheme()) {
+ MetaDataClient.mutateTransformProperties(connection, tenantId, schemaName, logicalTableName, newTableName,
+ pNewTable.getImmutableStorageScheme(), pNewTable.getEncodingScheme());
+ // We need to update the columns' qualifiers as well
+ if (pOldTable.getEncodingScheme() != pNewTable.getEncodingScheme()) {
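+ // Re-upsert each column so SYSTEM.CATALOG picks up the new column qualifiers;
+ // KEY_SEQ is 1-based for PK columns and null for all others.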
+ Short nextKeySeq = 0;
+ for (PColumn newCol : pNewTable.getColumns()) {
+ boolean isPk = SchemaUtil.isPKColumn(newCol);
+ Short keySeq = isPk ? ++nextKeySeq : null;
+ PColumn column = new PColumnImpl(newCol.getName(), newCol.getFamilyName(), newCol.getDataType(),
+ newCol.getMaxLength(), newCol.getScale(), newCol.isNullable(), newCol.getPosition(),
+ newCol.getSortOrder(), newCol.getArraySize(), newCol.getViewConstant(), newCol.isViewReferenced(),
+ newCol.getExpressionStr(), newCol.isRowTimestamp(), newCol.isDynamic(),
+ newCol.getColumnQualifierBytes(), EnvironmentEdgeManager.currentTimeMillis());
+ addColumnMutation(connection.unwrap(PhoenixConnection.class), schemaName, logicalTableName, column,
+ pNewTable.getParentTableName() == null ? null : pNewTable.getParentTableName().getString(),
+ pNewTable.getPKName() == null ? null : pNewTable.getPKName().getString(), keySeq, pNewTable.getBucketNum() != null);
+ }
+ }
}
- }
- return isTransformNeeded;
- }
+ // Clear cache so that the new table is used for queries
+ connection.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
+ TransformTool.updateTransformRecord(connection.unwrap(PhoenixConnection.class), transformRecord, PTable.TransformStatus.COMPLETED);
- private static boolean isTransformNeeded(MetaDataClient.MetaProperties metaProperties, PTable table){
- if (metaProperties.getImmutableStorageSchemeProp()!=null
- && metaProperties.getImmutableStorageSchemeProp() != table.getImmutableStorageScheme()) {
- // Transform is needed
- return true;
+ // TODO: kick off the partial transform from the TransformMonitor
+ SystemTransformRecord.SystemTransformBuilder builder = new SystemTransformRecord.SystemTransformBuilder(transformRecord);
+ builder.setTransformStatus(PTable.TransformStatus.CREATED.name());
+ builder.setTransformJobId(null);
+ builder.setStartTs(new Timestamp(timestamp));
+ builder.setTransformRetryCount(0);
+ builder.setTransformType(PTable.TransformType.METADATA_TRANSFORM_PARTIAL);
+ SystemTransformRecord partialRecord = builder.build();
+ Transform.upsertTransform(partialRecord, connection.unwrap(PhoenixConnection.class));
+ connection.commit();
+ } else {
+ TransformTool.updateTransformRecord(connection.unwrap(PhoenixConnection.class), transformRecord, PTable.TransformStatus.COMPLETED);
+ connection.commit();
+ // TODO: cleanup
}
- if (metaProperties.getColumnEncodedBytesProp()!=null
- && metaProperties.getColumnEncodedBytesProp() != table.getEncodingScheme()) {
- return true;
- }
- return false;
}
+
}
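
The SYSTEM.TRANSFORM helpers above splice schema, table, and type values straight
into the SQL string. For comparison, a minimal bind-parameter sketch of the same
DELETE (the TransformRecordSql class name and the inlined SYSTEM.TRANSFORM column
literals are illustrative only, mirroring the PhoenixDatabaseMetaData constants
used in this patch):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    public final class TransformRecordSql {
        // Null-safe on the schema: the first two binds take the same value
        private static final String DELETE_SQL =
                "DELETE FROM SYSTEM.TRANSFORM" +
                " WHERE (TABLE_SCHEM = ? OR (? IS NULL AND TABLE_SCHEM IS NULL))" +
                " AND LOGICAL_TABLE_NAME = ?" +
                " AND NEW_PHYS_TABLE_NAME = ?" +
                " AND TRANSFORM_TYPE = ?";

        public static void deleteTransformRecord(Connection conn, String schemaName,
                String logicalTableName, String newPhysicalTableName,
                int serializedTransformType) throws SQLException {
            try (PreparedStatement stmt = conn.prepareStatement(DELETE_SQL)) {
                stmt.setString(1, schemaName);
                stmt.setString(2, schemaName);
                stmt.setString(3, logicalTableName);
                stmt.setString(4, newPhysicalTableName);
                stmt.setInt(5, serializedTransformType);
                stmt.execute();
            }
        }
    }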
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
new file mode 100644
index 0000000..72bdb2d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.transform;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.coprocessor.generated.ServerCachingProtos;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
+import org.apache.phoenix.schema.ColumnNotFoundException;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnFamily;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TrustedByteArrayOutputStream;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
+
+public class TransformMaintainer extends IndexMaintainer {
+ private boolean isMultiTenant;
+ // Expressions over the old table that are not present in its row key; an expression can also refer to a regular column
+ private List<Expression> newTableExpressions;
+ private Set<ColumnReference> newTableColumns;
+
+ private List<PDataType> newTableColumnTypes;
+ private int newTableColumnCount;
+ private byte[] newTableName;
+ private int nNewTableSaltBuckets;
+ private byte[] oldTableEmptyKeyValueCF;
+ private ImmutableBytesPtr emptyKeyValueCFPtr;
+ private int nOldTableCFs;
+ private boolean newTableWALDisabled;
+ private boolean newTableImmutableRows;
+ // Transient state
+ private final boolean isOldTableSalted;
+ private final RowKeySchema oldTableRowKeySchema;
+
+ private int estimatedNewTableRowKeyBytes;
+ private ColumnReference newTableEmptyKeyValueRef;
+ private ColumnReference oldTableEmptyKeyValueRef;
+ private boolean newTableRowKeyOrderOptimizable;
+
+ private PTable.QualifierEncodingScheme newTableEncodingScheme;
+ private PTable.ImmutableStorageScheme newTableImmutableStorageScheme;
+ private PTable.QualifierEncodingScheme oldTableEncodingScheme;
+ private PTable.ImmutableStorageScheme oldTableImmutableStorageScheme;
+ /*
+ * The first part of the pair is the column family name and the second part is the column name.
+ * We need to track this state because for certain storage schemes, such as
+ * ImmutableStorageScheme#SINGLE_CELL_ARRAY_WITH_OFFSETS, the column for which we need to
+ * generate a new-table put/delete is different from the old columns in the Phoenix schema.
+ */
+ private Set<Pair<String, String>> newTableColumnsInfo;
+ /*
+ * Map of covered columns where a key is column reference for a column in the data table
+ * and value is column reference for corresponding column in the new table.
+ */
+ private Map<ColumnReference, ColumnReference> coveredColumnsMap;
+
+ private String logicalNewTableName;
+
+ public static TransformMaintainer create(PTable oldTable, PTable newTable, PhoenixConnection connection) {
+ if (oldTable.getType() == PTableType.INDEX) {
+ throw new IllegalArgumentException("TransformMaintainer cannot be created for an index table");
+ }
+ TransformMaintainer maintainer = new TransformMaintainer(oldTable, newTable, connection);
+ return maintainer;
+ }
+
+ private TransformMaintainer(RowKeySchema oldRowKeySchema, boolean isOldTableSalted) {
+ super(oldRowKeySchema, isOldTableSalted);
+ this.oldTableRowKeySchema = oldRowKeySchema;
+ this.isOldTableSalted = isOldTableSalted;
+ }
+
+ private TransformMaintainer(final PTable oldTable, final PTable newTable, PhoenixConnection connection) {
+ this(oldTable.getRowKeySchema(), oldTable.getBucketNum() != null);
+ this.newTableRowKeyOrderOptimizable = newTable.rowKeyOrderOptimizable();
+ this.isMultiTenant = oldTable.isMultiTenant();
+
+ this.newTableEncodingScheme = newTable.getEncodingScheme() == null ? PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : newTable.getEncodingScheme();
+ this.newTableImmutableStorageScheme = newTable.getImmutableStorageScheme() == null ? PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : newTable.getImmutableStorageScheme();
+ this.oldTableEncodingScheme = oldTable.getEncodingScheme() == null ? PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : oldTable.getEncodingScheme();
+ this.oldTableImmutableStorageScheme = oldTable.getImmutableStorageScheme() == null ? PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : oldTable.getImmutableStorageScheme();
+
+ this.newTableName = newTable.getPhysicalName().getBytes();
+ boolean newTableWALDisabled = newTable.isWALDisabled();
+ int nNewTableColumns = newTable.getColumns().size();
+ int nNewTablePKColumns = newTable.getPKColumns().size();
+
+ List<PColumn> oldTablePKColumns = oldTable.getPKColumns();
+
+ this.newTableColumnCount = oldTablePKColumns.size();
+
+ this.newTableColumnTypes = Lists.newArrayListWithExpectedSize(nNewTablePKColumns);
+ this.newTableExpressions = Lists.newArrayListWithExpectedSize(nNewTableColumns);
+ this.coveredColumnsMap = Maps.newHashMapWithExpectedSize(nNewTableColumns - nNewTablePKColumns);
+ this.nNewTableSaltBuckets = newTable.getBucketNum() == null ? 0 : newTable.getBucketNum();
+ this.oldTableEmptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(oldTable);
+ this.emptyKeyValueCFPtr = SchemaUtil.getEmptyColumnFamilyPtr(newTable);
+ this.nOldTableCFs = oldTable.getColumnFamilies().size();
+ this.newTableWALDisabled = newTableWALDisabled;
+ this.newTableImmutableRows = newTable.isImmutableRows();
+ this.newTableColumnsInfo = Sets.newHashSetWithExpectedSize(nNewTableColumns - nNewTablePKColumns);
+
+ for (int i = 0; i < newTable.getColumnFamilies().size(); i++) {
+ PColumnFamily family = newTable.getColumnFamilies().get(i);
+ for (PColumn newColumn : family.getColumns()) {
+ PColumn oldColumn = getColumnOrNull(oldTable, newColumn.getName().getString(), newColumn.getFamilyName().getString());
+ // oldColumn can be null during deletion, where covered columns are not needed
+ if (oldColumn != null) {
+ byte[] oldColumnCq = oldColumn.getColumnQualifierBytes();
+ byte[] newColumnCq = newColumn.getColumnQualifierBytes();
+ this.coveredColumnsMap.put(new ColumnReference(oldColumn.getFamilyName().getBytes(), oldColumnCq),
+ new ColumnReference(newColumn.getFamilyName().getBytes(), newColumnCq));
+ }
+ }
+ }
+ this.logicalNewTableName = newTable.getName().getString();
+ initCachedState();
+ }
+
+ public static PColumn getColumnOrNull(PTable table, String columnName, String familyName) {
+ PColumnFamily family;
+ try {
+ family = table.getColumnFamily(familyName);
+ } catch (ColumnFamilyNotFoundException e) {
+ return null;
+ }
+ try {
+ return family.getPColumnForColumnName(columnName);
+ } catch (ColumnNotFoundException e) {
+ return null;
+ }
+ }
+
+ /**
+ * Initializes the cached state derived from the serialized fields, both when the
+ * maintainer is first created and after it has been deserialized.
+ */
+ private void initCachedState() {
+ byte[] newTableEmptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(newTableEncodingScheme).getFirst();
+ byte[] oldTableEmptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(oldTableEncodingScheme).getFirst();
+ newTableEmptyKeyValueRef = new ColumnReference(oldTableEmptyKeyValueCF, newTableEmptyKvQualifier);
+ oldTableEmptyKeyValueRef = new ColumnReference(oldTableEmptyKeyValueCF, oldTableEmptyKvQualifier);
+ this.newTableColumns = Sets.newLinkedHashSetWithExpectedSize(this.newTableColumnCount);
+
+ for (ColumnReference colRef : coveredColumnsMap.keySet()) {
+ if (newTableImmutableStorageScheme == PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
+ newTableColumns.add(colRef);
+ } else {
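+ // Single-cell storage packs all column values into one cell per family, so
+ // every covered column maps to the shared single-keyvalue qualifier.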
+ newTableColumns.add(new ColumnReference(colRef.getFamily(), QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES));
+ }
+ }
+ }
+
+ /**
+ * Serializes the TransformMaintainer for a given table pair on the client side.
+ *
+ * @param oldTable the table being transformed
+ * @param ptr bytes pointer to hold the returned serialized value
+ * @param newTable the new table to serialize
+ * @param connection connection used to build the maintainer
+ */
+ public static void serialize(PTable oldTable, ImmutableBytesWritable ptr,
+ PTable newTable, PhoenixConnection connection) {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ DataOutputStream output = new DataOutputStream(stream);
+ try {
+ // Encode data table salting
+ WritableUtils.writeVInt(output, oldTable.getBucketNum() == null ? 1 : -1);
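+ // The sign doubles as the salting flag: deserialize() treats a negative count
+ // as "old table is salted" and uses the absolute value as the maintainer count.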
+ // Write out data row key schema once, since it's the same
+ oldTable.getRowKeySchema().write(output);
+ org.apache.phoenix.coprocessor.generated.ServerCachingProtos.TransformMaintainer proto =
+ TransformMaintainer.toProto(newTable.getTransformMaintainer(oldTable, connection));
+ byte[] protoBytes = proto.toByteArray();
+ WritableUtils.writeVInt(output, protoBytes.length);
+ output.write(protoBytes);
+
+ } catch (IOException e) {
+ throw new RuntimeException(e); // Impossible
+ }
+ ptr.set(stream.toByteArray(), 0, stream.size());
+ }
+
+ public static ServerCachingProtos.TransformMaintainer toProto(TransformMaintainer maintainer) throws IOException {
+ ServerCachingProtos.TransformMaintainer.Builder builder = ServerCachingProtos.TransformMaintainer.newBuilder();
+ builder.setSaltBuckets(maintainer.nNewTableSaltBuckets);
+ builder.setIsMultiTenant(maintainer.isMultiTenant);
+
+ for (ColumnReference colRef : maintainer.newTableColumns) {
+ ServerCachingProtos.ColumnReference.Builder cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder();
+ cRefBuilder.setFamily(ByteStringer.wrap(colRef.getFamily()));
+ cRefBuilder.setQualifier(ByteStringer.wrap(colRef.getQualifier()));
+ builder.addNewTableColumns(cRefBuilder.build());
+ }
+
+ for (Map.Entry<ColumnReference, ColumnReference> e : maintainer.coveredColumnsMap.entrySet()) {
+ ServerCachingProtos.ColumnReference.Builder cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder();
+ ColumnReference dataTableColRef = e.getKey();
+ cRefBuilder.setFamily(ByteStringer.wrap(dataTableColRef.getFamily()));
+ cRefBuilder.setQualifier(ByteStringer.wrap(dataTableColRef.getQualifier()));
+ builder.addOldTableColRefForCoveredColumns(cRefBuilder.build());
+ if (maintainer.newTableEncodingScheme != NON_ENCODED_QUALIFIERS) {
+ // The new table's colRefs only need serializing when column names are encoded
+ ColumnReference newTableColRef = e.getValue();
+ cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder();
+ cRefBuilder.setFamily(ByteStringer.wrap(newTableColRef.getFamily()));
+ cRefBuilder.setQualifier(ByteStringer.wrap(newTableColRef.getQualifier()));
+ builder.addNewTableColRefForCoveredColumns(cRefBuilder.build());
+ }
+ }
+
+ builder.setNewTableColumnCount(maintainer.newTableColumnCount);
+ builder.setNewTableName(ByteStringer.wrap(maintainer.newTableName));
+ builder.setNewTableRowKeyOrderOptimizable(maintainer.newTableRowKeyOrderOptimizable);
+ builder.setOldTableEmptyKeyValueColFamily(ByteStringer.wrap(maintainer.oldTableEmptyKeyValueCF));
+ ServerCachingProtos.ImmutableBytesWritable.Builder ibwBuilder = ServerCachingProtos.ImmutableBytesWritable.newBuilder();
+ ibwBuilder.setByteArray(ByteStringer.wrap(maintainer.emptyKeyValueCFPtr.get()));
+ ibwBuilder.setLength(maintainer.emptyKeyValueCFPtr.getLength());
+ ibwBuilder.setOffset(maintainer.emptyKeyValueCFPtr.getOffset());
+ builder.setEmptyKeyValueColFamily(ibwBuilder.build());
+ try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
+ DataOutput output = new DataOutputStream(stream);
+ for (Expression expression : maintainer.newTableExpressions) {
+ WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal());
+ expression.write(output);
+ }
+ builder.setNewTableExpressions(ByteStringer.wrap(stream.toByteArray()));
+ }
+
+ builder.setNumDataTableColFamilies(maintainer.nOldTableCFs);
+ builder.setNewTableWalDisabled(maintainer.newTableWALDisabled);
+ builder.setNewTableRowKeyByteSize(maintainer.estimatedNewTableRowKeyBytes);
+ builder.setNewTableImmutable(maintainer.newTableImmutableRows);
+ for (Pair<String, String> p : maintainer.newTableColumnsInfo) {
+ ServerCachingProtos.ColumnInfo.Builder ciBuilder = ServerCachingProtos.ColumnInfo.newBuilder();
+ if (p.getFirst() != null) {
+ ciBuilder.setFamilyName(p.getFirst());
+ }
+ ciBuilder.setColumnName(p.getSecond());
+ builder.addNewTableColumnInfo(ciBuilder.build());
+ }
+ builder.setNewTableEncodingScheme(maintainer.newTableEncodingScheme.getSerializedMetadataValue());
+ builder.setNewTableImmutableStorageScheme(maintainer.newTableImmutableStorageScheme.getSerializedMetadataValue());
+ builder.setLogicalNewTableName(maintainer.logicalNewTableName);
+ builder.setOldTableEncodingScheme(maintainer.oldTableEncodingScheme.getSerializedMetadataValue());
+ builder.setOldTableImmutableStorageScheme(maintainer.oldTableImmutableStorageScheme.getSerializedMetadataValue());
+ return builder.build();
+ }
+
+ public static TransformMaintainer fromProto(ServerCachingProtos.TransformMaintainer proto, RowKeySchema dataTableRowKeySchema, boolean isDataTableSalted) throws IOException {
+ TransformMaintainer maintainer = new TransformMaintainer(dataTableRowKeySchema, isDataTableSalted);
+ maintainer.nNewTableSaltBuckets = proto.getSaltBuckets();
+ maintainer.isMultiTenant = proto.getIsMultiTenant();
+ List<ServerCachingProtos.ColumnReference> newTableColList = proto.getNewTableColumnsList();
+ maintainer.newTableColumns = new HashSet<ColumnReference>(newTableColList.size());
+ for (ServerCachingProtos.ColumnReference colRefFromProto : newTableColList) {
+ maintainer.newTableColumns.add(new ColumnReference(colRefFromProto.getFamily().toByteArray(), colRefFromProto.getQualifier().toByteArray()));
+ }
+
+ maintainer.newTableName = proto.getNewTableName().toByteArray();
+ if (proto.getNewTableColumnCount() != -1) {
+ maintainer.newTableColumnCount = proto.getNewTableColumnCount();
+ }
+
+ maintainer.newTableRowKeyOrderOptimizable = proto.getNewTableRowKeyOrderOptimizable();
+ maintainer.oldTableEmptyKeyValueCF = proto.getOldTableEmptyKeyValueColFamily().toByteArray();
+ ServerCachingProtos.ImmutableBytesWritable emptyKeyValueColFamily = proto.getEmptyKeyValueColFamily();
+ maintainer.emptyKeyValueCFPtr = new ImmutableBytesPtr(emptyKeyValueColFamily.getByteArray().toByteArray(), emptyKeyValueColFamily.getOffset(), emptyKeyValueColFamily.getLength());
+
+ maintainer.nOldTableCFs = proto.getNumDataTableColFamilies();
+ maintainer.newTableWALDisabled = proto.getNewTableWalDisabled();
+ maintainer.estimatedNewTableRowKeyBytes = proto.getNewTableRowKeyByteSize();
+ maintainer.newTableImmutableRows = proto.getNewTableImmutable();
+ List<ServerCachingProtos.ColumnInfo> newTblColumnInfoList = proto.getNewTableColumnInfoList();
+ maintainer.newTableColumnsInfo = Sets.newHashSet();
+ for (ServerCachingProtos.ColumnInfo info : newTblColumnInfoList) {
+ maintainer.newTableColumnsInfo.add(new Pair<>(info.getFamilyName(), info.getColumnName()));
+ }
+ // proto doesn't support a single byte, so an explicit cast is needed here
+ maintainer.newTableEncodingScheme = PTable.QualifierEncodingScheme.fromSerializedValue((byte) proto.getNewTableEncodingScheme());
+ maintainer.newTableImmutableStorageScheme = PTable.ImmutableStorageScheme.fromSerializedValue((byte) proto.getNewTableImmutableStorageScheme());
+ maintainer.oldTableEncodingScheme = PTable.QualifierEncodingScheme.fromSerializedValue((byte) proto.getOldTableEncodingScheme());
+ maintainer.oldTableImmutableStorageScheme = PTable.ImmutableStorageScheme.fromSerializedValue((byte) proto.getOldTableImmutableStorageScheme());
+
+ List<ServerCachingProtos.ColumnReference> oldTableColRefsForCoveredColumnsList = proto.getOldTableColRefForCoveredColumnsList();
+ List<ServerCachingProtos.ColumnReference> newTableColRefsForCoveredColumnsList = proto.getNewTableColRefForCoveredColumnsList();
+ maintainer.coveredColumnsMap = Maps.newHashMapWithExpectedSize(oldTableColRefsForCoveredColumnsList.size());
+ boolean encodedColumnNames = maintainer.newTableEncodingScheme != NON_ENCODED_QUALIFIERS;
+ Iterator<ServerCachingProtos.ColumnReference> newTableColRefItr = newTableColRefsForCoveredColumnsList.iterator();
+ for (ServerCachingProtos.ColumnReference colRefFromProto : oldTableColRefsForCoveredColumnsList) {
+ ColumnReference oldTableColRef = new ColumnReference(colRefFromProto.getFamily().toByteArray(), colRefFromProto.getQualifier().toByteArray());
+ ColumnReference newTableColRef;
+ if (encodedColumnNames) {
+ ServerCachingProtos.ColumnReference fromProto = newTableColRefItr.next();
+ newTableColRef = new ColumnReference(fromProto.getFamily().toByteArray(), fromProto.getQualifier().toByteArray());
+ } else {
+ byte[] cq = oldTableColRef.getQualifier();
+ byte[] cf = oldTableColRef.getFamily();
+ newTableColRef = new ColumnReference(cf, cq);
+ }
+ maintainer.coveredColumnsMap.put(oldTableColRef, newTableColRef);
+ }
+ maintainer.logicalNewTableName = proto.getLogicalNewTableName();
+ maintainer.initCachedState();
+ return maintainer;
+ }
+
+
+ public static List<IndexMaintainer> deserialize(byte[] buf) {
+ return deserialize(buf, 0, buf.length);
+ }
+
+ private static List<IndexMaintainer> deserialize(byte[] buf, int offset, int length) {
+ List<IndexMaintainer> maintainers = Collections.emptyList();
+ if (length > 0) {
+ ByteArrayInputStream stream = new ByteArrayInputStream(buf, offset, length);
+ DataInput input = new DataInputStream(stream);
+ try {
+ int size = WritableUtils.readVInt(input);
+ boolean isDataTableSalted = size < 0;
+ size = Math.abs(size);
+ RowKeySchema rowKeySchema = new RowKeySchema();
+ rowKeySchema.readFields(input);
+ maintainers = Lists.newArrayListWithExpectedSize(size);
+ for (int i = 0; i < size; i++) {
+ int protoSize = WritableUtils.readVInt(input);
+ byte[] b = new byte[protoSize];
+ input.readFully(b);
+ ServerCachingProtos.TransformMaintainer proto = ServerCachingProtos.TransformMaintainer.parseFrom(b);
+ maintainers.add(TransformMaintainer.fromProto(proto, rowKeySchema, isDataTableSalted));
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e); // Impossible
+ }
+ }
+ return maintainers;
+ }
+
+ // Returns the new table's physical name, reusing the getIndexTableName() signature inherited from IndexMaintainer
+ public byte[] getIndexTableName() {
+ return newTableName;
+ }
+
+ // Builds the new table's row key from the old table's row key.
+ // This method will change once row-key-related transforms are supported.
+ public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey, long ts) {
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ boolean isNewTableSalted = nNewTableSaltBuckets > 0;
+
+ try (TrustedByteArrayOutputStream stream = new TrustedByteArrayOutputStream(estimatedNewTableRowKeyBytes)){
+ DataOutput output = new DataOutputStream(stream);
+
+ if (isNewTableSalted) {
+ output.write(0); // will be set at end to new table salt byte
+ }
+ // The oldTableRowKeySchema includes the salt byte field,
+ // so we must adjust for that here.
+ int dataPosOffset = isOldTableSalted ? 1 : 0;
+ int maxRowKeyOffset = rowKeyPtr.getOffset() + rowKeyPtr.getLength();
+ oldTableRowKeySchema.iterator(rowKeyPtr, ptr, dataPosOffset);
+
+ // Write new table row key
+ while (oldTableRowKeySchema.next(ptr, dataPosOffset, maxRowKeyOffset) != null) {
+ output.write(ptr.get(), ptr.getOffset(), ptr.getLength());
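+ // Variable-width fields need a trailing separator byte; its value depends on
+ // row key order optimizability and on whether the field value is empty.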
+ if (!oldTableRowKeySchema.getField(dataPosOffset).getDataType().isFixedWidth()) {
+ output.writeByte(SchemaUtil.getSeparatorByte(newTableRowKeyOrderOptimizable, ptr.getLength()==0
+ , oldTableRowKeySchema.getField(dataPosOffset)));
+ }
+ dataPosOffset++;
+ }
+
+ // getBuffer() may be larger than the data written; trim to size() on return
+ byte[] newTableRowKey = stream.getBuffer();
+ int length = stream.size();
+ if (isNewTableSalted) {
+ // Set salt byte
+ byte saltByte = SaltingUtil.getSaltingByte(newTableRowKey, SaltingUtil.NUM_SALTING_BYTES, length-SaltingUtil.NUM_SALTING_BYTES, nNewTableSaltBuckets);
+ newTableRowKey[0] = saltByte;
+ }
+ return newTableRowKey.length == length ? newTableRowKey : Arrays.copyOf(newTableRowKey, length);
+ } catch (IOException e) {
+ throw new RuntimeException(e); // Impossible
+ }
+ }
+
+ public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter, ImmutableBytesWritable oldRowKeyPtr, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException {
+ byte[] newRowKey = this.buildRowKey(valueGetter, oldRowKeyPtr, regionStartKey, regionEndKey, ts);
+ return buildUpdateMutation(kvBuilder, valueGetter, oldRowKeyPtr, ts, regionStartKey, regionEndKey,
+ newRowKey, this.getEmptyKeyValueFamily(), coveredColumnsMap,
+ newTableEmptyKeyValueRef, newTableWALDisabled, oldTableImmutableStorageScheme, newTableImmutableStorageScheme, newTableEncodingScheme);
+ }
+
+ public ImmutableBytesPtr getEmptyKeyValueFamily() {
+ return emptyKeyValueCFPtr;
+ }
+
+ public byte[] getEmptyKeyValueQualifier() {
+ return newTableEmptyKeyValueRef.getQualifier();
+ }
+
+}
\ No newline at end of file
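
For context, a minimal client-side round trip over the serialization path above,
assuming pOldTable and pNewTable are the PTable pair of an in-flight transform
(the RoundTrip helper name is illustrative only):

    import java.util.List;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.phoenix.index.IndexMaintainer;
    import org.apache.phoenix.jdbc.PhoenixConnection;
    import org.apache.phoenix.schema.PTable;
    import org.apache.phoenix.schema.transform.TransformMaintainer;

    public final class RoundTrip {
        // Serialize the maintainer for the old/new pair, then rebuild it the way
        // a server-side consumer of the serialized bytes would.
        public static TransformMaintainer roundTrip(PTable pOldTable, PTable pNewTable,
                PhoenixConnection conn) {
            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
            TransformMaintainer.serialize(pOldTable, ptr, pNewTable, conn);
            List<IndexMaintainer> maintainers = TransformMaintainer.deserialize(ptr.copyBytes());
            return (TransformMaintainer) maintainers.get(0);
        }
    }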
diff --git a/phoenix-core/src/main/protobuf/ServerCachingService.proto b/phoenix-core/src/main/protobuf/ServerCachingService.proto
index 9993992..9df9684 100644
--- a/phoenix-core/src/main/protobuf/ServerCachingService.proto
+++ b/phoenix-core/src/main/protobuf/ServerCachingService.proto
@@ -70,6 +70,31 @@ message IndexMaintainer {
optional int32 dataImmutableStorageScheme = 27;
}
+message TransformMaintainer {
+ required int32 saltBuckets = 1;
+ required bool isMultiTenant = 2;
+ repeated ColumnReference newTableColumns = 3;
+ repeated ColumnReference oldTableColRefForCoveredColumns = 4;
+ repeated ColumnReference newTableColRefForCoveredColumns = 5;
+ required bytes newTableName = 6;
+ required bool newTableRowKeyOrderOptimizable = 7;
+ required bytes oldTableEmptyKeyValueColFamily = 8;
+ required ImmutableBytesWritable emptyKeyValueColFamily = 9;
+ optional bytes newTableExpressions = 10;
+ required int32 numDataTableColFamilies = 11;
+ required bool newTableWalDisabled = 12;
+ required int32 newTableRowKeyByteSize = 13;
+ required bool newTableImmutable = 14;
+ repeated ColumnInfo newTableColumnInfo = 15;
+ required int32 newTableEncodingScheme = 16;
+ required int32 newTableImmutableStorageScheme = 17;
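+ // -1 (the default) marks the column count as unset; fromProto() only overrides it for real values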
+ optional int32 newTableColumnCount = 18 [default = -1];
+ optional string parentTableType = 19;
+ optional string logicalNewTableName = 20;
+ optional int32 oldTableEncodingScheme = 21;
+ optional int32 oldTableImmutableStorageScheme = 22;
+}
+
message AddServerCacheRequest {
optional bytes tenantId = 1;
required bytes cacheId = 2;
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index b534603..d138d42 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -196,6 +196,7 @@ import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFac
public abstract class BaseTest {
public static final String DRIVER_CLASS_NAME_ATTRIB = "phoenix.driver.class.name";
+ protected static final String NULL_STRING = "NULL";
private static final double ZERO = 1e-9;
private static final Map<String,String> tableDDLMap;
private static final Logger LOGGER = LoggerFactory.getLogger(BaseTest.class);