Posted to commits@kudu.apache.org by gr...@apache.org on 2018/12/20 19:31:03 UTC

[2/2] kudu git commit: Create parallelized loader Spark job

Create parallelized loader Spark job

This patch adds a new DistributedDataGenerator tool
that can load random or sequential data into an existing
Kudu table.

This tool was written to help test the backup and restore
tools. It is currently marked private but could be made
public in the future.
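
For reference, a minimal sketch of invoking the tool (not part of this commit
message; the table name and master address are hypothetical placeholders, and
the target table must already exist):

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.kudu.spark.tools.DistributedDataGenerator

    // Hypothetical driver: loads 1000 sequential rows into "my_table"
    // across 4 Spark tasks via the tool's testMain entry point.
    val conf = new SparkConf().setAppName("DistributedDataGenerator").setMaster("local[*]")
    val ss = SparkSession.builder().config(conf).getOrCreate()
    DistributedDataGenerator.testMain(
      Array("--type=sequential", "--num-rows=1000", "--num-tasks=4",
            "my_table", "master1:7051"),
      ss)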

Change-Id: Ibdfd41a21a7f80d22125c7f4e5ca4ed62c31709d
Reviewed-on: http://gerrit.cloudera.org:8080/12101
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/b3b008bd
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/b3b008bd
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/b3b008bd

Branch: refs/heads/master
Commit: b3b008bda91880ec0577b909572ddd80255508bf
Parents: 97a42f6
Author: Grant Henke <gr...@apache.org>
Authored: Mon Dec 17 14:06:11 2018 -0600
Committer: Grant Henke <gr...@apache.org>
Committed: Thu Dec 20 19:29:37 2018 +0000

----------------------------------------------------------------------
 .../java/org/apache/kudu/util/DecimalUtil.java  |   1 +
 .../org/apache/kudu/util/SchemaGenerator.java   |  62 ++++-
 java/kudu-spark-tools/build.gradle              |   4 +
 .../spark/tools/DistributedDataGenerator.scala  | 245 +++++++++++++++++++
 .../tools/DistributedDataGeneratorTest.scala    |  77 ++++++
 5 files changed, 385 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/b3b008bd/java/kudu-client/src/main/java/org/apache/kudu/util/DecimalUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/DecimalUtil.java b/java/kudu-client/src/main/java/org/apache/kudu/util/DecimalUtil.java
index 531329f..d2c7b5a 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/util/DecimalUtil.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/DecimalUtil.java
@@ -46,6 +46,7 @@ public class DecimalUtil {
   public static final BigInteger MIN_UNSCALED_DECIMAL128 = MAX_UNSCALED_DECIMAL128.negate();
   public static final int DECIMAL128_SIZE = 128 / Byte.SIZE;
 
+  public static final int MIN_DECIMAL_PRECISION = 1;
   public static final int MAX_DECIMAL_PRECISION = MAX_DECIMAL128_PRECISION;
 
   /**

http://git-wip-us.apache.org/repos/asf/kudu/blob/b3b008bd/java/kudu-client/src/main/java/org/apache/kudu/util/SchemaGenerator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/SchemaGenerator.java b/java/kudu-client/src/main/java/org/apache/kudu/util/SchemaGenerator.java
index 0e945c5..6ac21ec 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/util/SchemaGenerator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/SchemaGenerator.java
@@ -59,6 +59,8 @@ public class SchemaGenerator {
   private final List<CompressionAlgorithm> compressions;
   private final List<Integer> blockSizes;
   private final Float defaultRate;
+  private final int minPrecision;
+  private final int maxPrecision;
 
   private SchemaGenerator(final Random random,
                           final int columnCount,
@@ -68,7 +70,9 @@ public class SchemaGenerator {
                           final List<Encoding> encodings,
                           final List<CompressionAlgorithm> compressions,
                           final List<Integer> blockSizes,
-                          final Float defaultRate) {
+                          final Float defaultRate,
+                          final int minPrecision,
+                          final int maxPrecision) {
     this.random = random;
     this.columnCount = columnCount;
     this.keyColumnCount = keyColumnCount;
@@ -78,6 +82,8 @@ public class SchemaGenerator {
     this.compressions = compressions;
     this.blockSizes = blockSizes;
     this.defaultRate = defaultRate;
+    this.minPrecision = minPrecision;
+    this.maxPrecision = maxPrecision;
   }
 
   /**
@@ -111,8 +117,8 @@ public class SchemaGenerator {
 
     ColumnTypeAttributes typeAttributes = null;
     if (type == Type.DECIMAL) {
-      // TODO(ghenke): Make precision and scale configurable.
-      int precision = random.nextInt(DecimalUtil.MAX_DECIMAL_PRECISION) + 1;
+      int precision = random.nextInt((maxPrecision - minPrecision) + 1) + minPrecision;
+      // TODO(ghenke): Make scale configurable.
       int scale = random.nextInt(precision);
       typeAttributes = DecimalUtil.typeAttributes(precision, scale);
       builder.typeAttributes(typeAttributes);
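
A quick sanity check on the sampling above (editor's note, not in the patch):
Random.nextInt(n) is uniform over [0, n), so the expression covers the
inclusive range [minPrecision, maxPrecision].

    // E.g. with minPrecision = 9 and maxPrecision = 38:
    val random = new java.util.Random()
    val precision = random.nextInt((38 - 9) + 1) + 9  // uniform in 9..38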
@@ -278,6 +284,8 @@ public class SchemaGenerator {
     // Default, min, middle, max.
     private List<Integer> blockSizes = Arrays.asList(0, 4096, 524288, 1048576);
     private float defaultRate = 0.25f;
+    private int minPrecision = DecimalUtil.MIN_DECIMAL_PRECISION;
+    private int maxPrecision = DecimalUtil.MAX_DECIMAL_PRECISION;
 
     public SchemaGeneratorBuilder() {
       // Add all encoding options and remove any invalid ones.
@@ -329,6 +337,22 @@ public class SchemaGenerator {
     }
 
     /**
+     * Define the types that can *not* be used when randomly generating a column schema.
+     * @return this instance
+     */
+    public SchemaGeneratorBuilder excludeTypes(Type... types) {
+      List<Type> includedTypes = new ArrayList<>();
+      // Add all possible types.
+      includedTypes.addAll(Arrays.asList(Type.values()));
+      // Remove the excluded types.
+      for (Type type : types) {
+        includedTypes.remove(type);
+      }
+      this.types = includedTypes;
+      return this;
+    }
+
+    /**
      * Define the encoding options that can be used when randomly generating
      * a column schema.
      * @return this instance
@@ -360,6 +384,34 @@ public class SchemaGenerator {
       return this;
     }
 
+    /**
+     * Define the precision value to use when randomly generating
+     * a column schema with a Decimal type.
+     * @return this instance
+     */
+    public SchemaGeneratorBuilder precision(int precision) {
+      return precisionRange(precision, precision);
+    }
+
+    /**
+     * Define the range of precision values to use when randomly generating
+     * a column schema with a Decimal type.
+     * @return this instance
+     */
+    public SchemaGeneratorBuilder precisionRange(int minPrecision, int maxPrecision) {
+      Preconditions.checkArgument(minPrecision >= DecimalUtil.MIN_DECIMAL_PRECISION,
+          "minPrecision must be greater than or equal to " +
+              DecimalUtil.MIN_DECIMAL_PRECISION);
+      Preconditions.checkArgument(maxPrecision <= DecimalUtil.MAX_DECIMAL_PRECISION,
+          "maxPrecision must be less than or equal to " +
+              DecimalUtil.MAX_DECIMAL_PRECISION);
+      Preconditions.checkArgument(minPrecision <= maxPrecision,
+          "minPrecision must be less than or equal to " + maxPrecision);
+      this.minPrecision = minPrecision;
+      this.maxPrecision = maxPrecision;
+      return this;
+    }
+
     public SchemaGenerator build() {
       Preconditions.checkArgument(keyColumnCount <= columnCount,
           "keyColumnCount must be less than or equal to the columnCount");
@@ -379,7 +431,9 @@ public class SchemaGenerator {
           encodings,
           compressions,
           blockSizes,
-          defaultRate
+          defaultRate,
+          minPrecision,
+          maxPrecision
       );
     }
   }
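
A minimal sketch of the new builder knobs in use, mirroring the test added
below (the Random source here is assumed):

    import org.apache.kudu.Type
    import org.apache.kudu.util.{DecimalUtil, SchemaGenerator}

    // Exclude low-cardinality types and force wide decimals, as the
    // Spark test below does to keep randomly generated rows distinct.
    val generator = new SchemaGenerator.SchemaGeneratorBuilder()
      .random(new java.util.Random())
      .excludeTypes(Type.BOOL, Type.INT8)
      .precisionRange(DecimalUtil.MAX_DECIMAL32_PRECISION,
                      DecimalUtil.MAX_DECIMAL_PRECISION)
      .build()
    val schema = generator.randomSchema()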

http://git-wip-us.apache.org/repos/asf/kudu/blob/b3b008bd/java/kudu-spark-tools/build.gradle
----------------------------------------------------------------------
diff --git a/java/kudu-spark-tools/build.gradle b/java/kudu-spark-tools/build.gradle
index fb78e88..58dcb3a 100644
--- a/java/kudu-spark-tools/build.gradle
+++ b/java/kudu-spark-tools/build.gradle
@@ -24,6 +24,10 @@ dependencies {
   compile project(path: ":kudu-spark", configuration: "shadow")
   // TODO(KUDU-2500): Spark uses reflection which requires the annotations at runtime.
   compile libs.yetusAnnotations
+  compile (libs.scopt)  {
+    // Make sure the wrong Scala version is not pulled in.
+    exclude group: "org.scala-lang", module: "scala-library"
+  }
 
   provided libs.scalaLibrary
   provided libs.sparkAvro

http://git-wip-us.apache.org/repos/asf/kudu/blob/b3b008bd/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/DistributedDataGenerator.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/DistributedDataGenerator.scala b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/DistributedDataGenerator.scala
new file mode 100644
index 0000000..2ad4629
--- /dev/null
+++ b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/DistributedDataGenerator.scala
@@ -0,0 +1,245 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.spark.tools
+
+import java.math.BigDecimal
+import java.math.BigInteger
+import java.nio.charset.StandardCharsets
+
+import org.apache.kudu.Type
+import org.apache.kudu.client.PartialRow
+import org.apache.kudu.client.SessionConfiguration
+import org.apache.kudu.spark.kudu.KuduContext
+import org.apache.kudu.spark.tools.DistributedDataGeneratorOptions._
+import org.apache.kudu.util.DataGenerator
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.LongAccumulator
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext
+import org.apache.yetus.audience.InterfaceAudience
+import org.apache.yetus.audience.InterfaceStability
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+import scopt.OptionParser
+
+import scala.collection.JavaConverters._
+
+case class GeneratorMetrics(rowsWritten: LongAccumulator, collisions: LongAccumulator)
+
+object GeneratorMetrics {
+
+  def apply(sc: SparkContext): GeneratorMetrics = {
+    GeneratorMetrics(sc.longAccumulator("rows_written"), sc.longAccumulator("row_collisions"))
+  }
+}
+
+object DistributedDataGenerator {
+  val log: Logger = LoggerFactory.getLogger(getClass)
+
+  def generateRows(
+      context: KuduContext,
+      options: DistributedDataGeneratorOptions,
+      taskNum: Int,
+      metrics: GeneratorMetrics) {
+
+    val kuduClient = context.syncClient
+    val session = kuduClient.newSession()
+    session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND)
+    val kuduTable = kuduClient.openTable(options.tableName)
+
+    val generator = new DataGenerator.DataGeneratorBuilder()
+    // Add taskNum to the seed; otherwise each task will try to generate the same rows.
+      .random(new java.util.Random(options.seed + taskNum))
+      .stringLength(options.stringLength)
+      .binaryLength(options.binaryLength)
+      .build()
+
+    val rowsToWrite = options.numRows / options.numTasks
+    var currentRow: Long = rowsToWrite * taskNum
+    var rowsWritten: Long = 0
+    while (rowsWritten < rowsToWrite) {
+      val insert = kuduTable.newInsert()
+      if (options.generatorType == SequentialGenerator) {
+        setRow(insert.getRow, currentRow)
+      } else if (options.generatorType == RandomGenerator) {
+        generator.randomizeRow(insert.getRow)
+      }
+      session.apply(insert)
+
+      // Synchronously flush on what may be the last iteration of the
+      // loop, so we can check whether any collisions need to be retried.
+      if (rowsWritten + 1 == rowsToWrite) session.flush()
+
+      for (error <- session.getPendingErrors.getRowErrors) {
+        if (error.getErrorStatus.isAlreadyPresent) {
+          // Collisions only surface when rows are flushed, not on every
+          // apply, so subtract any rows that failed as duplicates from the
+          // counter; the loop then generates replacement rows.
+          rowsWritten -= 1
+          metrics.collisions.add(1)
+        } else {
+          throw new RuntimeException("Kudu write error: " + error.getErrorStatus.toString)
+        }
+      }
+      currentRow += 1
+      rowsWritten += 1
+    }
+    metrics.rowsWritten.add(rowsWritten)
+    session.close()
+  }
+
+  /**
+   * Sets all the columns in the passed row to the passed value.
+   * TODO(ghenke): Consider failing when value doesn't fit into the type.
+   */
+  private def setRow(row: PartialRow, value: Long): Unit = {
+    val schema = row.getSchema
+    val columns = schema.getColumns.asScala
+    columns.indices.foreach { i =>
+      val col = columns(i)
+      col.getType match {
+        case Type.BOOL =>
+          row.addBoolean(i, value % 2 == 1)
+        case Type.INT8 =>
+          row.addByte(i, value.toByte)
+        case Type.INT16 =>
+          row.addShort(i, value.toShort)
+        case Type.INT32 =>
+          row.addInt(i, value.toInt)
+        case Type.INT64 =>
+          row.addLong(i, value)
+        case Type.UNIXTIME_MICROS =>
+          row.addLong(i, value)
+        case Type.FLOAT =>
+          row.addFloat(i, value.toFloat)
+        case Type.DOUBLE =>
+          row.addDouble(i, value.toDouble)
+        case Type.DECIMAL =>
+          row.addDecimal(
+            i,
+            new BigDecimal(BigInteger.valueOf(value), col.getTypeAttributes.getScale))
+        case Type.STRING =>
+          row.addString(i, String.valueOf(value))
+        case Type.BINARY =>
+          val bytes: Array[Byte] = String.valueOf(value).getBytes(StandardCharsets.UTF_8)
+          row.addBinary(i, bytes)
+        case _ =>
+          throw new UnsupportedOperationException("Unsupported type " + col.getType)
+      }
+    }
+  }
+
+  def run(options: DistributedDataGeneratorOptions, ss: SparkSession): Unit = {
+    log.info(s"Running a DistributedDataGenerator with options: $options")
+    val sc = ss.sparkContext
+    val context = new KuduContext(options.masterAddresses, sc)
+    val metrics = GeneratorMetrics(sc)
+    sc.parallelize(0 until options.numTasks)
+      .foreach(taskNum => generateRows(context, options, taskNum, metrics))
+    log.info(s"Rows written: ${metrics.rowsWritten.value}")
+    log.info(s"Collisions: ${metrics.collisions.value}")
+  }
+
+  /**
+   * Entry point for testing. SparkContext is a singleton,
+   * so tests must create and manage their own.
+   */
+  @InterfaceAudience.LimitedPrivate(Array("Test"))
+  def testMain(args: Array[String], ss: SparkSession): Unit = {
+    DistributedDataGeneratorOptions.parse(args) match {
+      case None => throw new IllegalArgumentException("Could not parse arguments")
+      case Some(config) => run(config, ss)
+    }
+  }
+
+  def main(args: Array[String]): Unit = {
+    val conf = new SparkConf().setAppName("DistributedDataGenerator")
+    val ss = SparkSession.builder().config(conf).getOrCreate()
+    testMain(args, ss)
+  }
+}
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+case class DistributedDataGeneratorOptions(
+    tableName: String,
+    masterAddresses: String,
+    generatorType: String = DistributedDataGeneratorOptions.DefaultGeneratorType,
+    numRows: Long = DistributedDataGeneratorOptions.DefaultNumRows,
+    numTasks: Int = DistributedDataGeneratorOptions.DefaultNumTasks,
+    stringLength: Int = DistributedDataGeneratorOptions.DefaultStringLength,
+    binaryLength: Int = DistributedDataGeneratorOptions.DefaultBinaryLength,
+    seed: Long = System.currentTimeMillis())
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+object DistributedDataGeneratorOptions {
+  val DefaultNumRows: Long = 10000
+  val DefaultNumTasks: Int = 1
+  val DefaultStringLength: Int = 128
+  val DefaultBinaryLength: Int = 128
+  val RandomGenerator: String = "random"
+  val SequentialGenerator: String = "sequential"
+  val DefaultGeneratorType: String = RandomGenerator
+
+  private val parser: OptionParser[DistributedDataGeneratorOptions] =
+    new OptionParser[DistributedDataGeneratorOptions]("LoadRandomData") {
+
+      arg[String]("table-name")
+        .action((v, o) => o.copy(tableName = v))
+        .text("The table to load with random data")
+
+      arg[String]("master-addresses")
+        .action((v, o) => o.copy(masterAddresses = v))
+        .text("Comma-separated addresses of Kudu masters")
+
+      opt[String]("type")
+        .action((v, o) => o.copy(generatorType = v))
+        .text(s"The type of data generator. Must be one of 'random' or 'sequential'. " +
+          s"Default: ${DefaultGeneratorType}")
+        .optional()
+
+      opt[Long]("num-rows")
+        .action((v, o) => o.copy(numRows = v))
+        .text(s"The total number of unique rows to generate. Default: ${DefaultNumRows}")
+        .optional()
+
+      opt[Int]("num-tasks")
+        .action((v, o) => o.copy(numTasks = v))
+        .text(s"The total number of Spark tasks to generate. Default: ${DefaultNumTasks}")
+        .optional()
+
+      opt[Int]("string-length")
+        .action((v, o) => o.copy(stringLength = v))
+        .text(s"The length of generated string fields. Default: ${DefaultStringLength}")
+        .optional()
+
+      opt[Int]("binary-length")
+        .action((v, o) => o.copy(binaryLength = v))
+        .text(s"The length of generated binary fields. Default: ${DefaultBinaryLength}")
+        .optional()
+
+      opt[Long]("seed")
+        .action((v, o) => o.copy(seed = v))
+        .text(s"The seed to use in the random data generator. " +
+          s"Default: `System.currentTimeMillis()`")
+    }
+
+  def parse(args: Seq[String]): Option[DistributedDataGeneratorOptions] = {
+    parser.parse(args, DistributedDataGeneratorOptions("", ""))
+  }
+}
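
Two properties of generateRows above are worth noting (editorial gloss, not
part of the patch): rows are split by integer division, so any remainder of
numRows % numTasks is dropped unless numRows divides evenly (as in the tests
below), and each task offsets the seed by taskNum so tasks draw disjoint
random streams. A sketch of the split:

    // Per-task partitioning as computed in generateRows.
    val numRows = 100L
    val numTasks = 10
    val rowsToWrite = numRows / numTasks               // 10 rows per task
    def startRow(taskNum: Int): Long = rowsToWrite * taskNum
    // task 0 writes rows 0..9, task 1 writes 10..19, ..., task 9 writes 90..99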

http://git-wip-us.apache.org/repos/asf/kudu/blob/b3b008bd/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/DistributedDataGeneratorTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/DistributedDataGeneratorTest.scala b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/DistributedDataGeneratorTest.scala
new file mode 100644
index 0000000..e8c2e0d
--- /dev/null
+++ b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/DistributedDataGeneratorTest.scala
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.spark.tools
+
+import org.apache.kudu.Type
+import org.apache.kudu.spark.kudu.KuduTestSuite
+import org.apache.kudu.test.RandomUtils
+import org.apache.kudu.util.DecimalUtil
+import org.apache.kudu.util.SchemaGenerator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.junit.Test
+import org.junit.Assert.assertEquals
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+class DistributedDataGeneratorTest extends KuduTestSuite {
+  val log: Logger = LoggerFactory.getLogger(getClass)
+
+  private val generator = new SchemaGenerator.SchemaGeneratorBuilder()
+    .random(RandomUtils.getRandom)
+    // These types don't have enough values to prevent collisions.
+    .excludeTypes(Type.BOOL, Type.INT8)
+    // Ensure decimals have enough values to prevent collisions.
+    .precisionRange(DecimalUtil.MAX_DECIMAL32_PRECISION, DecimalUtil.MAX_DECIMAL_PRECISION)
+    .build()
+
+  private val randomTableName: String = "random-table"
+
+  @Test
+  def testGenerateRandomData() {
+    val numRows = 100
+    val args = Array(
+      s"--num-rows=$numRows",
+      "--num-tasks=10",
+      "--type=random",
+      randomTableName,
+      harness.getMasterAddressesAsString)
+    val rdd = runGeneratorTest(args)
+    assertEquals(numRows, rdd.collect.length)
+  }
+
+  @Test
+  def testGenerateSequentialData() {
+    val numRows = 100
+    val args = Array(
+      s"--num-rows=$numRows",
+      "--num-tasks=10",
+      "--type=sequential",
+      randomTableName,
+      harness.getMasterAddressesAsString)
+    val rdd = runGeneratorTest(args)
+    assertEquals(numRows, rdd.collect.length)
+  }
+
+  def runGeneratorTest(args: Array[String]): RDD[Row] = {
+    val schema = generator.randomSchema()
+    val options = generator.randomCreateTableOptions(schema)
+    kuduClient.createTable(randomTableName, schema, options)
+    DistributedDataGenerator.testMain(args, ss)
+    kuduContext.kuduRDD(ss.sparkContext, randomTableName)
+  }
+}
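
Why the test excludes BOOL and INT8 and widens decimal precision: a
birthday-bound sketch (editor's estimate, assuming on the order of 10^9
distinct values at DECIMAL32 precision) shows 100 random rows are
near-certain to collide over 256 INT8 values but almost never over a wide
decimal:

    // Probability that n uniform draws from d distinct values collide.
    def collisionProbability(n: Int, d: Double): Double =
      1.0 - (0 until n).map(i => 1.0 - i / d).product

    collisionProbability(100, 256.0)  // INT8: ~1.0, collisions near-certain
    collisionProbability(100, 1e9)    // wide decimal: ~5e-6, negligible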