You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2018/12/12 23:47:46 UTC

kudu git commit: [Java] Add a Schema and Data Generator

Repository: kudu
Updated Branches:
  refs/heads/master 93211b34c -> 5395dce78


[Java] Add a Schema and Data Generator

This patch adds schema and data generator utility
classes that can be used to create random tables and
random data. These utilities are useful in fuzz tests
and for various load and scale test applications.

The initial implementation is intended to be
fairly flexible without being overengineered.
Follow-on patches will improve the API and options.

The classes are currently marked private, but could be
changed in the future.

Change-Id: I750d2d346c3eeb7075b21c3fec0fd25236da4f56
Reviewed-on: http://gerrit.cloudera.org:8080/12061
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5395dce7
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5395dce7
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5395dce7

Branch: refs/heads/master
Commit: 5395dce782d16094c006d5bc023e4c457e5b8bb4
Parents: 93211b3
Author: Grant Henke <gr...@apache.org>
Authored: Mon Dec 10 14:54:41 2018 -0600
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Wed Dec 12 23:45:59 2018 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/backup/TestKuduBackup.scala | 170 +-------
 .../java/org/apache/kudu/client/PartialRow.java |   2 +-
 .../org/apache/kudu/util/DataGenerator.java     | 211 ++++++++++
 .../org/apache/kudu/util/SchemaGenerator.java   | 391 +++++++++++++++++++
 4 files changed, 622 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/5395dce7/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
----------------------------------------------------------------------
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index fda555a..d1fe486 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -22,9 +22,6 @@ import java.util
 
 import com.google.common.base.Objects
 import org.apache.commons.io.FileUtils
-import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
-import org.apache.kudu.ColumnSchema.CompressionAlgorithm
-import org.apache.kudu.ColumnSchema.Encoding
 import org.apache.kudu.client.PartitionSchema.HashBucketSchema
 import org.apache.kudu.client.CreateTableOptions
 import org.apache.kudu.client.KuduTable
@@ -35,8 +32,11 @@ import org.apache.kudu.Schema
 import org.apache.kudu.Type
 import org.apache.kudu.spark.kudu._
 import org.apache.kudu.test.RandomUtils
+import org.apache.kudu.util.DataGenerator.DataGeneratorBuilder
+import org.apache.kudu.util.DataGenerator
 import org.apache.kudu.util.DecimalUtil
 import org.apache.kudu.util.HybridTimeUtil
+import org.apache.kudu.util.SchemaGenerator.SchemaGeneratorBuilder
 import org.junit.Assert._
 import org.junit.Before
 import org.junit.Test
@@ -49,11 +49,11 @@ import scala.util.Random
 class TestKuduBackup extends KuduTestSuite {
   val log: Logger = LoggerFactory.getLogger(getClass)
 
-  var random: Random = _
+  var random: util.Random = _
 
   @Before
   def setUp(): Unit = {
-    random = Random.javaRandomToRandom(RandomUtils.getRandom)
+    random = RandomUtils.getRandom
   }
 
   @Test
@@ -190,164 +190,32 @@ class TestKuduBackup extends KuduTestSuite {
     Objects.equal(before.getSeed, after.getSeed)
   }
 
-  // TODO: Move to a test utility in kudu-client since it's generally useful.
   def createRandomTable(): KuduTable = {
     val columnCount = random.nextInt(50) + 1 // At least one column.
-    val keyCount = random.nextInt(columnCount) + 1 // At least one key.
-
-    val types = Type.values()
-    val keyTypes = types.filter { t =>
-      !Array(Type.BOOL, Type.FLOAT, Type.DOUBLE).contains(t)
-    }
-    val compressions =
-      CompressionAlgorithm.values().filter(_ != CompressionAlgorithm.UNKNOWN)
-    val blockSizes = Array(0, 4096, 524288, 1048576) // Default, min, middle, max.
-
-    val columns = (0 until columnCount).map { i =>
-      val key = i < keyCount
-      val t = if (key) {
-        keyTypes(random.nextInt(keyTypes.length))
-      } else {
-        types(random.nextInt(types.length))
-      }
-      val precision = random.nextInt(DecimalUtil.MAX_DECIMAL_PRECISION) + 1
-      val scale = random.nextInt(precision)
-      val typeAttributes = DecimalUtil.typeAttributes(precision, scale)
-      val nullable = random.nextBoolean() && !key
-      val compression = compressions(random.nextInt(compressions.length))
-      val blockSize = blockSizes(random.nextInt(blockSizes.length))
-      val encodings = t match {
-        case Type.INT8 | Type.INT16 | Type.INT32 | Type.INT64 | Type.UNIXTIME_MICROS =>
-          Array(Encoding.AUTO_ENCODING, Encoding.PLAIN_ENCODING, Encoding.BIT_SHUFFLE, Encoding.RLE)
-        case Type.FLOAT | Type.DOUBLE | Type.DECIMAL =>
-          Array(Encoding.AUTO_ENCODING, Encoding.PLAIN_ENCODING, Encoding.BIT_SHUFFLE)
-        case Type.STRING | Type.BINARY =>
-          Array(
-            Encoding.AUTO_ENCODING,
-            Encoding.PLAIN_ENCODING,
-            Encoding.PREFIX_ENCODING,
-            Encoding.DICT_ENCODING)
-        case Type.BOOL =>
-          Array(Encoding.AUTO_ENCODING, Encoding.PLAIN_ENCODING, Encoding.RLE)
-        case _ => throw new IllegalArgumentException(s"Unsupported type $t")
-      }
-      val encoding = encodings(random.nextInt(encodings.length))
-
-      val builder = new ColumnSchemaBuilder(s"${t.getName}-$i", t)
-        .key(key)
-        .nullable(nullable)
-        .compressionAlgorithm(compression)
-        .desiredBlockSize(blockSize)
-        .encoding(encoding)
-      // Add type attributes to decimal columns.
-      if (t == Type.DECIMAL) {
-        builder.typeAttributes(typeAttributes)
-      }
-      // Half the columns have defaults.
-      if (random.nextBoolean()) {
-        val defaultValue =
-          t match {
-            case Type.BOOL => random.nextBoolean()
-            case Type.INT8 => random.nextInt(Byte.MaxValue).asInstanceOf[Byte]
-            case Type.INT16 =>
-              random.nextInt(Short.MaxValue).asInstanceOf[Short]
-            case Type.INT32 => random.nextInt()
-            case Type.INT64 | Type.UNIXTIME_MICROS => random.nextLong()
-            case Type.FLOAT => random.nextFloat()
-            case Type.DOUBLE => random.nextDouble()
-            case Type.DECIMAL =>
-              DecimalUtil
-                .minValue(typeAttributes.getPrecision, typeAttributes.getScale)
-            case Type.STRING => random.nextString(random.nextInt(100))
-            case Type.BINARY =>
-              random.nextString(random.nextInt(100)).getBytes()
-            case _ => throw new IllegalArgumentException(s"Unsupported type $t")
-          }
-        builder.defaultValue(defaultValue)
-      }
-      builder.build()
-    }
-    val keyColumns = columns.filter(_.isKey)
-
-    val schema = new Schema(columns.asJava)
-
-    val options = new CreateTableOptions().setNumReplicas(1)
-    // Add hash partitioning (Max out at 3 levels to avoid being excessive).
-    val hashPartitionLevels = random.nextInt(Math.min(keyCount, 3))
-    (0 to hashPartitionLevels).foreach { level =>
-      val hashColumn = keyColumns(level)
-      val hashBuckets = random.nextInt(8) + 2 // Minimum of 2 hash buckets.
-      val hashSeed = random.nextInt()
-      options.addHashPartitions(List(hashColumn.getName).asJava, hashBuckets, hashSeed)
-    }
-    val hasRangePartition = random.nextBoolean() && keyColumns.exists(_.getType == Type.INT64)
-    if (hasRangePartition) {
-      val rangeColumn = keyColumns.filter(_.getType == Type.INT64).head
-      options.setRangePartitionColumns(List(rangeColumn.getName).asJava)
-      val splits = random.nextInt(8)
-      val used = new util.ArrayList[Long]()
-      var i = 0
-      while (i < splits) {
-        val split = schema.newPartialRow()
-        val value = random.nextLong()
-        if (!used.contains(value)) {
-          used.add(value)
-          split.addLong(rangeColumn.getName, random.nextLong())
-          i = i + 1
-        }
-      }
-    }
-
+    val keyColumnCount = random.nextInt(columnCount) + 1 // At least one key.
+    val schemaGenerator = new SchemaGeneratorBuilder()
+      .random(random)
+      .columnCount(columnCount)
+      .keyColumnCount(keyColumnCount)
+      .build()
+    val schema = schemaGenerator.randomSchema()
+    val options = schemaGenerator.randomCreateTableOptions(schema)
+    options.setNumReplicas(1)
     val name = s"random-${System.currentTimeMillis()}"
     kuduClient.createTable(name, schema, options)
   }
 
   // TODO: Add updates and deletes when incremental backups are supported.
   def loadRandomData(table: KuduTable): IndexedSeq[PartialRow] = {
-    val rowCount = random.nextInt(200)
-
     val kuduSession = kuduClient.newSession()
+    val dataGenerator = new DataGeneratorBuilder()
+      .random(random)
+      .build()
+    val rowCount = random.nextInt(200)
     (0 to rowCount).map { i =>
       val upsert = table.newUpsert()
       val row = upsert.getRow
-      table.getSchema.getColumns.asScala.foreach { col =>
-        // Set nullable columns to null ~10% of the time.
-        if (col.isNullable && random.nextInt(10) == 0) {
-          row.setNull(col.getName)
-        }
-        // Use the column default value  ~10% of the time.
-        if (col.getDefaultValue != null && !col.isKey && random.nextInt(10) == 0) {
-          // Use the default value.
-        } else {
-          col.getType match {
-            case Type.BOOL =>
-              row.addBoolean(col.getName, random.nextBoolean())
-            case Type.INT8 =>
-              row.addByte(col.getName, random.nextInt(Byte.MaxValue).asInstanceOf[Byte])
-            case Type.INT16 =>
-              row.addShort(col.getName, random.nextInt(Short.MaxValue).asInstanceOf[Short])
-            case Type.INT32 =>
-              row.addInt(col.getName, random.nextInt())
-            case Type.INT64 | Type.UNIXTIME_MICROS =>
-              row.addLong(col.getName, random.nextLong())
-            case Type.FLOAT =>
-              row.addFloat(col.getName, random.nextFloat())
-            case Type.DOUBLE =>
-              row.addDouble(col.getName, random.nextDouble())
-            case Type.DECIMAL =>
-              val attributes = col.getTypeAttributes
-              val max = DecimalUtil
-                .maxValue(attributes.getPrecision, attributes.getScale)
-              row.addDecimal(col.getName, max)
-            case Type.STRING =>
-              row.addString(col.getName, random.nextString(random.nextInt(100)))
-            case Type.BINARY =>
-              row.addBinary(col.getName, random.nextString(random.nextInt(100)).getBytes())
-            case _ =>
-              throw new IllegalArgumentException(s"Unsupported type ${col.getType}")
-          }
-        }
-      }
+      dataGenerator.randomizeRow(row)
       kuduSession.apply(upsert)
       row
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/5395dce7/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java b/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
index 6984545..585d970 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
@@ -1516,7 +1516,7 @@ public class PartialRow {
    * Get the schema used for this row.
    * @return a schema that came from KuduTable
    */
-  Schema getSchema() {
+  public Schema getSchema() {
     return schema;
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/5395dce7/java/kudu-client/src/main/java/org/apache/kudu/util/DataGenerator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/DataGenerator.java b/java/kudu-client/src/main/java/org/apache/kudu/util/DataGenerator.java
new file mode 100644
index 0000000..63cff02
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/DataGenerator.java
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnTypeAttributes;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.PartialRow;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import javax.xml.bind.DatatypeConverter;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * A utility class to generate random data and rows.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class DataGenerator {
+
+  private final Random random;
+  private final int stringLength;
+  private final int binaryLength;
+  private final float nullRate;
+  private final float defaultRate;
+
+  private DataGenerator(final Random random,
+                        final int stringLength,
+                        final int binaryLength,
+                        final float nullRate,
+                        final float defaultRate) {
+    this.random = random;
+    this.stringLength = stringLength;
+    this.binaryLength = binaryLength;
+    this.nullRate = nullRate;
+    this.defaultRate = defaultRate;
+  }
+
+  /**
+   * Randomizes the fields in a given PartialRow.
+   * @param row the PartialRow to randomize.
+   */
+  public void randomizeRow(PartialRow row) {
+    Schema schema = row.getSchema();
+    List<ColumnSchema> columns = schema.getColumns();
+    for (int i = 0; i < columns.size(); i++) {
+      ColumnSchema col = columns.get(i);
+      Type type = col.getType();
+      if (col.isNullable() && random.nextFloat() <= nullRate) {
+        // Sometimes set nullable columns to null.
+        row.setNull(i);
+      } else if(col.getDefaultValue() != null && !col.isKey() && random.nextFloat() <= defaultRate) {
+        // Sometimes use the column default value.
+      } else {
+        switch (type) {
+          // TODO(ghenke): Support range bound configuration.
+          case BOOL:
+            row.addBoolean(i, random.nextBoolean()); break;
+          case INT8:
+            row.addByte(i, (byte) random.nextInt()); break;
+          case INT16:
+            row.addShort(i, (short) random.nextInt()); break;
+          case INT32:
+            row.addInt(i, random.nextInt()); break;
+          case INT64:
+          case UNIXTIME_MICROS:
+            row.addLong(i, random.nextLong()); break;
+          case FLOAT:
+            row.addFloat(i, random.nextFloat()); break;
+          case DOUBLE:
+            row.addDouble(i, random.nextDouble()); break;
+          case DECIMAL:
+            row.addDecimal(i, randomDecimal(col.getTypeAttributes(), random)); break;
+          case STRING:
+            row.addString(i, randomString(stringLength, random)); break;
+          case BINARY:
+            row.addBinary(i, randomBinary(binaryLength, random)); break;
+          default:
+            throw new UnsupportedOperationException("Unsupported type " + type);
+        }
+      }
+    }
+  }
+
+  /**
+   * Utility method to return a random decimal value.
+   */
+  public static BigDecimal randomDecimal(ColumnTypeAttributes attributes, Random random) {
+    int numBits = BigInteger.TEN.pow(attributes.getPrecision())
+        .subtract(BigInteger.ONE).bitCount();
+    BigInteger randomUnscaled = new BigInteger(numBits, random);
+    return new BigDecimal(randomUnscaled, attributes.getScale());
+  }
+
+  /**
+   * Utility method to return a random string value.
+   */
+  public static String randomString(int length, Random random) {
+    byte bytes[] = new byte[length];
+    random.nextBytes(bytes);
+    return DatatypeConverter.printBase64Binary(bytes);
+  }
+
+  /**
+   * Utility method to return a random binary value.
+   */
+  public static byte[] randomBinary(int length, Random random) {
+    byte bytes[] = new byte[length];
+    random.nextBytes(bytes);
+    return bytes;
+  }
+
+  /**
+   *  A builder to configure and construct a DataGenerator instance.
+   */
+  public static class DataGeneratorBuilder {
+
+    private Random random = new Random(System.currentTimeMillis());
+    private int stringLength = 128;
+    private int binaryLength = 128;
+    private float nullRate = 0.1f;
+    private float defaultRate = 0.1f;
+
+    public DataGeneratorBuilder() {}
+
+    /**
+     * Define a custom Random instance to use for any random generation.
+     * @return this instance
+     */
+    public DataGeneratorBuilder random(Random random) {
+      this.random = random;
+      return this;
+    }
+
+    /**
+     * Define the length of the data when randomly generating column values for string columns.
+     * @return this instance
+     */
+    public DataGeneratorBuilder stringLength(int stringLength) {
+      this.stringLength = stringLength;
+      return this;
+    }
+
+    /**
+     * Define the length of the data when randomly generating column values for binary columns.
+     * @return this instance
+     */
+    public DataGeneratorBuilder binaryLength(int binaryLength) {
+      this.binaryLength = binaryLength;
+      return this;
+    }
+
+    /**
+     * Define the rate at which null values should be used when randomly generating
+     * column values.
+     * @return this instance
+     */
+    public DataGeneratorBuilder nullRate(float nullRate) {
+      Preconditions.checkArgument(nullRate >= 0f && nullRate <= 1f,
+          "nullRate must be between 0 and 1");
+      this.nullRate = nullRate;
+      return this;
+    }
+
+    /**
+     * Define the rate at which default values should be used when randomly generating
+     * column values.
+     * @return this instance
+     */
+    public DataGeneratorBuilder defaultRate(float defaultRate) {
+      Preconditions.checkArgument(defaultRate >= 0f && defaultRate <= 1f,
+          "defaultRate must be between 0 and 1");
+      this.defaultRate = defaultRate;
+      return this;
+    }
+
+    public DataGenerator build() {
+      return new DataGenerator(
+          random,
+          stringLength,
+          binaryLength,
+          nullRate,
+          defaultRate
+      );
+    }
+  }
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/kudu/blob/5395dce7/java/kudu-client/src/main/java/org/apache/kudu/util/SchemaGenerator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/SchemaGenerator.java b/java/kudu-client/src/main/java/org/apache/kudu/util/SchemaGenerator.java
new file mode 100644
index 0000000..0e945c5
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/SchemaGenerator.java
@@ -0,0 +1,391 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder;
+import org.apache.kudu.ColumnSchema.CompressionAlgorithm;
+import org.apache.kudu.ColumnSchema.Encoding;
+import org.apache.kudu.ColumnTypeAttributes;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.PartialRow;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.kudu.util.DataGenerator.randomBinary;
+import static org.apache.kudu.util.DataGenerator.randomDecimal;
+import static org.apache.kudu.util.DataGenerator.randomString;
+
+/**
+ * A utility class to generate random schemas and schema components.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class SchemaGenerator {
+
+  // TODO(ghenke): Make string and binary length configurable.
+  private static final int DEFAULT_BINARY_LENGTH = 128;
+  private static final int MIN_HASH_BUCKETS = 2;
+
+  private final Random random;
+  private final int columnCount;
+  private final int keyColumnCount;
+  private final List<Type> types;
+  private final List<Type> keyTypes;
+  private final List<Encoding> encodings;
+  private final List<CompressionAlgorithm> compressions;
+  private final List<Integer> blockSizes;
+  private final Float defaultRate;
+
+  private SchemaGenerator(final Random random,
+                          final int columnCount,
+                          final int keyColumnCount,
+                          final List<Type> types,
+                          final List<Type> keyTypes,
+                          final List<Encoding> encodings,
+                          final List<CompressionAlgorithm> compressions,
+                          final List<Integer> blockSizes,
+                          final Float defaultRate) {
+    this.random = random;
+    this.columnCount = columnCount;
+    this.keyColumnCount = keyColumnCount;
+    this.types = types;
+    this.keyTypes = keyTypes;
+    this.encodings = encodings;
+    this.compressions = compressions;
+    this.blockSizes = blockSizes;
+    this.defaultRate = defaultRate;
+  }
+
+  /**
+   * Generates a random Schema.
+   * @return a random Schema.
+   */
+  public Schema randomSchema() {
+    List<ColumnSchema> columns = new ArrayList<>();
+    for (int i = 0; i < columnCount; i++) {
+      boolean key = i < keyColumnCount;
+      Type colType = randomType(key);
+      String colName = colType.getName() + i;
+      ColumnSchema column = randomColumnSchema(colName, colType, key);
+      columns.add(column);
+    }
+    return new Schema(columns);
+  }
+
+  /**
+   * Generates a random ColumnSchema.
+   * @return a random ColumnSchema.
+   */
+  public ColumnSchema randomColumnSchema(String name, Type type, boolean key) {
+    final ColumnSchemaBuilder builder = new ColumnSchemaBuilder(name, type)
+        .key(key)
+        // TODO(ghenke): Make nullable columns configurable.
+        .nullable(random.nextBoolean() && !key)
+        .compressionAlgorithm(randomCompression())
+        .desiredBlockSize(randomBlockSize())
+        .encoding(randomEncoding(type));
+
+    ColumnTypeAttributes typeAttributes = null;
+    if (type == Type.DECIMAL) {
+      // TODO(ghenke): Make precision and scale configurable.
+      int precision = random.nextInt(DecimalUtil.MAX_DECIMAL_PRECISION) + 1;
+      int scale = random.nextInt(precision);
+      typeAttributes = DecimalUtil.typeAttributes(precision, scale);
+      builder.typeAttributes(typeAttributes);
+    }
+
+    // Sometimes set a column default value.
+    if (random.nextFloat() <= defaultRate) {
+      switch (type) {
+        case BOOL:
+          builder.defaultValue(random.nextBoolean());
+          break;
+        case INT8:
+          builder.defaultValue((byte)random.nextInt());
+          break;
+        case INT16:
+          builder.defaultValue((short)random.nextInt());
+          break;
+        case INT32:
+          builder.defaultValue(random.nextInt());
+          break;
+        case INT64:
+        case UNIXTIME_MICROS:
+          builder.defaultValue(random.nextLong());
+          break;
+        case FLOAT:
+          builder.defaultValue(random.nextFloat());
+          break;
+        case DOUBLE:
+          builder.defaultValue(random.nextDouble());
+          break;
+        case DECIMAL:
+          builder.defaultValue(randomDecimal(typeAttributes, random));
+          break;
+        case STRING:
+          builder.defaultValue(randomString(DEFAULT_BINARY_LENGTH, random));
+          break;
+        case BINARY:
+          builder.defaultValue(randomBinary(DEFAULT_BINARY_LENGTH, random));
+          break;
+        default:
+          throw new UnsupportedOperationException("Unsupported type " + type);
+      }
+    }
+    return builder.build();
+  }
+
+  public int randomBlockSize() {
+    return blockSizes.get(random.nextInt(blockSizes.size()));
+  }
+
+  public CompressionAlgorithm randomCompression() {
+    return compressions.get(random.nextInt(compressions.size()));
+  }
+
+  public Type randomType(boolean key) {
+    if (key) {
+      return keyTypes.get(random.nextInt(keyTypes.size()));
+    } else {
+      return types.get(random.nextInt(types.size()));
+    }
+  }
+
+  public Encoding randomEncoding(Type type) {
+    final List<Encoding> validEncodings = new ArrayList<>(encodings);
+    // Remove the unsupported encodings for the type.
+    switch (type) {
+      case INT8:
+      case INT16:
+      case INT32:
+      case INT64:
+      case UNIXTIME_MICROS:
+        validEncodings.retainAll(Arrays.asList(
+            Encoding.AUTO_ENCODING,
+            Encoding.PLAIN_ENCODING,
+            Encoding.BIT_SHUFFLE,
+            Encoding.RLE));
+        break;
+      case FLOAT:
+      case DOUBLE:
+      case DECIMAL:
+        validEncodings.retainAll(Arrays.asList(
+            Encoding.AUTO_ENCODING,
+            Encoding.PLAIN_ENCODING,
+            Encoding.BIT_SHUFFLE));
+        break;
+      case STRING:
+      case BINARY:
+        validEncodings.retainAll(Arrays.asList(
+            Encoding.AUTO_ENCODING,
+            Encoding.PLAIN_ENCODING,
+            Encoding.PREFIX_ENCODING,
+            Encoding.DICT_ENCODING));
+        break;
+      case BOOL:
+        validEncodings.retainAll(Arrays.asList(
+            Encoding.AUTO_ENCODING,
+            Encoding.PLAIN_ENCODING,
+            Encoding.RLE));
+        break;
+      default: throw new IllegalArgumentException("Unsupported type " + type);
+    }
+
+    if (validEncodings.size() == 0) {
+      throw new IllegalArgumentException("There are no valid encodings for type " + type);
+    }
+
+    return validEncodings.get(random.nextInt(validEncodings.size()));
+  }
+
+  /**
+   * Generates random CreateTableOptions (hash and optional range partitioning) for a schema.
+   * @return the random CreateTableOptions.
+   */
+  public CreateTableOptions randomCreateTableOptions(Schema schema) {
+    CreateTableOptions options = new CreateTableOptions();
+    final List<ColumnSchema> keyColumns = schema.getPrimaryKeyColumns();
+    // Add hash partitioning (Max out at 3 levels to avoid being excessive).
+    int hashPartitionLevels = random.nextInt(Math.min(keyColumns.size(), 3)) + 1;
+    for (int i = 0; i < hashPartitionLevels; i++) {
+      // TODO(ghenke): Make buckets configurable.
+      final int hashBuckets = random.nextInt(8) + MIN_HASH_BUCKETS;
+      final int hashSeed = random.nextInt();
+      options.addHashPartitions(Arrays.asList(keyColumns.get(i).getName()), hashBuckets, hashSeed);
+    }
+    boolean hasRangePartition = random.nextBoolean();
+    ColumnSchema int64Key = null;
+    for (ColumnSchema col : keyColumns) {
+      if (col.getType() == Type.INT64) {
+        int64Key = col;
+        break;
+      }
+    }
+    // TODO(ghenke): Configurable range partition rate and more supported types.
+    if (hasRangePartition && int64Key != null) {
+      options.setRangePartitionColumns(Arrays.asList(int64Key.getName()));
+      int splits = random.nextInt(8); // TODO(ghenke): Configurable splits.
+      List<Long> used = new ArrayList<>();
+      while (used.size() < splits) {
+        long value = random.nextLong();
+        if (!used.contains(value)) {
+          used.add(value); // Track used values so every split row is distinct.
+          PartialRow split = schema.newPartialRow();
+          split.addLong(int64Key.getName(), value);
+          options.addSplitRow(split);
+        }
+      }
+    }
+    return options;
+  }
+
+  /**
+   * A builder to configure and construct a SchemaGenerator instance.
+   */
+  public static class SchemaGeneratorBuilder {
+
+    private Random random = new Random(System.currentTimeMillis());
+
+    private int columnCount = 10;
+    private int keyColumnCount = 1;
+    private List<Type> types = Arrays.asList(Type.values());
+    private List<Encoding> encodings = new ArrayList<>();
+    private List<CompressionAlgorithm> compressions = new ArrayList<>();
+    // Default, min, middle, max.
+    private List<Integer> blockSizes = Arrays.asList(0, 4096, 524288, 1048576);
+    private float defaultRate = 0.25f;
+
+    public SchemaGeneratorBuilder() {
+      // Add all encoding options and remove any invalid ones.
+      encodings.addAll(Arrays.asList(Encoding.values()));
+      encodings.remove(Encoding.UNKNOWN);
+      // Add all compression options and remove any invalid ones.
+      compressions.addAll(Arrays.asList(CompressionAlgorithm.values()));
+      compressions.remove(CompressionAlgorithm.UNKNOWN);
+    }
+
+    /**
+     * Define a custom Random instance to use for any random generation.
+     * @return this instance
+     */
+    public SchemaGeneratorBuilder random(Random random) {
+      this.random = random;
+      return this;
+    }
+
+    /**
+     * Define the column count of a random schema.
+     * @return this instance
+     */
+    public SchemaGeneratorBuilder columnCount(int columnCount) {
+      Preconditions.checkArgument(columnCount > 0,
+          "columnCount must be greater than 0");
+      this.columnCount = columnCount;
+      return this;
+    }
+
+    /**
+     * Define the key column count of a random schema; must be greater than 0.
+     * @return this instance
+     */
+    public SchemaGeneratorBuilder keyColumnCount(int keyColumnCount) {
+      Preconditions.checkArgument(keyColumnCount > 0,
+          "keyColumnCount must be greater than 0");
+      this.keyColumnCount = keyColumnCount;
+      return this;
+    }
+
+    /**
+     * Define the types that can be used when randomly generating a column schema.
+     * @return this instance
+     */
+    public SchemaGeneratorBuilder types(Type... types) {
+      this.types = Arrays.asList(types);
+      return this;
+    }
+
+    /**
+     * Define the encoding options that can be used when randomly generating
+     * a column schema.
+     * @return this instance
+     */
+    public SchemaGeneratorBuilder encodings(Encoding... encodings) {
+      this.encodings = Arrays.asList(encodings);
+      return this;
+    }
+
+    /**
+     * Define the compression options that can be used when randomly generating
+     * a column schema.
+     * @return this instance
+     */
+    public SchemaGeneratorBuilder compressions(CompressionAlgorithm... compressions) {
+      this.compressions = Arrays.asList(compressions);
+      return this;
+    }
+
+    /**
+     * Define the rate at which default values should be used when randomly generating
+     * a column schema.
+     * @return this instance
+     */
+    public SchemaGeneratorBuilder defaultRate(float defaultRate) {
+      Preconditions.checkArgument(defaultRate >= 0f && defaultRate <= 1f,
+          "defaultRate must be between 0 and 1");
+      this.defaultRate = defaultRate;
+      return this;
+    }
+
+    public SchemaGenerator build() {
+      Preconditions.checkArgument(keyColumnCount <= columnCount,
+          "keyColumnCount must be less than or equal to the columnCount");
+
+      // Filter the types that are compatible for key columns.
+      List<Type> keyTypes = new ArrayList<>(types);
+      keyTypes.removeAll(Arrays.asList(Type.BOOL, Type.FLOAT, Type.DOUBLE));
+      Preconditions.checkArgument(!keyTypes.isEmpty(),
+          "At least one type must be supported for key columns");
+
+      return new SchemaGenerator(
+          random,
+          columnCount,
+          keyColumnCount,
+          types,
+          keyTypes,
+          encodings,
+          compressions,
+          blockSizes,
+          defaultRate
+      );
+    }
+  }
+
+
+}
+
+
+