You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/05/02 01:29:42 UTC
[1/2] kudu git commit: KUDU-861 Support changing column defaults and
storage attributes
Repository: kudu
Updated Branches:
refs/heads/master 3663e9a9d -> 28869e086
KUDU-861 Support changing column defaults and storage attributes
This patch adds support for adding, changing, or removing column defaults
and changing the storage attributes of a column.
A previous patch for the same features did not maintain backwards compatibility.
This patch does (manually verified).
Change-Id: I48dcfd1ced488943c3da1eb0a26f62780ac9214f
Reviewed-on: http://gerrit.cloudera.org:8080/6725
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Reviewed-by: Dan Burkert <da...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/a17efaa3
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/a17efaa3
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/a17efaa3
Branch: refs/heads/master
Commit: a17efaa36f9d8efc42f191d1258d7f36ed3c2add
Parents: 3663e9a
Author: Will Berkeley <wd...@gmail.com>
Authored: Tue Apr 25 10:40:03 2017 -0700
Committer: Will Berkeley <wd...@gmail.com>
Committed: Mon May 1 17:39:52 2017 +0000
----------------------------------------------------------------------
.../apache/kudu/client/AlterTableOptions.java | 100 +++++++++
.../org/apache/kudu/client/ProtobufHelper.java | 40 ++++
.../org/apache/kudu/client/TestKuduTable.java | 100 ++++++++-
src/kudu/client/client-test.cc | 209 +++++++++++++++----
src/kudu/client/schema.cc | 53 ++++-
src/kudu/client/schema.h | 5 +
src/kudu/client/table_alterer-internal.cc | 45 ++--
src/kudu/client/value-internal.h | 4 +
src/kudu/client/value.cc | 18 ++
src/kudu/common/common.proto | 12 ++
src/kudu/common/schema.cc | 84 +++++++-
src/kudu/common/schema.h | 41 +++-
src/kudu/common/wire_protocol.cc | 48 +++++
src/kudu/common/wire_protocol.h | 10 +-
.../alter_table-randomized-test.cc | 99 ++++++++-
src/kudu/integration-tests/alter_table-test.cc | 110 ++++++++++
src/kudu/master/catalog_manager.cc | 20 +-
src/kudu/master/master.proto | 8 +-
18 files changed, 910 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/a17efaa3/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java
index 31617aa..a096ea53 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java
@@ -17,11 +17,15 @@
package org.apache.kudu.client;
+import static org.apache.kudu.ColumnSchema.CompressionAlgorithm;
+import static org.apache.kudu.ColumnSchema.Encoding;
import static org.apache.kudu.master.Master.AlterTableRequestPB;
import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Common;
import org.apache.kudu.Type;
import org.apache.kudu.annotations.InterfaceAudience;
import org.apache.kudu.annotations.InterfaceStability;
@@ -120,6 +124,7 @@ public class AlterTableOptions {
* @return this instance
*/
public AlterTableOptions renameColumn(String oldName, String newName) {
+ // For backwards compatibility, this uses the RENAME_COLUMN step type.
AlterTableRequestPB.Step.Builder step = pb.addAlterSchemaStepsBuilder();
step.setType(AlterTableRequestPB.StepType.RENAME_COLUMN);
step.setRenameColumn(AlterTableRequestPB.RenameColumn.newBuilder().setOldName(oldName)
@@ -128,6 +133,101 @@ public class AlterTableOptions {
}
/**
+ * Remove the default value for a column.
+ * @param name name of the column
+ * @return this instance
+ */
+  public AlterTableOptions removeDefault(String name) {
+    // Register a new ALTER_COLUMN step on the pending request first...
+    AlterTableRequestPB.Step.Builder alterStep = pb.addAlterSchemaStepsBuilder()
+        .setType(AlterTableRequestPB.StepType.ALTER_COLUMN);
+    // ...then attach a delta that only flags the named column's default for removal.
+    alterStep.setAlterColumn(
+        AlterTableRequestPB.AlterColumn.newBuilder().setDelta(
+            Common.ColumnSchemaDeltaPB.newBuilder()
+                .setName(name)
+                .setRemoveDefault(true)));
+    return this;
+  }
+
+ /**
+ * Change the default value for a column. `newDefault` must not be null or
+ * else throws {@link IllegalArgumentException}.
+ * @param name name of the column
+ * @param newDefault the new default value
+ * @return this instance
+ */
+  public AlterTableOptions changeDefault(String name, Object newDefault) {
+    // Clearing a default is a separate operation, so null is rejected up front.
+    if (newDefault == null) {
+      throw new IllegalArgumentException("newDefault cannot be null: " +
+          "use removeDefault to clear a default value");
+    }
+
+    // Serialize before adding the step: the helper throws on unsupported value
+    // classes, and in that case no step has been appended to the request yet.
+    ByteString serializedDefault = ProtobufHelper.objectToByteStringNoType(name, newDefault);
+    AlterTableRequestPB.Step.Builder alterStep = pb.addAlterSchemaStepsBuilder()
+        .setType(AlterTableRequestPB.StepType.ALTER_COLUMN);
+    alterStep.setAlterColumn(
+        AlterTableRequestPB.AlterColumn.newBuilder().setDelta(
+            Common.ColumnSchemaDeltaPB.newBuilder()
+                .setName(name)
+                .setDefaultValue(serializedDefault)));
+    return this;
+  }
+
+ /**
+ * Change the block size of a column's storage. A nonpositive value indicates
+ * a server-side default.
+ * @param name name of the column
+ * @param blockSize the new block size
+ * @return this instance
+ */
+  public AlterTableOptions changeDesiredBlockSize(String name, int blockSize) {
+    // Append an ALTER_COLUMN step to the pending request.
+    AlterTableRequestPB.Step.Builder alterStep = pb.addAlterSchemaStepsBuilder()
+        .setType(AlterTableRequestPB.StepType.ALTER_COLUMN);
+    // The delta carries only the column name and the requested block size;
+    // the value is passed through unvalidated.
+    alterStep.setAlterColumn(
+        AlterTableRequestPB.AlterColumn.newBuilder().setDelta(
+            Common.ColumnSchemaDeltaPB.newBuilder()
+                .setName(name)
+                .setBlockSize(blockSize)));
+    return this;
+  }
+
+ /**
+ * Change the encoding used for a column.
+ * @param name name of the column
+ * @param encoding the new encoding
+ * @return this instance
+ */
+  public AlterTableOptions changeEncoding(String name, Encoding encoding) {
+    // Append an ALTER_COLUMN step to the pending request.
+    AlterTableRequestPB.Step.Builder alterStep = pb.addAlterSchemaStepsBuilder()
+        .setType(AlterTableRequestPB.StepType.ALTER_COLUMN);
+    // The client-side enum is translated to its protobuf counterpart for the wire.
+    alterStep.setAlterColumn(
+        AlterTableRequestPB.AlterColumn.newBuilder().setDelta(
+            Common.ColumnSchemaDeltaPB.newBuilder()
+                .setName(name)
+                .setEncoding(encoding.getInternalPbType())));
+    return this;
+  }
+
+ /**
+ * Change the compression used for a column.
+ * @param name the name of the column
+ * @param ca the new compression algorithm
+ * @return this instance
+ */
+  public AlterTableOptions changeCompressionAlgorithm(String name, CompressionAlgorithm ca) {
+    // Append an ALTER_COLUMN step to the pending request.
+    AlterTableRequestPB.Step.Builder alterStep = pb.addAlterSchemaStepsBuilder()
+        .setType(AlterTableRequestPB.StepType.ALTER_COLUMN);
+    // The client-side enum is translated to its protobuf counterpart for the wire.
+    alterStep.setAlterColumn(
+        AlterTableRequestPB.AlterColumn.newBuilder().setDelta(
+            Common.ColumnSchemaDeltaPB.newBuilder()
+                .setName(name)
+                .setCompression(ca.getInternalPbType())));
+    return this;
+  }
+
+ /**
* Add a range partition to the table with an inclusive lower bound and an exclusive upper bound.
*
* If either row is empty, then that end of the range will be unbounded. If a range column is
http://git-wip-us.apache.org/repos/asf/kudu/blob/a17efaa3/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
index 5831e25..4ed71c6 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
@@ -17,6 +17,7 @@
package org.apache.kudu.client;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -236,6 +237,45 @@ public class ProtobufHelper {
}
/**
+ * Serializes an object based on its Java type. Used for Alter Column
+ * operations where the column's type is not available. `value` must be
+ * a Kudu-compatible type or else throws {@link IllegalArgumentException}.
+ *
+ * @param colName the name of the column (for the error message)
+ * @param value the value to serialize
+ * @return the serialized object
+ */
+  protected static ByteString objectToByteStringNoType(String colName, Object value) {
+    // A null value would otherwise surface as a NullPointerException from
+    // value.getClass() in the fallback branch; report it as the documented
+    // IllegalArgumentException instead.
+    if (value == null) {
+      throw new IllegalArgumentException("The default value provided for " +
+          "column " + colName + " is null, which does not map to a supported Kudu type");
+    }
+
+    byte[] bytes;
+    if (value instanceof Boolean) {
+      bytes = Bytes.fromBoolean((Boolean) value);
+    } else if (value instanceof Byte) {
+      bytes = new byte[] {(Byte) value};
+    } else if (value instanceof Short) {
+      bytes = Bytes.fromShort((Short) value);
+    } else if (value instanceof Integer) {
+      bytes = Bytes.fromInt((Integer) value);
+    } else if (value instanceof Long) {
+      bytes = Bytes.fromLong((Long) value);
+    } else if (value instanceof String) {
+      bytes = ((String) value).getBytes(Charsets.UTF_8);
+    } else if (value instanceof byte[]) {
+      // NOTE(review): the caller's array is wrapped without copying below, so
+      // the caller presumably must not mutate it afterwards -- confirm.
+      bytes = (byte[]) value;
+    } else if (value instanceof ByteBuffer) {
+      // Copy only the readable window [position, limit). ByteBuffer.array()
+      // would return the whole backing array regardless of position, limit and
+      // array offset, and throws for direct or read-only buffers. Working on a
+      // duplicate leaves the caller's position untouched.
+      ByteBuffer readable = ((ByteBuffer) value).duplicate();
+      bytes = new byte[readable.remaining()];
+      readable.get(bytes);
+    } else if (value instanceof Float) {
+      bytes = Bytes.fromFloat((Float) value);
+    } else if (value instanceof Double) {
+      bytes = Bytes.fromDouble((Double) value);
+    } else {
+      throw new IllegalArgumentException("The default value provided for " +
+          "column " + colName + " is of class " + value.getClass().getName() +
+          " which does not map to a supported Kudu type");
+    }
+    return ZeroCopyLiteralByteString.wrap(bytes);
+  }
+
+ /**
* Convert a {@link com.google.common.net.HostAndPort} to
* {@link org.apache.kudu.Common.HostPortPB}
* protobuf message for serialization.
http://git-wip-us.apache.org/repos/asf/kudu/blob/a17efaa3/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
index 09ef520..0a5ece1 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
@@ -16,7 +16,12 @@
// under the License.
package org.apache.kudu.client;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.List;
@@ -48,6 +53,98 @@ public class TestKuduTable extends BaseKuduTest {
}
@Test(timeout = 100000)
+ public void testAlterColumn() throws Exception {
+ // Use a simplified two-column schema (INT32 key, nullable STRING value) because basicSchema has extra columns that make the asserts verbose.
+ String tableName = name.getMethodName() + System.currentTimeMillis();
+ List<ColumnSchema> columns = ImmutableList.of(
+ new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build(),
+ new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING)
+ .nullable(true)
+ .desiredBlockSize(4096)
+ .encoding(ColumnSchema.Encoding.PLAIN_ENCODING)
+ .compressionAlgorithm(ColumnSchema.CompressionAlgorithm.NO_COMPRESSION)
+ .build());
+ KuduTable table = createTable(tableName, new Schema(columns), getBasicCreateTableOptions());
+ KuduSession session = syncClient.newSession();
+ // Insert a row before a default is defined and check the value is NULL.
+ insertDefaultRow(table, session, 0);
+ List<String> rows = scanTableToStrings(table);
+ assertEquals("wrong number of rows", 1, rows.size());
+ assertEquals("wrong row", "INT32 key=0, STRING value=NULL", rows.get(0));
+
+ // Add a default, checking new rows see the new default and old rows remain the same.
+ AlterTableOptions ato = new AlterTableOptions().changeDefault("value", "pizza");
+ submitAlterAndCheck(ato, tableName);
+
+ insertDefaultRow(table, session, 1);
+ rows = scanTableToStrings(table);
+ assertEquals("wrong number of rows", 2, rows.size());
+ assertEquals("wrong row", "INT32 key=0, STRING value=NULL", rows.get(0));
+ assertEquals("wrong row", "INT32 key=1, STRING value=pizza", rows.get(1));
+
+ // Change the default, checking new rows see the new default and old rows remain the same.
+ ato = new AlterTableOptions().changeDefault("value", "taco");
+ submitAlterAndCheck(ato, tableName);
+
+ insertDefaultRow(table, session, 2);
+
+ rows = scanTableToStrings(table);
+ assertEquals("wrong number of rows", 3, rows.size());
+ assertEquals("wrong row", "INT32 key=0, STRING value=NULL", rows.get(0));
+ assertEquals("wrong row", "INT32 key=1, STRING value=pizza", rows.get(1));
+ assertEquals("wrong row", "INT32 key=2, STRING value=taco", rows.get(2));
+
+ // Remove the default, checking that new rows default to NULL and old rows remain the same.
+ ato = new AlterTableOptions().removeDefault("value");
+ submitAlterAndCheck(ato, tableName);
+
+ insertDefaultRow(table, session, 3);
+
+ rows = scanTableToStrings(table);
+ assertEquals("wrong number of rows", 4, rows.size());
+ assertEquals("wrong row", "INT32 key=0, STRING value=NULL", rows.get(0));
+ assertEquals("wrong row", "INT32 key=1, STRING value=pizza", rows.get(1));
+ assertEquals("wrong row", "INT32 key=2, STRING value=taco", rows.get(2));
+ assertEquals("wrong row", "INT32 key=3, STRING value=NULL", rows.get(3));
+
+ // Verify the column's initial storage attributes, then change block size, encoding, and compression in a single alter and re-check them on a reopened table.
+ assertEquals("wrong block size",
+ 4096,
+ table.getSchema().getColumn("value").getDesiredBlockSize());
+ assertEquals("wrong encoding",
+ ColumnSchema.Encoding.PLAIN_ENCODING,
+ table.getSchema().getColumn("value").getEncoding());
+ assertEquals("wrong compression algorithm",
+ ColumnSchema.CompressionAlgorithm.NO_COMPRESSION,
+ table.getSchema().getColumn("value").getCompressionAlgorithm());
+
+ ato = new AlterTableOptions().changeDesiredBlockSize("value", 8192)
+ .changeEncoding("value", ColumnSchema.Encoding.DICT_ENCODING)
+ .changeCompressionAlgorithm("value", ColumnSchema.CompressionAlgorithm.SNAPPY);
+ submitAlterAndCheck(ato, tableName);
+
+ KuduTable reopenedTable = syncClient.openTable(tableName);
+ assertEquals("wrong block size post alter",
+ 8192,
+ reopenedTable.getSchema().getColumn("value").getDesiredBlockSize());
+ assertEquals("wrong encoding post alter",
+ ColumnSchema.Encoding.DICT_ENCODING,
+ reopenedTable.getSchema().getColumn("value").getEncoding());
+ assertEquals("wrong compression algorithm post alter",
+ ColumnSchema.CompressionAlgorithm.SNAPPY,
+ reopenedTable.getSchema().getColumn("value").getCompressionAlgorithm());
+ }
+
+  private void insertDefaultRow(KuduTable table, KuduSession session, int key)
+      throws Exception {
+    // Populate only the primary key; the "value" column is deliberately left
+    // unset.
+    Insert keyOnlyInsert = table.newInsert();
+    keyOnlyInsert.getRow().addInt("key", key);
+    session.apply(keyOnlyInsert);
+  }
+
+ @Test(timeout = 100000)
public void testAlterTable() throws Exception {
String tableName = name.getMethodName() + System.currentTimeMillis();
createTable(tableName, basicSchema, getBasicCreateTableOptions());
@@ -83,7 +180,6 @@ public class TestKuduTable extends BaseKuduTest {
(System.currentTimeMillis() * 1000));
submitAlterAndCheck(ato, tableName);
-
// Try altering a table that doesn't exist.
String nonExistingTableName = "table_does_not_exist";
try {
http://git-wip-us.apache.org/repos/asf/kudu/blob/a17efaa3/src/kudu/client/client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index c043a2b..c4f41b2 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -351,7 +351,7 @@ class ClientTest : public KuduTest {
void DoTestScanResourceMetrics() {
KuduScanner scanner(client_table_.get());
string tablet_id = GetFirstTabletId(client_table_.get());
- // flush to ensure we scan disk later
+ // Flush to ensure we scan disk later
FlushTablet(tablet_id);
ASSERT_OK(scanner.SetProjectedColumns({ "key" }));
LOG_TIMING(INFO, "Scanning disk with no predicates") {
@@ -763,8 +763,8 @@ TEST_F(ClientTest, TestScanAtSnapshot) {
ASSERT_NO_FATAL_FAILURE(InsertTestRows(client_table_.get(),
half_the_rows));
- // get the time from the server and transform to micros disregarding any
- // logical values (we shouldn't have any with a single server anyway);
+ // Get the time from the server and transform to micros, disregarding any
+ // logical values (we shouldn't have any with a single server anyway).
int64_t ts = server::HybridClock::GetPhysicalValueMicros(
cluster_->mini_tablet_server(0)->server()->clock()->Now());
@@ -1342,7 +1342,7 @@ TEST_F(ClientTest, TestNonCoveringRangePartitions) {
// Scans
- { // full table scan
+ { // Full table scan
vector<string> rows;
KuduScanner scanner(table.get());
ASSERT_OK(scanner.SetFaultTolerant());
@@ -3104,7 +3104,7 @@ TEST_F(ClientTest, TestMutateDeletedRow) {
Status s = session->Flush();
ASSERT_FALSE(s.ok());
ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
- // verify error
+ // Verify error
gscoped_ptr<KuduError> error = GetSingleErrorFromSession(session.get());
ASSERT_EQ(error->failed_op().ToString(),
"UPDATE int32 key=1, int32 int_val=2");
@@ -3116,7 +3116,7 @@ TEST_F(ClientTest, TestMutateDeletedRow) {
s = session->Flush();
ASSERT_FALSE(s.ok());
ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
- // verify error
+ // Verify error
error = GetSingleErrorFromSession(session.get());
ASSERT_EQ(error->failed_op().ToString(),
"DELETE int32 key=1");
@@ -3134,7 +3134,7 @@ TEST_F(ClientTest, TestMutateNonexistentRow) {
Status s = session->Flush();
ASSERT_FALSE(s.ok());
ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
- // verify error
+ // Verify error
gscoped_ptr<KuduError> error = GetSingleErrorFromSession(session.get());
ASSERT_EQ(error->failed_op().ToString(),
"UPDATE int32 key=1, int32 int_val=2");
@@ -3146,7 +3146,7 @@ TEST_F(ClientTest, TestMutateNonexistentRow) {
s = session->Flush();
ASSERT_FALSE(s.ok());
ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
- // verify error
+ // Verify error
error = GetSingleErrorFromSession(session.get());
ASSERT_EQ(error->failed_op().ToString(),
"DELETE int32 key=1");
@@ -3272,72 +3272,108 @@ TEST_F(ClientTest, TestWriteWithBadSchema) {
TEST_F(ClientTest, TestBasicAlterOperations) {
const vector<string> kBadNames = {"", string(1000, 'x')};
- // test that having no steps throws an error
+ // Test that renaming a column to an invalid name throws an error.
+ for (const string& bad_name : kBadNames) {
+ gscoped_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
+ table_alterer->AlterColumn("int_val")->RenameTo(bad_name);
+ Status s = table_alterer->Alter();
+ ASSERT_TRUE(s.IsInvalidArgument());
+ ASSERT_STR_CONTAINS(s.ToString(), "invalid column name");
+ }
+
+ // Test that renaming a table to an invalid name throws an error.
+ for (const string& bad_name : kBadNames) {
+ gscoped_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
+ table_alterer->RenameTo(bad_name);
+ Status s = table_alterer->Alter();
+ ASSERT_TRUE(s.IsInvalidArgument());
+ ASSERT_STR_CONTAINS(s.ToString(), "invalid table name");
+ }
+
+ // Test trying to add columns to a table such that it exceeds the permitted
+ // maximum.
{
gscoped_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
+ for (int i = 0; i < 1000; i++) {
+ table_alterer->AddColumn(Substitute("c$0", i))->Type(KuduColumnSchema::INT32);
+ }
+ Status s = table_alterer->Alter();
+ ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "number of columns 1004 is greater than the "
+ "permitted maximum 300");
+ }
+
+ // Having no steps should throw an error.
+ {
+ unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
Status s = table_alterer->Alter();
ASSERT_TRUE(s.IsInvalidArgument());
ASSERT_STR_CONTAINS(s.ToString(), "No alter steps provided");
}
- // test that adding a non-nullable column with no default value throws an error
+ // Adding a non-nullable column with no default value should throw an error.
{
- gscoped_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
+ unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
table_alterer->AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull();
Status s = table_alterer->Alter();
ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "column `key`: NOT NULL columns must have a default");
}
- // test that remove key should throws an error
+ // Removing a key should throw an error.
{
- gscoped_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
+ unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
Status s = table_alterer
->DropColumn("key")
->Alter();
ASSERT_TRUE(s.IsInvalidArgument());
- ASSERT_STR_CONTAINS(s.ToString(), "cannot remove a key column");
+ ASSERT_STR_CONTAINS(s.ToString(), "cannot remove a key column: key");
}
- // test that renaming to an already-existing name throws an error
+ // Renaming a column to an already-existing name should throw an error.
{
- gscoped_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
+ unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
table_alterer->AlterColumn("int_val")->RenameTo("string_val");
Status s = table_alterer->Alter();
ASSERT_TRUE(s.IsAlreadyPresent());
ASSERT_STR_CONTAINS(s.ToString(), "The column already exists: string_val");
}
- // Test that renaming a column to an invalid name throws an error.
- for (const string& bad_name : kBadNames) {
- gscoped_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
- table_alterer->AlterColumn("int_val")->RenameTo(bad_name);
+ // Altering a column but specifying no alterations should throw an error.
+ {
+ unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
+ table_alterer->AlterColumn("string_val");
Status s = table_alterer->Alter();
ASSERT_TRUE(s.IsInvalidArgument());
- ASSERT_STR_CONTAINS(s.ToString(), "invalid column name");
+ ASSERT_STR_CONTAINS(s.ToString(), "no alter operation specified: string_val");
}
- // Test that renaming a table to an invalid name throws an error.
- for (const string& bad_name : kBadNames) {
- gscoped_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
- table_alterer->RenameTo(bad_name);
+ // Trying to change the type of a column is an error.
+ {
+ unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
+ table_alterer->AlterColumn("string_val")->Type(KuduColumnSchema::STRING);
Status s = table_alterer->Alter();
- ASSERT_TRUE(s.IsInvalidArgument());
- ASSERT_STR_CONTAINS(s.ToString(), "invalid table name");
+ ASSERT_TRUE(s.IsNotSupported());
+ ASSERT_STR_CONTAINS(s.ToString(), "unsupported alter operation: string_val");
}
- // Test trying to add columns to a table such that it exceeds the permitted
- // maximum.
+ // Trying to alter the nullability of a column is an error.
{
- gscoped_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
- for (int i = 0; i < 1000; i++) {
- table_alterer->AddColumn(Substitute("c$0", i))->Type(KuduColumnSchema::INT32);
- }
+ unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
+ table_alterer->AlterColumn("string_val")->Nullable();
Status s = table_alterer->Alter();
- ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
- ASSERT_STR_CONTAINS(s.ToString(),
- "number of columns 1004 is greater than the "
- "permitted maximum 300");
+ ASSERT_TRUE(s.IsNotSupported());
+ ASSERT_STR_CONTAINS(s.ToString(), "unsupported alter operation: string_val");
+ }
+
+ // Trying to alter the nullability of a column via NotNull() is also an error.
+ {
+ unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
+ table_alterer->AlterColumn("string_val")->NotNull();
+ Status s = table_alterer->Alter();
+ ASSERT_TRUE(s.IsNotSupported());
+ ASSERT_STR_CONTAINS(s.ToString(), "unsupported alter operation: string_val");
}
// Need a tablet peer for the next set of tests.
@@ -3354,8 +3390,7 @@ TEST_F(ClientTest, TestBasicAlterOperations) {
ASSERT_EQ(1, tablet_peer->tablet()->metadata()->schema_version());
}
- // test that specifying an encoding incompatible with the column's
- // type throws an error
+ // Specifying an encoding incompatible with the column's type is an error.
{
gscoped_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
table_alterer->AddColumn("new_string_val")->Type(KuduColumnSchema::STRING)
@@ -3366,7 +3401,7 @@ TEST_F(ClientTest, TestBasicAlterOperations) {
ASSERT_EQ(1, tablet_peer->tablet()->metadata()->schema_version());
}
- // test that adds a new column of type string
+ // Test adding a new column of type string.
{
gscoped_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
table_alterer->AddColumn("new_string_val")->Type(KuduColumnSchema::STRING)
@@ -3375,7 +3410,7 @@ TEST_F(ClientTest, TestBasicAlterOperations) {
ASSERT_EQ(2, tablet_peer->tablet()->metadata()->schema_version());
}
- // test renaming primary key column
+ // Test renaming a primary key column.
{
gscoped_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
table_alterer->AlterColumn("key")->RenameTo("key2");
@@ -3384,24 +3419,108 @@ TEST_F(ClientTest, TestBasicAlterOperations) {
ASSERT_EQ(3, tablet_peer->tablet()->metadata()->schema_version());
}
- // test that changing the data type of a primary key column throws an error
+ // Changing the type of a primary key column is an error.
{
gscoped_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
table_alterer->AlterColumn("key2")->Type(KuduColumnSchema::INT64)->NotNull()->PrimaryKey();
Status s = table_alterer->Alter();
ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(),
- "Not implemented: cannot alter attributes for column: key2");
+ "Not implemented: unsupported alter operation: key2");
+ }
+
+ // Test changing a default value for a variable-length (string) column.
+ {
+ unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
+ table_alterer->AlterColumn("string_val")
+ ->Default(KuduValue::CopyString("hello!"));
+ ASSERT_OK(table_alterer->Alter());
+ ASSERT_EQ(4, tablet_peer->tablet()->metadata()->schema_version());
+ Schema schema = tablet_peer->tablet()->metadata()->schema();
+ ColumnSchema col_schema = schema.column(schema.find_column("string_val"));
+ ASSERT_FALSE(col_schema.has_read_default());
+ ASSERT_TRUE(col_schema.has_write_default());
+ ASSERT_EQ("hello!", *reinterpret_cast<const Slice*>(col_schema.write_default_value()));
}
- // test that changes table name
+ // Test changing a default value for an integer column.
+ {
+ unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
+ table_alterer->AlterColumn("non_null_with_default")
+ ->Default(KuduValue::FromInt(54321));
+ ASSERT_OK(table_alterer->Alter());
+ ASSERT_EQ(5, tablet_peer->tablet()->metadata()->schema_version());
+ Schema schema = tablet_peer->tablet()->metadata()->schema();
+ ColumnSchema col_schema = schema.column(schema.find_column("non_null_with_default"));
+ ASSERT_TRUE(col_schema.has_read_default()); // Started with a default
+ ASSERT_TRUE(col_schema.has_write_default());
+ ASSERT_EQ(54321, *reinterpret_cast<const int32_t*>(col_schema.write_default_value()));
+ }
+
+ // Test clearing a default value from a nullable column.
+ {
+ unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
+ table_alterer->AlterColumn("string_val")
+ ->RemoveDefault();
+ ASSERT_OK(table_alterer->Alter());
+ ASSERT_EQ(6, tablet_peer->tablet()->metadata()->schema_version());
+ Schema schema = tablet_peer->tablet()->metadata()->schema();
+ ColumnSchema col_schema = schema.column(schema.find_column("string_val"));
+ ASSERT_FALSE(col_schema.has_read_default());
+ ASSERT_FALSE(col_schema.has_write_default());
+ }
+
+ // Test clearing a default value from a non-nullable column.
+ {
+ unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
+ table_alterer->AlterColumn("non_null_with_default")
+ ->RemoveDefault();
+ ASSERT_OK(table_alterer->Alter());
+ ASSERT_EQ(7, tablet_peer->tablet()->metadata()->schema_version());
+ Schema schema = tablet_peer->tablet()->metadata()->schema();
+ ColumnSchema col_schema = schema.column(schema.find_column("non_null_with_default"));
+ ASSERT_TRUE(col_schema.has_read_default());
+ ASSERT_FALSE(col_schema.has_write_default());
+ }
+
+ // Test that specifying a default of the wrong size fails.
+ // Since the column's type isn't checked client-side and the client
+ // stores defaults for all integer-backed types as 64-bit integers, the
+ // client can request defaults of the wrong type or size for a column's type.
+ {
+ unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
+ table_alterer->AlterColumn("non_null_with_default")
+ ->Default(KuduValue::CopyString("aaa"));
+ Status s = table_alterer->Alter();
+ ASSERT_TRUE(s.IsInvalidArgument());
+ ASSERT_STR_CONTAINS(s.ToString(), "wrong size for default value");
+ ASSERT_EQ(7, tablet_peer->tablet()->metadata()->schema_version());
+ }
+
+ // Test altering encoding, compression, and block size.
+ {
+ unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
+ table_alterer->AlterColumn("string_val")
+ ->Encoding(KuduColumnStorageAttributes::PLAIN_ENCODING)
+ ->Compression(KuduColumnStorageAttributes::LZ4)
+ ->BlockSize(16 * 1024 * 1024);
+ ASSERT_OK(table_alterer->Alter());
+ ASSERT_EQ(8, tablet_peer->tablet()->metadata()->schema_version());
+ Schema schema = tablet_peer->tablet()->metadata()->schema();
+ ColumnSchema col_schema = schema.column(schema.find_column("string_val"));
+ ASSERT_EQ(KuduColumnStorageAttributes::PLAIN_ENCODING, col_schema.attributes().encoding);
+ ASSERT_EQ(KuduColumnStorageAttributes::LZ4, col_schema.attributes().compression);
+ ASSERT_EQ(16 * 1024 * 1024, col_schema.attributes().cfile_block_size);
+ }
+
+ // Test changing a table name.
{
const char *kRenamedTableName = "RenamedTable";
gscoped_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
ASSERT_OK(table_alterer
->RenameTo(kRenamedTableName)
->Alter());
- ASSERT_EQ(4, tablet_peer->tablet()->metadata()->schema_version());
+ ASSERT_EQ(9, tablet_peer->tablet()->metadata()->schema_version());
ASSERT_EQ(kRenamedTableName, tablet_peer->tablet()->metadata()->table_name());
CatalogManager *catalog_manager = cluster_->mini_master()->master()->catalog_manager();
http://git-wip-us.apache.org/repos/asf/kudu/blob/a17efaa3/src/kudu/client/schema.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/schema.cc b/src/kudu/client/schema.cc
index ce806ca..bcf24b6 100644
--- a/src/kudu/client/schema.cc
+++ b/src/kudu/client/schema.cc
@@ -17,6 +17,7 @@
#include "kudu/client/schema.h"
+#include <boost/optional.hpp>
#include <glog/logging.h>
#include <unordered_map>
@@ -204,8 +205,6 @@ Status KuduColumnSpec::ToColumnSchema(KuduColumnSchema* col) const {
// Verify that the user isn't trying to use any methods that
// don't make sense for CREATE.
if (data_->has_rename_to) {
- // TODO(KUDU-861): adjust these errors as this method will also be used for
- // ALTER TABLE ADD COLUMN support.
return Status::NotSupported("cannot rename a column during CreateTable",
data_->name);
}
@@ -254,6 +253,56 @@ Status KuduColumnSpec::ToColumnSchema(KuduColumnSchema* col) const {
return Status::OK();
}
+// Converts this column spec into a ColumnSchemaDelta describing an ALTER
+// COLUMN operation.
+//
+// Only the attributes that were explicitly set on the spec are copied into
+// 'col_delta'; everything else is left as the caller initialized it.
+//
+// Returns InvalidArgument if the spec sets a type, nullability, or primary
+// key, or if it both provides a new default and removes the default.
+Status KuduColumnSpec::ToColumnSchemaDelta(ColumnSchemaDelta* col_delta) const {
+  if (data_->has_type) {
+    return Status::InvalidArgument("type provided for column schema delta", data_->name);
+  }
+  if (data_->has_nullable) {
+    return Status::InvalidArgument("nullability provided for column schema delta", data_->name);
+  }
+  if (data_->primary_key) {
+    return Status::InvalidArgument("primary key set for column schema delta", data_->name);
+  }
+
+  if (data_->has_rename_to) {
+    // Copy rather than move: this method is const and must leave the spec
+    // intact, otherwise converting the same spec a second time would yield an
+    // empty new name.
+    col_delta->new_name = boost::optional<string>(data_->rename_to);
+  }
+
+  if (data_->has_default) {
+    col_delta->default_value = boost::optional<Slice>(DefaultValueAsSlice());
+  }
+
+  if (data_->remove_default) {
+    col_delta->remove_default = true;
+  }
+
+  // Setting a new default and removing the default are mutually exclusive.
+  if (col_delta->remove_default && col_delta->default_value) {
+    return Status::InvalidArgument("new default set but default also removed", data_->name);
+  }
+
+  if (data_->has_encoding) {
+    col_delta->encoding =
+        boost::optional<EncodingType>(ToInternalEncodingType(data_->encoding));
+  }
+
+  if (data_->has_compression) {
+    col_delta->compression =
+        boost::optional<CompressionType>(ToInternalCompressionType(data_->compression));
+  }
+
+  if (data_->has_block_size) {
+    col_delta->cfile_block_size = boost::optional<int32_t>(data_->block_size);
+  }
+
+  return Status::OK();
+}
+
+// Returns the spec's default value as a Slice, or an empty Slice when no
+// default has been set on the spec.
+Slice KuduColumnSpec::DefaultValueAsSlice() const {
+  if (data_->has_default) {
+    return data_->default_val->data_->GetSlice();
+  }
+  return Slice();
+}
////////////////////////////////////////////////////////////
// KuduSchemaBuilder
http://git-wip-us.apache.org/repos/asf/kudu/blob/a17efaa3/src/kudu/client/schema.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/schema.h b/src/kudu/client/schema.h
index 45f7c9f..d9c1ee3 100644
--- a/src/kudu/client/schema.h
+++ b/src/kudu/client/schema.h
@@ -27,6 +27,7 @@
namespace kudu {
class ColumnSchema;
+struct ColumnSchemaDelta;
class KuduPartialRow;
class Schema;
class TestWorkload;
@@ -350,6 +351,10 @@ class KUDU_EXPORT KuduColumnSpec {
Status ToColumnSchema(KuduColumnSchema* col) const;
+ Status ToColumnSchemaDelta(ColumnSchemaDelta* col_delta) const;
+
+ Slice DefaultValueAsSlice() const;
+
// Owned.
Data* data_;
};
http://git-wip-us.apache.org/repos/asf/kudu/blob/a17efaa3/src/kudu/client/table_alterer-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/table_alterer-internal.cc b/src/kudu/client/table_alterer-internal.cc
index b3ae30a..d5dfdcf 100644
--- a/src/kudu/client/table_alterer-internal.cc
+++ b/src/kudu/client/table_alterer-internal.cc
@@ -86,29 +86,44 @@ Status KuduTableAlterer::Data::ToRequest(AlterTableRequestPB* req) {
break;
}
case AlterTableRequestPB::ALTER_COLUMN:
- // TODO(KUDU-861): support altering a column in the wire protocol.
- // For now, we just give an error if the caller tries to do
- // any operation other than rename.
+ {
if (s.spec->data_->has_type ||
- s.spec->data_->has_encoding ||
- s.spec->data_->has_compression ||
s.spec->data_->has_nullable ||
- s.spec->data_->primary_key ||
- s.spec->data_->has_default ||
- s.spec->data_->default_val ||
- s.spec->data_->remove_default) {
- return Status::NotSupported("cannot alter attributes for column",
+ s.spec->data_->primary_key) {
+ return Status::NotSupported("unsupported alter operation",
s.spec->data_->name);
}
- // We only support rename column
- if (!s.spec->data_->has_rename_to) {
+ if (!s.spec->data_->has_rename_to &&
+ !s.spec->data_->has_default &&
+ !s.spec->data_->default_val &&
+ !s.spec->data_->remove_default &&
+ !s.spec->data_->has_encoding &&
+ !s.spec->data_->has_compression &&
+ !s.spec->data_->has_block_size) {
return Status::InvalidArgument("no alter operation specified",
s.spec->data_->name);
}
- pb_step->mutable_rename_column()->set_old_name(s.spec->data_->name);
- pb_step->mutable_rename_column()->set_new_name(s.spec->data_->rename_to);
- pb_step->set_type(AlterTableRequestPB::RENAME_COLUMN);
+ // If the alter is solely a column rename, fall back to using
+ // RENAME_COLUMN, for backwards compatibility.
+ // TODO(wdb) Change this when compat can be broken.
+ if (!s.spec->data_->has_default &&
+ !s.spec->data_->default_val &&
+ !s.spec->data_->remove_default &&
+ !s.spec->data_->has_encoding &&
+ !s.spec->data_->has_compression &&
+ !s.spec->data_->has_block_size) {
+ pb_step->set_type(AlterTableRequestPB::RENAME_COLUMN);
+ pb_step->mutable_rename_column()->set_old_name(s.spec->data_->name);
+ pb_step->mutable_rename_column()->set_new_name(s.spec->data_->rename_to);
+ break;
+ }
+ ColumnSchemaDelta col_delta(s.spec->data_->name);
+ RETURN_NOT_OK(s.spec->ToColumnSchemaDelta(&col_delta));
+ auto* alter_pb = pb_step->mutable_alter_column();
+ ColumnSchemaDeltaToPB(col_delta, alter_pb->mutable_delta());
+ pb_step->set_type(AlterTableRequestPB::ALTER_COLUMN);
break;
+ }
case AlterTableRequestPB::ADD_RANGE_PARTITION:
{
RowOperationsPBEncoder encoder(pb_step->mutable_add_range_partition()
http://git-wip-us.apache.org/repos/asf/kudu/blob/a17efaa3/src/kudu/client/value-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/value-internal.h b/src/kudu/client/value-internal.h
index c66a7bc..590a75c 100644
--- a/src/kudu/client/value-internal.h
+++ b/src/kudu/client/value-internal.h
@@ -55,6 +55,10 @@ class KuduValue::Data {
DataType t,
void** val_void);
+ // Return this KuduValue as a Slice. The KuduValue object retains ownership
+ // of the underlying data.
+ const Slice GetSlice();
+
private:
// Check that this value has the expected type 'type', returning
// a nice error Status if not.
http://git-wip-us.apache.org/repos/asf/kudu/blob/a17efaa3/src/kudu/client/value.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/value.cc b/src/kudu/client/value.cc
index 7971748..3eb2993 100644
--- a/src/kudu/client/value.cc
+++ b/src/kudu/client/value.cc
@@ -188,5 +188,23 @@ Status KuduValue::Data::CheckAndPointToString(const string& col_name,
return Status::OK();
}
+// Returns a Slice over this value's storage. Numeric values are exposed as the
+// raw bytes of the corresponding member (an INT always spans
+// sizeof(int64_t) bytes); a SLICE value is returned as-is.
+const Slice KuduValue::Data::GetSlice() {
+  switch (type_) {
+    case SLICE:
+      return slice_val_;
+    case INT:
+      return Slice(reinterpret_cast<uint8_t*>(&int_val_), sizeof(int64_t));
+    case FLOAT:
+      return Slice(reinterpret_cast<uint8_t*>(&float_val_), sizeof(float));
+    case DOUBLE:
+      return Slice(reinterpret_cast<uint8_t*>(&double_val_), sizeof(double));
+  }
+  // Every type is handled by the switch above.
+  LOG(FATAL) << "unreachable!";
+}
+
} // namespace client
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/a17efaa3/src/kudu/common/common.proto
----------------------------------------------------------------------
diff --git a/src/kudu/common/common.proto b/src/kudu/common/common.proto
index ff501f1..6b1b608 100644
--- a/src/kudu/common/common.proto
+++ b/src/kudu/common/common.proto
@@ -95,6 +95,18 @@ message ColumnSchemaPB {
optional int32 cfile_block_size = 10 [default=0];
}
+// Describes a set of changes to apply to a single existing column.
+// Every field other than 'name' is optional; only the fields that are set
+// are applied.
+message ColumnSchemaDeltaPB {
+ // Current name of the column to alter.
+ optional string name = 1;
+ // If set, the column is renamed to this name.
+ optional string new_name = 2;
+
+ // NOTE: tag 3 is not used by this message.
+
+ // If set, the raw bytes of the column's new default value.
+ optional bytes default_value = 4;
+ // If set, asks for the column's default value to be removed.
+ optional bool remove_default = 5;
+
+ // If set, the new storage attributes for the column.
+ optional EncodingType encoding = 6;
+ optional CompressionType compression = 7;
+ optional int32 block_size = 8;
+}
+
message SchemaPB {
repeated ColumnSchemaPB columns = 1;
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/a17efaa3/src/kudu/common/schema.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/schema.cc b/src/kudu/common/schema.cc
index bd83088..94fcf87 100644
--- a/src/kudu/common/schema.cc
+++ b/src/kudu/common/schema.cc
@@ -17,9 +17,10 @@
#include "kudu/common/schema.h"
-#include <set>
#include <algorithm>
+#include <set>
+#include "kudu/gutil/map-util.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/strcat.h"
@@ -29,8 +30,6 @@
namespace kudu {
-using std::set;
-using std::unordered_map;
using std::unordered_set;
// In a new schema, we typically would start assigning column IDs at 0. However, this
@@ -52,7 +51,44 @@ string ColumnStorageAttributes::ToString() const {
cfile_block_size);
}
-// TODO: include attributes_.ToString() -- need to fix unit tests
+// Applies the changes described by 'col_delta' to this column schema.
+//
+// All validation happens before any member is modified, so a non-OK return
+// leaves the schema exactly as it was.
+Status ColumnSchema::ApplyDelta(const ColumnSchemaDelta& col_delta) {
+  const bool is_binary = type_info()->physical_type() == BINARY;
+
+  // A default for a non-binary column must supply at least as many bytes as
+  // the column's type occupies.
+  if (!is_binary &&
+      col_delta.default_value &&
+      col_delta.default_value->size() < type_info()->size()) {
+    return Status::InvalidArgument("wrong size for default value");
+  }
+
+  if (col_delta.new_name) {
+    name_ = *col_delta.new_name;
+  }
+
+  if (col_delta.default_value) {
+    const Slice& new_default = *col_delta.default_value;
+    // Binary columns are handed a pointer to the Slice itself; every other
+    // type is handed a pointer to the raw value bytes.
+    const void* value = is_binary ?
+        static_cast<const void*>(&new_default) :
+        static_cast<const void*>(new_default.data());
+    write_default_ = std::make_shared<Variant>(type_info()->type(), value);
+  }
+
+  if (col_delta.remove_default) {
+    write_default_ = nullptr;
+  }
+
+  // Storage attributes: overwrite only the ones present in the delta.
+  if (col_delta.encoding) {
+    attributes_.encoding = *col_delta.encoding;
+  }
+  if (col_delta.compression) {
+    attributes_.compression = *col_delta.compression;
+  }
+  if (col_delta.cfile_block_size) {
+    attributes_.cfile_block_size = *col_delta.cfile_block_size;
+  }
+  return Status::OK();
+}
+
+// TODO(wdb): include attributes_.ToString() -- need to fix unit tests
// first
string ColumnSchema::ToString() const {
return strings::Substitute("$0[$1]",
@@ -417,8 +453,8 @@ Status SchemaBuilder::AddColumn(const string& name,
}
Status SchemaBuilder::RemoveColumn(const string& name) {
- unordered_set<string>::const_iterator it_names;
- if ((it_names = col_names_.find(name)) == col_names_.end()) {
+ unordered_set<string>::const_iterator it_names = col_names_.find(name);
+ if (it_names == col_names_.end()) {
return Status::NotFound("The specified column does not exist", name);
}
@@ -439,19 +475,18 @@ Status SchemaBuilder::RemoveColumn(const string& name) {
}
Status SchemaBuilder::RenameColumn(const string& old_name, const string& new_name) {
- unordered_set<string>::const_iterator it_names;
-
// check if 'new_name' is already in use
- if ((it_names = col_names_.find(new_name)) != col_names_.end()) {
+ if (col_names_.find(new_name) != col_names_.end()) {
return Status::AlreadyPresent("The column already exists", new_name);
}
// check if the 'old_name' column exists
- if ((it_names = col_names_.find(old_name)) == col_names_.end()) {
+ unordered_set<string>::const_iterator it_names = col_names_.find(old_name);
+ if (it_names == col_names_.end()) {
return Status::NotFound("The specified column does not exist", old_name);
}
- col_names_.erase(it_names); // TODO: Should this one stay and marked as alias?
+ col_names_.erase(it_names); // TODO(wdb): Should this one stay and marked as alias?
col_names_.insert(new_name);
for (ColumnSchema& col_schema : cols_) {
@@ -484,4 +519,31 @@ Status SchemaBuilder::AddColumn(const ColumnSchema& column, bool is_key) {
return Status::OK();
}
+// Applies 'col_delta' to the column it names.
+//
+// Returns AlreadyPresent if the delta renames the column to a name that is
+// already in use, NotFound if the named column does not exist, or the error
+// from ColumnSchema::ApplyDelta if the delta itself is rejected.
+Status SchemaBuilder::ApplyColumnSchemaDelta(const ColumnSchemaDelta& col_delta) {
+  // A rename must not collide with an existing column name.
+  if (col_delta.new_name && ContainsKey(col_names_, *col_delta.new_name)) {
+    return Status::AlreadyPresent("The column already exists", *col_delta.new_name);
+  }
+
+  // The column being altered has to exist.
+  auto existing_name = col_names_.find(col_delta.name);
+  if (existing_name == col_names_.end()) {
+    return Status::NotFound("The specified column does not exist", col_delta.name);
+  }
+
+  for (ColumnSchema& candidate : cols_) {
+    if (candidate.name() != col_delta.name) {
+      continue;
+    }
+    RETURN_NOT_OK(candidate.ApplyDelta(col_delta));
+    if (col_delta.new_name) {
+      // TODO(wdb): Should the old one stay, marked as an alias?
+      col_names_.erase(existing_name);
+      col_names_.insert(*col_delta.new_name);
+    }
+    return Status::OK();
+  }
+
+  // The name was found in col_names_, so a matching column is expected in cols_.
+  LOG(FATAL) << "Should not reach here";
+}
+
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/a17efaa3/src/kudu/common/schema.h
----------------------------------------------------------------------
diff --git a/src/kudu/common/schema.h b/src/kudu/common/schema.h
index e794e11..c94c4e9 100644
--- a/src/kudu/common/schema.h
+++ b/src/kudu/common/schema.h
@@ -17,6 +17,7 @@
#ifndef KUDU_COMMON_SCHEMA_H
#define KUDU_COMMON_SCHEMA_H
+#include <boost/optional.hpp>
#include <functional>
#include <glog/logging.h>
#include <memory>
@@ -104,6 +105,37 @@ struct ColumnStorageAttributes {
int32_t cfile_block_size;
};
+// A struct representing changes to a ColumnSchema.
+//
+// In the future, as more complex alter operations need to be supported,
+// this may evolve into a class, but for now it's just POD.
+//
+// Each optional member is applied only when it is set; an unset member means
+// "leave this property unchanged".
+struct ColumnSchemaDelta {
+public:
+ // Creates an empty delta (no changes) for the column called 'name'.
+ explicit ColumnSchemaDelta(std::string name)
+ : name(std::move(name)),
+ remove_default(false) {
+ }
+
+ // Current name of the column this delta applies to.
+ const std::string name;
+
+ // If set, the name the column should be renamed to.
+ boost::optional<std::string> new_name;
+
+ // NB: these properties of a column cannot be changed yet,
+ // ergo type and nullable should always be empty.
+ // TODO(wdberkeley) allow changing of type and nullability
+ boost::optional<DataType> type;
+
+ boost::optional<bool> nullable;
+
+ // If set, the raw bytes of the column's new default value.
+ // NOTE(review): this is a Slice; ColumnSchemaDeltaFromPB documents that the
+ // source protobuf must outlive the delta, so the bytes are presumably
+ // referenced rather than copied -- confirm before storing a delta long-term.
+ boost::optional<Slice> default_value;
+
+ // If true, the column's default value should be removed.
+ bool remove_default;
+
+ // If set, the new storage attributes for the column.
+ boost::optional<EncodingType> encoding;
+ boost::optional<CompressionType> compression;
+ boost::optional<int32_t> cfile_block_size;
+};
+
// The schema for a given column.
//
// Holds the data type as well as information about nullability & column name.
@@ -138,7 +170,6 @@ class ColumnSchema {
if (write_default == read_default) {
write_default_ = read_default_;
} else if (write_default != NULL) {
- DCHECK(read_default != NULL) << "Must have a read default";
write_default_.reset(new Variant(type, write_default));
}
}
@@ -247,6 +278,11 @@ class ColumnSchema {
return true;
}
+ // Apply a ColumnSchemaDelta to this column schema, altering it according to
+ // the changes in the delta.
+ // The original column schema is changed only if the method returns OK.
+ Status ApplyDelta(const ColumnSchemaDelta& col_delta);
+
// Returns extended attributes (such as encoding, compression, etc...)
// associated with the column schema. The reason they are kept in a separate
// struct is so that in the future, they may be moved out to a more
@@ -869,8 +905,11 @@ class SchemaBuilder {
const void *write_default);
Status RemoveColumn(const string& name);
+
Status RenameColumn(const string& old_name, const string& new_name);
+ Status ApplyColumnSchemaDelta(const ColumnSchemaDelta& col_delta);
+
private:
DISALLOW_COPY_AND_ASSIGN(SchemaBuilder);
http://git-wip-us.apache.org/repos/asf/kudu/blob/a17efaa3/src/kudu/common/wire_protocol.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/wire_protocol.cc b/src/kudu/common/wire_protocol.cc
index 62a20a5..6b4d078 100644
--- a/src/kudu/common/wire_protocol.cc
+++ b/src/kudu/common/wire_protocol.cc
@@ -17,6 +17,7 @@
#include "kudu/common/wire_protocol.h"
+#include <boost/optional.hpp>
#include <string>
#include <vector>
@@ -256,6 +257,53 @@ ColumnSchema ColumnSchemaFromPB(const ColumnSchemaPB& pb) {
attributes);
}
+// Serializes 'col_delta' into 'pb'. Any previous contents of 'pb' are
+// discarded, and only the properties present in the delta are written.
+void ColumnSchemaDeltaToPB(const ColumnSchemaDelta& col_delta, ColumnSchemaDeltaPB *pb) {
+  pb->Clear();
+
+  // The column name is always written; everything else is optional.
+  pb->set_name(col_delta.name);
+  if (col_delta.new_name) {
+    pb->set_new_name(*col_delta.new_name);
+  }
+
+  // Default value changes.
+  if (col_delta.default_value) {
+    const Slice& raw_default = *col_delta.default_value;
+    pb->set_default_value(raw_default.data(), raw_default.size());
+  }
+  if (col_delta.remove_default) {
+    pb->set_remove_default(true);
+  }
+
+  // Storage attribute changes.
+  if (col_delta.encoding) {
+    pb->set_encoding(*col_delta.encoding);
+  }
+  if (col_delta.compression) {
+    pb->set_compression(*col_delta.compression);
+  }
+  if (col_delta.cfile_block_size) {
+    pb->set_block_size(*col_delta.cfile_block_size);
+  }
+}
+
+// Builds a ColumnSchemaDelta from its protobuf form.
+//
+// Only the fields present in 'pb' are set on the returned delta. The default
+// value is wrapped in a Slice constructed from pb's 'default_value' string,
+// so 'pb' must outlive the returned delta.
+ColumnSchemaDelta ColumnSchemaDeltaFromPB(const ColumnSchemaDeltaPB& pb) {
+  ColumnSchemaDelta col_delta(pb.name());
+  if (pb.has_new_name()) {
+    col_delta.new_name = boost::optional<string>(pb.new_name());
+  }
+  if (pb.has_default_value()) {
+    col_delta.default_value = boost::optional<Slice>(Slice(pb.default_value()));
+  }
+  // Honor the field's value, not merely its presence: a message that
+  // explicitly carries remove_default = false must not remove the default.
+  if (pb.has_remove_default() && pb.remove_default()) {
+    col_delta.remove_default = true;
+  }
+  if (pb.has_encoding()) {
+    col_delta.encoding = boost::optional<EncodingType>(pb.encoding());
+  }
+  if (pb.has_compression()) {
+    col_delta.compression = boost::optional<CompressionType>(pb.compression());
+  }
+  if (pb.has_block_size()) {
+    col_delta.cfile_block_size = boost::optional<int32_t>(pb.block_size());
+  }
+  return col_delta;
+}
+
Status ColumnPBsToSchema(const RepeatedPtrField<ColumnSchemaPB>& column_pbs,
Schema* schema) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/a17efaa3/src/kudu/common/wire_protocol.h
----------------------------------------------------------------------
diff --git a/src/kudu/common/wire_protocol.h b/src/kudu/common/wire_protocol.h
index d771ab8..bd77171 100644
--- a/src/kudu/common/wire_protocol.h
+++ b/src/kudu/common/wire_protocol.h
@@ -32,6 +32,7 @@ namespace kudu {
class Arena;
class ColumnPredicate;
class ColumnSchema;
+struct ColumnSchemaDelta;
class ConstContiguousRow;
class faststring;
class HostPort;
@@ -54,7 +55,14 @@ Status HostPortToPB(const HostPort& host_port, HostPortPB* host_port_pb);
// Returns the HostPort created from the specified protobuf.
Status HostPortFromPB(const HostPortPB& host_port_pb, HostPort* host_port);
-// Adds addresses in 'addrs' to 'pbs'. If an address is a wildcard
+// Convert the column schema delta `col_delta` to protobuf.
+void ColumnSchemaDeltaToPB(const ColumnSchemaDelta& col_delta, ColumnSchemaDeltaPB *pb);
+
+// Return the ColumnSchemaDelta created from the protobuf `pb`.
+// The protobuf must outlive the returned ColumnSchemaDelta.
+ColumnSchemaDelta ColumnSchemaDeltaFromPB(const ColumnSchemaDeltaPB& pb);
+
+ // Adds addresses in 'addrs' to 'pbs'. If an address is a wildcard
// (e.g., "0.0.0.0"), then the local machine's hostname is used in
// its place.
Status AddHostPortPBs(const std::vector<Sockaddr>& addrs,
http://git-wip-us.apache.org/repos/asf/kudu/blob/a17efaa3/src/kudu/integration-tests/alter_table-randomized-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/alter_table-randomized-test.cc b/src/kudu/integration-tests/alter_table-randomized-test.cc
index 7edf0b2..4e125f7 100644
--- a/src/kudu/integration-tests/alter_table-randomized-test.cc
+++ b/src/kudu/integration-tests/alter_table-randomized-test.cc
@@ -36,10 +36,9 @@
namespace kudu {
using client::KuduClient;
-using client::KuduClientBuilder;
using client::KuduColumnSchema;
+using client::KuduColumnStorageAttributes;
using client::KuduError;
-using client::KuduInsert;
using client::KuduScanner;
using client::KuduSchema;
using client::KuduSchemaBuilder;
@@ -61,6 +60,18 @@ using strings::SubstituteAndAppend;
const char* kTableName = "test-table";
const int kMaxColumns = 30;
const uint32_t kMaxRangePartitions = 32;
+const vector<KuduColumnStorageAttributes::CompressionType> kCompressionTypes =
+ { KuduColumnStorageAttributes::NO_COMPRESSION,
+ KuduColumnStorageAttributes::SNAPPY,
+ KuduColumnStorageAttributes::LZ4,
+ KuduColumnStorageAttributes::ZLIB };
+const vector <KuduColumnStorageAttributes::EncodingType> kInt32Encodings =
+ { KuduColumnStorageAttributes::PLAIN_ENCODING,
+ KuduColumnStorageAttributes::RLE,
+ KuduColumnStorageAttributes::BIT_SHUFFLE };
+// A block size of 0 applies the server-side default.
+const vector<int32_t> kBlockSizes = {0, 2 * 1024 * 1024,
+ 4 * 1024 * 1024, 8 * 1024 * 1024};
class AlterTableRandomized : public KuduTest {
public:
@@ -111,6 +122,10 @@ struct RowState {
// We ensure that we never insert or update to this value except in the case of
// NULLable columns.
static const int32_t kNullValue = 0xdeadbeef;
+ // We use this special value to denote default values.
+ // We ensure that we never insert or update to this value except when the
+ // column should be assigned its current default value.
+ static const int32_t kDefaultValue = 0xbabecafe;
vector<pair<string, int32_t>> cols;
string ToString() const {
@@ -137,6 +152,7 @@ struct TableState {
: rand_(SeedRandom()) {
col_names_.push_back("key");
col_nullable_.push_back(false);
+ col_defaults_.push_back(0);
AddRangePartition();
}
@@ -181,7 +197,7 @@ struct TableState {
int32_t key = GetRandomNewRowKey();
int32_t seed = rand_.Next();
- if (seed == RowState::kNullValue) {
+ if (seed == RowState::kNullValue || seed == RowState::kDefaultValue) {
seed++;
}
@@ -191,6 +207,8 @@ struct TableState {
int32_t val;
if (col_nullable_[i] && seed % 2 == 1) {
val = RowState::kNullValue;
+ } else if (seed % 3 == 0) {
+ val = RowState::kDefaultValue;
} else {
val = seed;
}
@@ -205,6 +223,11 @@ struct TableState {
auto r = new RowState;
r->cols = data;
+ for (int i = 1; i < r->cols.size(); i++) {
+ if (r->cols[i].second == RowState::kDefaultValue) {
+ r->cols[i].second = col_defaults_[i];
+ }
+ }
rows_[key].reset(r);
return true;
}
@@ -226,6 +249,7 @@ struct TableState {
void AddColumnWithDefault(const string& name, int32_t def, bool nullable) {
col_names_.push_back(name);
col_nullable_.push_back(nullable);
+ col_defaults_.push_back(def);
for (auto& e : rows_) {
e.second->cols.push_back(make_pair(name, def));
}
@@ -236,19 +260,26 @@ struct TableState {
int index = col_it - col_names_.begin();
col_names_.erase(col_it);
col_nullable_.erase(col_nullable_.begin() + index);
+ col_defaults_.erase(col_defaults_.begin() + index);
for (auto& e : rows_) {
e.second->cols.erase(e.second->cols.begin() + index);
}
}
- void RenameColumn(const string& existing_name, string new_name) {
+ void RenameColumn(const string& existing_name, const string& new_name) {
auto iter = std::find(col_names_.begin(), col_names_.end(), existing_name);
CHECK(iter != col_names_.end());
int index = iter - col_names_.begin();
for (auto& e : rows_) {
e.second->cols[index].first = new_name;
}
- *iter = std::move(new_name);
+ *iter = new_name;
+ }
+
+ // Records 'new_def' as the current write default of the column called
+ // 'name'. The column must exist.
+ void ChangeDefault(const string& name, int32_t new_def) {
+   const auto pos = std::find(col_names_.begin(), col_names_.end(), name);
+   CHECK(pos != col_names_.end());
+   const auto index = pos - col_names_.begin();
+   col_defaults_[index] = new_def;
 }
pair<int32_t, int32_t> AddRangePartition() {
@@ -294,6 +325,10 @@ struct TableState {
// Has the same length as col_names_.
vector<bool> col_nullable_;
+ // For each column, its current write default.
+ // Has the same length as col_names_.
+ vector<int32_t> col_defaults_;
+
map<int32_t, unique_ptr<RowState>> rows_;
// The lower and upper bounds of all range partitions in the table.
@@ -359,6 +394,7 @@ struct MirrorTable {
for (int i = 1; i < num_columns(); i++) {
int32_t val = rand * i;
if (val == RowState::kNullValue) val++;
+ if (val == RowState::kDefaultValue) val++;
if (ts_.col_nullable_[i] && val % 2 == 1) {
val = RowState::kNullValue;
}
@@ -391,7 +427,7 @@ struct MirrorTable {
int step_count = 1 + ts_.rand_.Uniform(10);
for (int step = 0; step < step_count; step++) {
- int r = ts_.rand_.Uniform(6);
+ int r = ts_.rand_.Uniform(9);
if (r < 1 && num_columns() < kMaxColumns) {
AddAColumn(table_alterer.get());
} else if (r < 2 && num_columns() > 1) {
@@ -403,6 +439,12 @@ struct MirrorTable {
RenameAColumn(table_alterer.get());
} else if (r < 5 && num_columns() > 1) {
RenamePrimaryKeyColumn(table_alterer.get());
+ } else if (r < 6 && num_columns() > 1) {
+ RenameAColumn(table_alterer.get());
+ } else if (r < 7 && num_columns() > 1) {
+ ChangeADefault(table_alterer.get());
+ } else if (r < 8 && num_columns() > 1) {
+ ChangeAStorageAttribute(table_alterer.get());
} else {
DropARangePartition(schema, table_alterer.get());
}
@@ -445,14 +487,46 @@ struct MirrorTable {
string new_name = ts_.GetRandomNewColumnName();
LOG(INFO) << "Renaming column " << original_name << " to " << new_name;
table_alterer->AlterColumn(original_name)->RenameTo(new_name);
- ts_.RenameColumn(original_name, std::move(new_name));
+ ts_.RenameColumn(original_name, new_name);
}
void RenamePrimaryKeyColumn(KuduTableAlterer* table_alterer) {
string new_name = ts_.GetRandomNewColumnName();
LOG(INFO) << "Renaming PrimaryKey column " << ts_.col_names_[0] << " to " << new_name;
table_alterer->AlterColumn(ts_.col_names_[0])->RenameTo(new_name);
- ts_.RenameColumn(ts_.col_names_[0], std::move(new_name));
+ ts_.RenameColumn(ts_.col_names_[0], new_name);
+ }
+
+ // Picks a random existing column and changes its default to a random value,
+ // both on the table alterer and in the mirrored table state.
+ void ChangeADefault(KuduTableAlterer* table_alterer) {
+   const string col_name = ts_.GetRandomExistingColumnName();
+   int32_t default_val = ts_.rand_.Next();
+   // Steer clear of the two sentinel values reserved by RowState.
+   const bool is_sentinel = default_val == RowState::kNullValue ||
+                            default_val == RowState::kDefaultValue;
+   if (is_sentinel) {
+     default_val++;
+   }
+   LOG(INFO) << "Changing default of column " << col_name << " to " << default_val;
+   table_alterer->AlterColumn(col_name)->Default(KuduValue::FromInt(default_val));
+   ts_.ChangeDefault(col_name, default_val);
+ }
+
+ // Picks a random existing column and changes one randomly chosen storage
+ // attribute on it: compression, encoding, or block size.
+ void ChangeAStorageAttribute(KuduTableAlterer* table_alterer) {
+   const string col_name = ts_.GetRandomExistingColumnName();
+   switch (ts_.rand_.Uniform(3)) {
+     case 0: {
+       const auto& compression = kCompressionTypes[ts_.rand_.Uniform(kCompressionTypes.size())];
+       LOG(INFO) << "Changing compression of column " << col_name <<
+           " to " << compression;
+       table_alterer->AlterColumn(col_name)->Compression(compression);
+       break;
+     }
+     case 1: {
+       const auto& encoding = kInt32Encodings[ts_.rand_.Uniform(kInt32Encodings.size())];
+       LOG(INFO) << "Changing encoding of column " << col_name <<
+           " to " << encoding;
+       table_alterer->AlterColumn(col_name)->Encoding(encoding);
+       break;
+     }
+     default: {
+       const auto& block_size = kBlockSizes[ts_.rand_.Uniform(kBlockSizes.size())];
+       LOG(INFO) << "Changing block size of column " << col_name <<
+           " to " << block_size;
+       table_alterer->AlterColumn(col_name)->BlockSize(block_size);
+       break;
+     }
+   }
 }
void AddARangePartition(const KuduSchema& schema, KuduTableAlterer* table_alterer) {
@@ -556,9 +630,16 @@ struct MirrorTable {
case UPDATE: op.reset(table->NewUpdate()); break;
case DELETE: op.reset(table->NewDelete()); break;
}
- for (const auto& d : data) {
+ for (int i = 0; i < data.size(); i++) {
+ const auto& d = data[i];
if (d.second == RowState::kNullValue) {
CHECK_OK(op->mutable_row()->SetNull(d.first));
+ } else if (d.second == RowState::kDefaultValue) {
+ if (ts_.col_defaults_[i] == RowState::kNullValue) {
+ CHECK_OK(op->mutable_row()->SetNull(d.first));
+ } else {
+ CHECK_OK(op->mutable_row()->SetInt32(d.first, ts_.col_defaults_[i]));
+ }
} else {
CHECK_OK(op->mutable_row()->SetInt32(d.first, d.second));
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/a17efaa3/src/kudu/integration-tests/alter_table-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/alter_table-test.cc b/src/kudu/integration-tests/alter_table-test.cc
index 8347e44..76ed354 100644
--- a/src/kudu/integration-tests/alter_table-test.cc
+++ b/src/kudu/integration-tests/alter_table-test.cc
@@ -385,6 +385,116 @@ TEST_F(AlterTableTest, TestRenamePrimaryKeyColumn) {
EXPECT_EQ("(int32 pk=33554432, int32 sc=2)", rows[2]);
}
+// Test altering a column to add a default value, change a default value, and
+// remove a default value.
+//
+// After each alter step a row is inserted and the tablet is flushed; the scans
+// then check that rows written earlier keep the values they were written with
+// while the newly inserted row reflects the default in effect at insert time.
+TEST_F(AlterTableTest, TestAddChangeRemoveColumnDefaultValue) {
+ // Step 1: add an int32 and a string column with no default value.
+ {
+ unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
+ table_alterer->AddColumn("newInt32")->Type(KuduColumnSchema::INT32);
+ table_alterer->AddColumn("newString")->Type(KuduColumnSchema::STRING);
+ ASSERT_OK(table_alterer->Alter());
+ }
+
+ // Row 0 is written before any default exists.
+ InsertRows(0, 1);
+ ASSERT_OK(tablet_peer_->tablet()->Flush());
+
+ // Step 2: add a default value to both columns.
+ {
+ unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
+ table_alterer->AlterColumn("newInt32")->Default(KuduValue::FromInt(12345));
+ table_alterer->AlterColumn("newString")->Default(KuduValue::CopyString("taco"));
+ ASSERT_OK(table_alterer->Alter());
+ }
+
+ InsertRows(1, 1);
+ ASSERT_OK(tablet_peer_->tablet()->Flush());
+
+ // Row 0 still reads NULL in the new columns; row 1 reads the new defaults.
+ vector<string> rows;
+ ScanToStrings(&rows);
+ ASSERT_EQ(2, rows.size());
+ EXPECT_EQ("(int32 c0=0, int32 c1=0, int32 newInt32=NULL, string newString=NULL)", rows[0]);
+ EXPECT_EQ("(int32 c0=16777216, int32 c1=1, int32 newInt32=12345, string newString=\"taco\")",
+ rows[1]);
+
+ // Step 3: change the existing defaults to different values.
+ {
+ unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
+ table_alterer->AlterColumn("newInt32")->Default(KuduValue::FromInt(54321));
+ table_alterer->AlterColumn("newString")->Default(KuduValue::CopyString("ocat"));
+ ASSERT_OK(table_alterer->Alter());
+ }
+
+ InsertRows(2, 1);
+ ASSERT_OK(tablet_peer_->tablet()->Flush());
+
+ // Rows 0 and 1 are unchanged; row 2 reads the changed defaults.
+ ScanToStrings(&rows);
+ ASSERT_EQ(3, rows.size());
+ EXPECT_EQ("(int32 c0=0, int32 c1=0, int32 newInt32=NULL, string newString=NULL)", rows[0]);
+ EXPECT_EQ("(int32 c0=16777216, int32 c1=1, int32 newInt32=12345, string newString=\"taco\")",
+ rows[1]);
+ EXPECT_EQ("(int32 c0=33554432, int32 c1=2, int32 newInt32=54321, string newString=\"ocat\")",
+ rows[2]);
+
+ // Step 4: remove both defaults and, in the same alter, rename the string
+ // column.
+ {
+ unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
+ table_alterer->AlterColumn("newInt32")->RemoveDefault();
+ table_alterer->AlterColumn("newString")->RemoveDefault();
+ // Add an extra rename step
+ table_alterer->AlterColumn("newString")->RenameTo("newNewString");
+ ASSERT_OK(table_alterer->Alter());
+ }
+
+ InsertRows(3, 1);
+ ASSERT_OK(tablet_peer_->tablet()->Flush());
+
+ // Earlier rows are unchanged apart from the renamed column; row 3 reads NULL
+ // again now that the defaults are gone.
+ ScanToStrings(&rows);
+ ASSERT_EQ(4, rows.size());
+ EXPECT_EQ("(int32 c0=0, int32 c1=0, int32 newInt32=NULL, string newNewString=NULL)", rows[0]);
+ EXPECT_EQ("(int32 c0=16777216, int32 c1=1, int32 newInt32=12345, string newNewString=\"taco\")",
+ rows[1]);
+ EXPECT_EQ("(int32 c0=33554432, int32 c1=2, int32 newInt32=54321, string newNewString=\"ocat\")",
+ rows[2]);
+ EXPECT_EQ("(int32 c0=50331648, int32 c1=3, int32 newInt32=NULL, string newNewString=NULL)",
+ rows[3]);
+
+ // Finally, drop the two columns this test added.
+ {
+ unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
+ table_alterer->DropColumn("newInt32");
+ table_alterer->DropColumn("newNewString");
+ ASSERT_OK(table_alterer->Alter());
+ }
+}
+
+// Test for a bug seen when an alter table creates an empty RLE block and the
+// block is subsequently scanned.
+TEST_F(AlterTableTest, TestAlterEmptyRLEBlock) {
+  // Add an RLE-encoded column
+  {
+    unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
+    table_alterer->AddColumn("rle")->Type(KuduColumnSchema::INT32)
+        ->Encoding(client::KuduColumnStorageAttributes::RLE);
+    ASSERT_OK(table_alterer->Alter());
+  }
+
+  // Insert some rows
+  InsertRows(0, 3);
+
+  // Now alter the column
+  {
+    unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
+    table_alterer->AlterColumn("rle")->RenameTo("new_rle");
+    ASSERT_OK(table_alterer->Alter());
+  }
+
+  // Now scan the table, which would trigger a CHECK failure
+  // on trying to seek to position 0 in an empty RLE block
+  {
+    vector<string> rows;
+    ScanToStrings(&rows);
+    // Fail cleanly instead of indexing out of bounds if the scan comes back
+    // with fewer rows than were inserted.
+    ASSERT_EQ(3, rows.size());
+    EXPECT_EQ("(int32 c0=0, int32 c1=0, int32 new_rle=NULL)", rows[0]);
+    EXPECT_EQ("(int32 c0=16777216, int32 c1=1, int32 new_rle=NULL)", rows[1]);
+    EXPECT_EQ("(int32 c0=33554432, int32 c1=2, int32 new_rle=NULL)", rows[2]);
+  }
+}
+
// Verify that, if a tablet server is down when an alter command is issued,
// it will eventually receive the command when it restarts.
TEST_F(AlterTableTest, TestAlterOnTSRestart) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/a17efaa3/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 8d51b93..a19273c 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -1580,13 +1580,14 @@ Status CatalogManager::ApplyAlterSchemaSteps(const SysTablesEntryPB& current_pb,
}
if (cur_schema.is_key_column(step.drop_column().name())) {
- return Status::InvalidArgument("cannot remove a key column");
+ return Status::InvalidArgument("cannot remove a key column",
+ step.drop_column().name());
}
RETURN_NOT_OK(builder.RemoveColumn(step.drop_column().name()));
break;
}
-
+ // Remains for backwards compatibility.
case AlterTableRequestPB::RENAME_COLUMN: {
if (!step.has_rename_column()) {
return Status::InvalidArgument("RENAME_COLUMN missing column info");
@@ -1597,9 +1598,14 @@ Status CatalogManager::ApplyAlterSchemaSteps(const SysTablesEntryPB& current_pb,
step.rename_column().new_name()));
break;
}
-
- // TODO: EDIT_COLUMN
-
+ case AlterTableRequestPB::ALTER_COLUMN: {
+ if (!step.has_alter_column()) {
+ return Status::InvalidArgument("ALTER_COLUMN missing column info");
+ }
+ const ColumnSchemaDelta col_delta = ColumnSchemaDeltaFromPB(step.alter_column().delta());
+ RETURN_NOT_OK(builder.ApplyColumnSchemaDelta(col_delta));
+ break;
+ }
default: {
return Status::InvalidArgument("Invalid alter schema step type",
SecureShortDebugString(step));
@@ -1793,7 +1799,8 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
switch (step.type()) {
case AlterTableRequestPB::ADD_COLUMN:
case AlterTableRequestPB::DROP_COLUMN:
- case AlterTableRequestPB::RENAME_COLUMN: {
+ case AlterTableRequestPB::RENAME_COLUMN:
+ case AlterTableRequestPB::ALTER_COLUMN: {
alter_schema_steps.emplace_back(step);
break;
}
@@ -1802,7 +1809,6 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
alter_partitioning_steps.emplace_back(step);
break;
}
- case AlterTableRequestPB::ALTER_COLUMN:
case AlterTableRequestPB::UNKNOWN: {
return Status::InvalidArgument("Invalid alter step type", SecureShortDebugString(step));
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/a17efaa3/src/kudu/master/master.proto
----------------------------------------------------------------------
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index 12b8b5c..9f828af 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -476,9 +476,6 @@ message AlterTableRequestPB {
ADD_COLUMN = 1;
DROP_COLUMN = 2;
RENAME_COLUMN = 3;
-
- // TODO(KUDU-861): this will subsume RENAME_COLUMN, but not yet implemented
- // on the master side.
ALTER_COLUMN = 4;
ADD_RANGE_PARTITION = 5;
DROP_RANGE_PARTITION = 6;
@@ -493,11 +490,15 @@ message AlterTableRequestPB {
// Name of the column to drop.
required string name = 1;
}
+ // This should eventually be deprecated in favor of AlterColumn.
message RenameColumn {
// Name of the column to rename;
required string old_name = 1;
required string new_name = 2;
}
+ message AlterColumn {
+ optional ColumnSchemaDeltaPB delta = 1;
+ }
message AddRangePartition {
// A set of row operations containing the lower and upper range bound for
// the range partition to add or drop.
@@ -518,6 +519,7 @@ message AlterTableRequestPB {
optional RenameColumn rename_column = 4;
optional AddRangePartition add_range_partition = 5;
optional DropRangePartition drop_range_partition = 6;
+ optional AlterColumn alter_column = 7;
}
required TableIdentifierPB table = 1;
[2/2] kudu git commit: external mini cluster: spawn perf record for
each daemon during Start()
Posted by ad...@apache.org.
external mini cluster: spawn perf record for each daemon during Start()
I want this for the dense node test, but it's not really possible to do from
outside ExternalMiniCluster without missing out on time spent in Start(),
which I was interested in measuring. So here's a generic approach that can
be used by any itest.
I wasn't sure whether this should be configured via EMC option or gflag. I
ended up with the latter because it's not really something a test needs
programmatic access to; it's just something the dev running the test might
want to enable manually.
I also changed the existing perf calls in full_stack-insert-scan-test to use
the new subprocess custom destroy signal. While there I fixed the handling
of "--call-graph"; passing it without "fp" is a syntax error on both el6.6
and Ubuntu 16.04.
Change-Id: I92079f616648788b461d550057b8e23bc9174b71
Reviewed-on: http://gerrit.cloudera.org:8080/6742
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <da...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/28869e08
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/28869e08
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/28869e08
Branch: refs/heads/master
Commit: 28869e08618af83261b9a698d9445cb954967771
Parents: a17efaa
Author: Adar Dembo <ad...@cloudera.com>
Authored: Wed Apr 26 21:25:29 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Tue May 2 01:27:56 2017 +0000
----------------------------------------------------------------------
.../integration-tests/external_mini_cluster.cc | 36 ++++++++++++++
.../integration-tests/external_mini_cluster.h | 5 ++
.../full_stack-insert-scan-test.cc | 52 ++++++++------------
3 files changed, 61 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/28869e08/src/kudu/integration-tests/external_mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster.cc b/src/kudu/integration-tests/external_mini_cluster.cc
index 72bae63..d476ef0 100644
--- a/src/kudu/integration-tests/external_mini_cluster.cc
+++ b/src/kudu/integration-tests/external_mini_cluster.cc
@@ -68,6 +68,9 @@ using strings::Substitute;
typedef ListTabletsResponsePB::StatusAndSchemaPB StatusAndSchemaPB;
+DEFINE_bool(perf_record, false,
+ "Whether to run \"perf record --call-graph fp\" on each daemon in the cluster");
+
namespace kudu {
static const char* const kMasterBinaryName = "kudu-master";
@@ -268,6 +271,10 @@ Status ExternalMiniCluster::StartSingleMaster() {
opts.exe = GetBinaryPath(kMasterBinaryName);
opts.data_dir = GetDataPath(daemon_id);
opts.log_dir = GetLogPath(daemon_id);
+ if (FLAGS_perf_record) {
+ opts.perf_record_filename =
+ Substitute("$0/perf-$1.data", opts.log_dir, daemon_id);
+ }
opts.extra_flags = SubstituteInFlags(opts_.extra_master_flags, 0);
scoped_refptr<ExternalMaster> master = new ExternalMaster(opts);
if (opts_.enable_kerberos) {
@@ -306,6 +313,10 @@ Status ExternalMiniCluster::StartDistributedMasters() {
opts.exe = exe;
opts.data_dir = GetDataPath(daemon_id);
opts.log_dir = GetLogPath(daemon_id);
+ if (FLAGS_perf_record) {
+ opts.perf_record_filename =
+ Substitute("$0/perf-$1.data", opts.log_dir, daemon_id);
+ }
opts.extra_flags = SubstituteInFlags(flags, i);
scoped_refptr<ExternalMaster> peer = new ExternalMaster(opts, peer_addrs[i]);
@@ -353,6 +364,10 @@ Status ExternalMiniCluster::AddTabletServer() {
opts.exe = GetBinaryPath(kTabletServerBinaryName);
opts.data_dir = GetDataPath(daemon_id);
opts.log_dir = GetLogPath(daemon_id);
+ if (FLAGS_perf_record) {
+ opts.perf_record_filename =
+ Substitute("$0/perf-$1.data", opts.log_dir, daemon_id);
+ }
opts.extra_flags = SubstituteInFlags(opts_.extra_tserver_flags, idx);
scoped_refptr<ExternalTabletServer> ts =
@@ -600,6 +615,7 @@ ExternalDaemon::ExternalDaemon(ExternalDaemonOptions opts)
: messenger_(std::move(opts.messenger)),
data_dir_(std::move(opts.data_dir)),
log_dir_(std::move(opts.log_dir)),
+ perf_record_filename_(std::move(opts.perf_record_filename)),
logtostderr_(opts.logtostderr),
exe_(std::move(opts.exe)),
extra_flags_(std::move(opts.extra_flags)) {}
@@ -690,6 +706,7 @@ Status ExternalDaemon::StartProcess(const vector<string>& user_flags) {
// the previous info file if it's there.
ignore_result(Env::Default()->DeleteFile(info_path));
+ // Start the daemon.
gscoped_ptr<Subprocess> p(new Subprocess(argv));
p->ShareParentStdout(false);
p->SetEnvVars(extra_env_);
@@ -700,6 +717,22 @@ Status ExternalDaemon::StartProcess(const vector<string>& user_flags) {
RETURN_NOT_OK_PREPEND(p->Start(),
Substitute("Failed to start subprocess $0", exe_));
+ // If requested, start a monitoring subprocess.
+ unique_ptr<Subprocess> perf_record;
+ if (!perf_record_filename_.empty()) {
+ perf_record.reset(new Subprocess({
+ "perf",
+ "record",
+ "--call-graph",
+ "fp",
+ "-o",
+ perf_record_filename_,
+ Substitute("--pid=$0", p->pid())
+ }, SIGINT));
+ RETURN_NOT_OK_PREPEND(perf_record->Start(),
+ "Could not start perf record subprocess");
+ }
+
// The process is now starting -- wait for the bound port info to show up.
Stopwatch sw;
sw.start();
@@ -721,6 +754,7 @@ Status ExternalDaemon::StartProcess(const vector<string>& user_flags) {
// and exit as if it had succeeded.
if (WIFEXITED(wait_status) && WEXITSTATUS(wait_status) == fault_injection::kExitStatus) {
process_.swap(p);
+ perf_record_process_.swap(perf_record);
return Status::OK();
}
@@ -744,6 +778,7 @@ Status ExternalDaemon::StartProcess(const vector<string>& user_flags) {
VLOG(1) << exe_ << " instance information:\n" << SecureDebugString(*status_);
process_.swap(p);
+ perf_record_process_.swap(perf_record);
return Status::OK();
}
@@ -866,6 +901,7 @@ void ExternalDaemon::Shutdown() {
WARN_NOT_OK(process_->Wait(), "Waiting on " + exe_);
paused_ = false;
process_.reset();
+ perf_record_process_.reset();
}
void ExternalDaemon::FlushCoverage() {
http://git-wip-us.apache.org/repos/asf/kudu/blob/28869e08/src/kudu/integration-tests/external_mini_cluster.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster.h b/src/kudu/integration-tests/external_mini_cluster.h
index b910d35..c3d7106 100644
--- a/src/kudu/integration-tests/external_mini_cluster.h
+++ b/src/kudu/integration-tests/external_mini_cluster.h
@@ -43,6 +43,7 @@ class HostPort;
class MetricPrototype;
class MetricEntityPrototype;
class NodeInstancePB;
+class ScopedSubprocess;
class Sockaddr;
class Subprocess;
@@ -326,6 +327,7 @@ struct ExternalDaemonOptions {
std::string exe;
std::string data_dir;
std::string log_dir;
+ std::string perf_record_filename;
std::vector<std::string> extra_flags;
};
@@ -456,6 +458,7 @@ class ExternalDaemon : public RefCountedThreadSafe<ExternalDaemon> {
const std::shared_ptr<rpc::Messenger> messenger_;
const std::string data_dir_;
const std::string log_dir_;
+ const std::string perf_record_filename_;
const bool logtostderr_;
std::string exe_;
std::vector<std::string> extra_flags_;
@@ -464,6 +467,8 @@ class ExternalDaemon : public RefCountedThreadSafe<ExternalDaemon> {
gscoped_ptr<Subprocess> process_;
bool paused_ = false;
+ std::unique_ptr<Subprocess> perf_record_process_;
+
gscoped_ptr<server::ServerStatusPB> status_;
std::string rpc_bind_address_;
http://git-wip-us.apache.org/repos/asf/kudu/blob/28869e08/src/kudu/integration-tests/full_stack-insert-scan-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/full_stack-insert-scan-test.cc b/src/kudu/integration-tests/full_stack-insert-scan-test.cc
index cf83dba..eab7aff 100644
--- a/src/kudu/integration-tests/full_stack-insert-scan-test.cc
+++ b/src/kudu/integration-tests/full_stack-insert-scan-test.cc
@@ -15,15 +15,17 @@
// specific language governing permissions and limitations
// under the License.
+#include <signal.h>
+
#include <cmath>
#include <cstdlib>
-#include <gflags/gflags.h>
-#include <glog/logging.h>
#include <memory>
-#include <signal.h>
#include <string>
#include <vector>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
#include "kudu/client/callbacks.h"
#include "kudu/client/client.h"
#include "kudu/client/client-test-util.h"
@@ -67,13 +69,15 @@ DEFINE_int32(rows_per_batch, -1, "Number of rows per client batch");
// Perf-related FLAGS_perf_stat
DEFINE_bool(perf_record_scan, false, "Call \"perf record --call-graph\" "
"for the duration of the scan, disabled by default");
-DEFINE_bool(perf_stat_scan, false, "Print \"perf stat\" results during"
+DEFINE_bool(perf_record_scan_callgraph, false,
+ "Only applicable with --perf_record_scan, provides argument "
+ "\"--call-graph fp\"");
+DEFINE_bool(perf_stat_scan, false, "Print \"perf stat\" results during "
"scan to stdout, disabled by default");
-DEFINE_bool(perf_fp_flag, false, "Only applicable with --perf_record_scan,"
- " provides argument \"fp\" to the --call-graph flag");
DECLARE_bool(enable_maintenance_manager);
using std::string;
+using std::unique_ptr;
using std::vector;
namespace kudu {
@@ -215,33 +219,20 @@ class FullStackInsertScanTest : public KuduTest {
namespace {
-gscoped_ptr<Subprocess> MakePerfStat() {
- if (!FLAGS_perf_stat_scan) return gscoped_ptr<Subprocess>();
+unique_ptr<Subprocess> MakePerfStat() {
+ if (!FLAGS_perf_stat_scan) return unique_ptr<Subprocess>(nullptr);
// No output flag for perf-stat 2.x, just print to output
string cmd = Substitute("perf stat --pid=$0", getpid());
LOG(INFO) << "Calling: \"" << cmd << "\"";
- return gscoped_ptr<Subprocess>(new Subprocess(Split(cmd, " ")));
+ return unique_ptr<Subprocess>(new Subprocess(Split(cmd, " "), SIGINT));
}
-gscoped_ptr<Subprocess> MakePerfRecord() {
- if (!FLAGS_perf_record_scan) return gscoped_ptr<Subprocess>();
- string cmd = Substitute("perf record --pid=$0 --call-graph", getpid());
- if (FLAGS_perf_fp_flag) cmd += " fp";
+unique_ptr<Subprocess> MakePerfRecord() {
+ if (!FLAGS_perf_record_scan) return unique_ptr<Subprocess>(nullptr);
+ string cmd = Substitute("perf record --pid=$0", getpid());
+ if (FLAGS_perf_record_scan_callgraph) cmd += " --call-graph fp";
LOG(INFO) << "Calling: \"" << cmd << "\"";
- return gscoped_ptr<Subprocess>(new Subprocess(Split(cmd, " ")));
-}
-
-void InterruptNotNull(gscoped_ptr<Subprocess> sub) {
- if (!sub) return;
-
- ASSERT_OK(sub->Kill(SIGINT));
- ASSERT_OK(sub->Wait());
- int exit_status;
- string exit_info_str;
- ASSERT_OK(sub->GetExitStatus(&exit_status, &exit_info_str));
- if (exit_status != 0) {
- LOG(WARNING) << exit_info_str;
- }
+ return unique_ptr<Subprocess>(new Subprocess(Split(cmd, " "), SIGINT));
}
// If key is approximately at an even multiple of 1/10 of the way between
@@ -312,11 +303,11 @@ void FullStackInsertScanTest::DoTestScans() {
}
LOG(INFO) << "Doing test scans on table of " << kNumRows << " rows.";
- gscoped_ptr<Subprocess> stat = MakePerfStat();
+ unique_ptr<Subprocess> stat = MakePerfStat();
if (stat) {
ASSERT_OK(stat->Start());
}
- gscoped_ptr<Subprocess> record = MakePerfRecord();
+ unique_ptr<Subprocess> record = MakePerfRecord();
if (record) {
ASSERT_OK(record->Start());
}
@@ -327,9 +318,6 @@ void FullStackInsertScanTest::DoTestScans() {
NO_FATALS(ScanProjection(StringColumnNames(), "String projection, 1 col"));
NO_FATALS(ScanProjection(Int32ColumnNames(), "Int32 projection, 4 col"));
NO_FATALS(ScanProjection(Int64ColumnNames(), "Int64 projection, 4 col"));
-
- NO_FATALS(InterruptNotNull(std::move(record)));
- NO_FATALS(InterruptNotNull(std::move(stat)));
}
void FullStackInsertScanTest::FlushToDisk() {