You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2018/10/05 02:30:11 UTC

[1/3] kudu git commit: docs: update docs for CFile checksum handling

Repository: kudu
Updated Branches:
  refs/heads/master 131c5d7af -> cf1b1f42c


docs: update docs for CFile checksum handling

Notes that the behavior when encountering a CFile checksum has changed
in 1.8.0. I've kept around the manual steps, since they are still
valuable.

Change-Id: I11ecfe2739122f80894c5bbba13de853d962754a
Reviewed-on: http://gerrit.cloudera.org:8080/11581
Reviewed-by: Grant Henke <gr...@apache.org>
Tested-by: Grant Henke <gr...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/75472c82
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/75472c82
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/75472c82

Branch: refs/heads/master
Commit: 75472c82b1c0a2a2a91472e405b3ee4639e26575
Parents: 131c5d7
Author: Andrew Wong <aw...@cloudera.com>
Authored: Thu Oct 4 12:52:47 2018 -0700
Committer: Andrew Wong <aw...@cloudera.com>
Committed: Thu Oct 4 20:44:48 2018 +0000

----------------------------------------------------------------------
 docs/troubleshooting.adoc | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/75472c82/docs/troubleshooting.adoc
----------------------------------------------------------------------
diff --git a/docs/troubleshooting.adoc b/docs/troubleshooting.adoc
index 0f37913..1bebe07 100644
--- a/docs/troubleshooting.adoc
+++ b/docs/troubleshooting.adoc
@@ -703,11 +703,10 @@ hold no data. They must not be deleted.
 [[cfile_corruption]]
 === Corruption: checksum error on CFile block
 
-If the data on disk becomes corrupt, users will encounter warnings containing
-"Corruption: checksum error on CFile block" in the tablet server logs and
-client side errors when trying to scan tablets with corrupt CFile blocks.
-Until link:https://issues.apache.org/jira/browse/KUDU-2469[KUDU-2469] is
-completed, fixing this corruption is a manual process.
+In versions prior to Kudu 1.8.0, if the data on disk becomes corrupt, users
+will encounter warnings containing "Corruption: checksum error on CFile block"
+in the tablet server logs and client side errors when trying to scan tablets
+with corrupt CFile blocks. Fixing this corruption is a manual process.
 
 To fix the issue, users can first identify all the affected tablets by
 running a checksum scan on the affected tables or tablets using the
@@ -739,3 +738,6 @@ with an empty one using the
 ----
 sudo -u kudu kudu tablet unsafe_replace_tablet <master_addresses> <tablet_id>
 ----
+
+From versions 1.8.0 onwards, Kudu will mark the affected replicas as failed,
+leading to their automatic re-replication elsewhere.


[2/3] kudu git commit: docs: add a note about RAID-0

Posted by gr...@apache.org.
docs: add a note about RAID-0

Adds a note that a single RAID-0 device with multiple disks is not
preferable to specifying multiple fs directories across multiple disks.

Change-Id: Ibb17fae9459be04211bd0a7db6eca8e94e443f71
Reviewed-on: http://gerrit.cloudera.org:8080/11580
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/fb79f8fc
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/fb79f8fc
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/fb79f8fc

Branch: refs/heads/master
Commit: fb79f8fccf6c6000f2569afb7fa2e41b02fdfe00
Parents: 75472c8
Author: Andrew Wong <aw...@cloudera.com>
Authored: Thu Oct 4 12:40:44 2018 -0700
Committer: Andrew Wong <aw...@cloudera.com>
Committed: Thu Oct 4 22:58:49 2018 +0000

----------------------------------------------------------------------
 docs/configuration.adoc | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/fb79f8fc/docs/configuration.adoc
----------------------------------------------------------------------
diff --git a/docs/configuration.adoc b/docs/configuration.adoc
index 6a0a7f0..ebc1636 100644
--- a/docs/configuration.adoc
+++ b/docs/configuration.adoc
@@ -74,7 +74,11 @@ the latency of Kudu writes.
 The `--fs_data_dirs` configuration indicates where Kudu will write its data
 blocks. This is a comma-separated list of directories; if multiple values are
 specified, data will be striped across the directories. If not specified, data
-blocks will be placed in the directory specified by `--fs_wal_dir`.
+blocks will be placed in the directory specified by `--fs_wal_dir`. Note that
+while a single data directory backed by a RAID-0 array will outperform a single
+data directory backed by a single storage device, it is better to let Kudu
+manage its own striping over multiple devices rather than delegating the
+striping to a RAID-0 array.
 
 Additionally, `--fs_wal_dir` and `--fs_metadata_dir` may be the same as _one
 of_ the directories listed in `--fs_data_dirs`, but must not be sub-directories


[3/3] kudu git commit: KUDU-2563: [spark] Use the scanner keepAlive API

Posted by gr...@apache.org.
KUDU-2563: [spark] Use the scanner keepAlive API

Adds scheduled keepAlive calls to the scanner in the
KuduRDD RowIterator. The period in which the calls
are made is configurable via keepAlivePeriodMs and
has a default of 15 seconds (which is 1/4 the default
scanner ttl).

This implementation is similar to the Impala integration.
It checks if a call to the keepAlive API is needed as
it processes each row. Compared to a background
thread, this has the downside of being less consistently
scheduled and susceptible to scenarios in which a single
row takes longer to process than the ttl. However,
because the scanner is not thread safe, this is the most
straightforward solution and has been proven to work.

Change-Id: Ia7f26d6ab8deb24982055d247938a11e188c35db
Reviewed-on: http://gerrit.cloudera.org:8080/11571
Reviewed-by: Grant Henke <gr...@apache.org>
Tested-by: Grant Henke <gr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/cf1b1f42
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/cf1b1f42
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/cf1b1f42

Branch: refs/heads/master
Commit: cf1b1f42cbcc3ee67477ddc44cd0ff5070f1caac
Parents: fb79f8f
Author: Grant Henke <gr...@apache.org>
Authored: Sun Sep 30 22:31:52 2018 -0500
Committer: Grant Henke <gr...@apache.org>
Committed: Fri Oct 5 02:24:04 2018 +0000

----------------------------------------------------------------------
 .../apache/kudu/client/AsyncKuduScanner.java    |  7 ++
 .../org/apache/kudu/client/KuduScanner.java     |  9 +-
 .../apache/kudu/spark/kudu/DefaultSource.scala  | 18 ++--
 .../org/apache/kudu/spark/kudu/KuduRDD.scala    | 25 +++++-
 .../kudu/spark/kudu/KuduReadOptions.scala       |  4 +
 .../apache/kudu/spark/kudu/KuduRDDTest.scala    | 87 ++++++++++++++++++++
 6 files changed, 139 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 804978e..71b1146 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -605,6 +605,13 @@ public final class AsyncKuduScanner {
   }
 
   /**
+   * @return true if the scanner has been closed.
+   */
+  public boolean isClosed() {
+    return closed;
+  }
+
+  /**
    * Closes this scanner (don't forget to call this when you're done with it!).
    * <p>
    * Closing a scanner already closed has no effect.  The deferred returned

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
index 209fada..f945d8f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
@@ -79,6 +79,13 @@ public class KuduScanner {
   }
 
   /**
+   * @return true if the scanner has been closed.
+   */
+  public boolean isClosed() {
+    return asyncScanner.isClosed();
+  }
+
+  /**
    * Closes this scanner (don't forget to call this when you're done with it!).
    * <p>
    * Closing a scanner already closed has no effect.
@@ -135,7 +142,7 @@ public class KuduScanner {
    * Returns the RemoteTablet currently being scanned, if any.
    */
   @InterfaceAudience.LimitedPrivate("Test")
-  RemoteTablet currentTablet() {
+  public RemoteTablet currentTablet() {
     return asyncScanner.currentTablet();
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index 29635a3..890ecda 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -62,6 +62,7 @@ class DefaultSource
   val SCAN_REQUEST_TIMEOUT_MS = "kudu.scanRequestTimeoutMs"
   val SOCKET_READ_TIMEOUT_MS = "kudu.socketReadTimeoutMs"
   val BATCH_SIZE = "kudu.batchSize"
+  val KEEP_ALIVE_PERIOD_MS = "kudu.keepAlivePeriodMs"
 
   /**
    * Construct a BaseRelation using the provided context and parameters.
@@ -77,13 +78,13 @@ class DefaultSource
   }
 
   /**
-    * Construct a BaseRelation using the provided context, parameters and schema.
-    *
-    * @param sqlContext SparkSQL context
-    * @param parameters parameters given to us from SparkSQL
-    * @param schema     the schema used to select columns for the relation
-    * @return           a BaseRelation Object
-    */
+   * Construct a BaseRelation using the provided context, parameters and schema.
+   *
+   * @param sqlContext SparkSQL context
+   * @param parameters parameters given to us from SparkSQL
+   * @param schema     the schema used to select columns for the relation
+   * @return           a BaseRelation Object
+   */
   override def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String],
@@ -141,11 +142,14 @@ class DefaultSource
       parameters.get(SCAN_LOCALITY).map(getScanLocalityType).getOrElse(defaultScanLocality)
     val scanRequestTimeoutMs = parameters.get(SCAN_REQUEST_TIMEOUT_MS).map(_.toLong)
     val socketReadTimeoutMs = parameters.get(SOCKET_READ_TIMEOUT_MS).map(_.toLong)
+    val keepAlivePeriodMs =
+      parameters.get(KEEP_ALIVE_PERIOD_MS).map(_.toLong).getOrElse(defaultKeepAlivePeriodMs)
 
     KuduReadOptions(
       batchSize,
       scanLocality,
       faultTolerantScanner,
+      keepAlivePeriodMs,
       scanRequestTimeoutMs,
       socketReadTimeoutMs)
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index 77dabcc..2deea16 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -24,7 +24,6 @@ import org.apache.spark.SparkContext
 import org.apache.spark.TaskContext
 import org.apache.yetus.audience.InterfaceAudience
 import org.apache.yetus.audience.InterfaceStability
-
 import org.apache.kudu.client._
 import org.apache.kudu.Type
 import org.apache.kudu.client
@@ -45,6 +44,9 @@ class KuduRDD private[kudu] (
     @transient val sc: SparkContext)
     extends RDD[Row](sc, Nil) {
 
+  // Defined here because the options are transient.
+  private val keepAlivePeriodMs = options.keepAlivePeriodMs
+
   override protected def getPartitions: Array[Partition] = {
     val builder = kuduContext.syncClient
       .newScanTokenBuilder(table)
@@ -91,7 +93,7 @@ class KuduRDD private[kudu] (
     val partition: KuduPartition = part.asInstanceOf[KuduPartition]
     val scanner =
       KuduScanToken.deserializeIntoScanner(partition.scanToken, client)
-    new RowIterator(scanner, kuduContext)
+    new RowIterator(scanner, kuduContext, keepAlivePeriodMs)
   }
 
   override def getPreferredLocations(partition: Partition): Seq[String] = {
@@ -112,11 +114,27 @@ private class KuduPartition(
  * A Spark SQL [[Row]] iterator which wraps a [[KuduScanner]].
  * @param scanner the wrapped scanner
  * @param kuduContext the kudu context
+ * @param keepAlivePeriodMs the period in which to call the keepAlive on the scanners
  */
-private class RowIterator(private val scanner: KuduScanner, private val kuduContext: KuduContext)
+private class RowIterator(
+    val scanner: KuduScanner,
+    val kuduContext: KuduContext,
+    val keepAlivePeriodMs: Long)
     extends Iterator[Row] {
 
   private var currentIterator: RowResultIterator = RowResultIterator.empty
+  private var lastKeepAliveTimeMs = System.currentTimeMillis()
+
+  /**
+   * Calls the keepAlive API on the current scanner if the keepAlivePeriodMs has passed.
+   */
+  private def KeepKuduScannerAlive(): Unit = {
+    val now = System.currentTimeMillis
+    if (now >= lastKeepAliveTimeMs + keepAlivePeriodMs && !scanner.isClosed) {
+      scanner.keepAlive()
+      lastKeepAliveTimeMs = now
+    }
+  }
 
   override def hasNext: Boolean = {
     while (!currentIterator.hasNext && scanner.hasMoreRows) {
@@ -128,6 +146,7 @@ private class RowIterator(private val scanner: KuduScanner, private val kuduCont
       // timestamp on each executor.
       kuduContext.timestampAccumulator.add(kuduContext.syncClient.getLastPropagatedTimestamp)
     }
+    KeepKuduScannerAlive()
     currentIterator.hasNext
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
index 7c9b888..a1983b5 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
@@ -31,6 +31,8 @@ import org.apache.kudu.spark.kudu.KuduReadOptions._
  *                     take place at the closest replica
  * @param faultTolerantScanner scanner type to be used. Fault tolerant if true,
  *                             otherwise, use non fault tolerant one
+ * @param keepAlivePeriodMs The period at which to send keep-alive requests to the tablet
+ *                          server to ensure that scanners do not time out
  * @param scanRequestTimeoutMs Maximum time allowed per scan request, in milliseconds
  * @param socketReadTimeoutMs Maximum time allowed when waiting on data from a socket
  */
@@ -40,6 +42,7 @@ case class KuduReadOptions(
     batchSize: Int = defaultBatchSize,
     scanLocality: ReplicaSelection = defaultScanLocality,
     faultTolerantScanner: Boolean = defaultFaultTolerantScanner,
+    keepAlivePeriodMs: Long = defaultKeepAlivePeriodMs,
     scanRequestTimeoutMs: Option[Long] = None,
     socketReadTimeoutMs: Option[Long] = None)
 
@@ -47,4 +50,5 @@ object KuduReadOptions {
   val defaultBatchSize: Int = 1024 * 1024 * 20 // TODO: Understand/doc this setting?
   val defaultScanLocality: ReplicaSelection = ReplicaSelection.CLOSEST_REPLICA
   val defaultFaultTolerantScanner: Boolean = false
+  val defaultKeepAlivePeriodMs: Long = 15000 // 25% of the default scanner ttl.
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala
index f0fb4a0..49bc15e 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala
@@ -17,6 +17,15 @@
 
 package org.apache.kudu.spark.kudu
 
+import scala.collection.JavaConverters._
+import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
+import org.apache.kudu.client.CreateTableOptions
+import org.apache.kudu.Schema
+import org.apache.kudu.Type
+import org.apache.kudu.test.KuduTestHarness.TabletServerConfig
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
 import org.junit.Test
 
 class KuduRDDTest extends KuduTestSuite {
@@ -27,4 +36,82 @@ class KuduRDDTest extends KuduTestSuite {
     val rdd = kuduContext.kuduRDD(ss.sparkContext, tableName, List("key"))
     assert(rdd.collect.length == 100)
   }
+
+  @Test
+  @TabletServerConfig(
+    // Hard coded values because Scala doesn't handle array constants in annotations.
+    flags = Array(
+      "--scanner_ttl_ms=5000",
+      "--scanner_gc_check_interval_us=500000" // 10% of the TTL.
+    ))
+  def testKeepAlive() {
+    val rowCount = 500
+    val shortScannerTtlMs = 5000
+
+    // Create a simple table with a single partition.
+    val tableName = "testKeepAlive"
+    val tableSchema = {
+      val columns = List(
+        new ColumnSchemaBuilder("key", Type.INT32).key(true).build(),
+        new ColumnSchemaBuilder("val", Type.INT32).build()).asJava
+      new Schema(columns)
+    }
+    val tableOptions = new CreateTableOptions()
+      .setRangePartitionColumns(List("key").asJava)
+      .setNumReplicas(1)
+    val table = kuduClient.createTable(tableName, tableSchema, tableOptions)
+
+    val session = kuduClient.newSession()
+    Range(0, rowCount).map { i =>
+      val insert = table.newInsert
+      val row = insert.getRow
+      row.addInt(0, i)
+      row.addInt(1, i)
+      session.apply(insert)
+    }
+    session.flush()
+
+    def processRDD(rdd: RDD[Row]): Unit = {
+      // Ensure reading takes longer than the scanner ttl.
+      var i = 0
+      rdd.foreach { row =>
+        // Sleep for half the ttl for the first few rows. This ensures
+        // we are on the same tablet and will go past the ttl without
+        // a new scan request. It also ensures a single row doesn't go
+        // longer than the ttl.
+        if (i < 5) {
+          Thread.sleep(shortScannerTtlMs / 2) // Sleep for half the ttl.
+          i = i + 1
+        }
+      }
+    }
+
+    // Test that a keepAlivePeriodMs less than the scanner ttl is successful.
+    val goodRdd = kuduContext.kuduRDD(
+      ss.sparkContext,
+      tableName,
+      List("key"),
+      KuduReadOptions(
+        batchSize = 100, // Set a small batch size so the first scan doesn't read all the rows.
+        keepAlivePeriodMs = shortScannerTtlMs / 4)
+    )
+    processRDD(goodRdd)
+
+    // Test that a keepAlivePeriodMs greater than the scanner ttl fails.
+    val badRdd = kuduContext.kuduRDD(
+      ss.sparkContext,
+      tableName,
+      List("key"),
+      KuduReadOptions(
+        batchSize = 100, // Set a small batch size so the first scan doesn't read all the rows.
+        keepAlivePeriodMs = shortScannerTtlMs * 2)
+    )
+    try {
+      processRDD(badRdd)
+      fail("Should throw a scanner not found exception")
+    } catch {
+      case ex: SparkException =>
+        assert(ex.getMessage.matches("(?s).*Scanner .* not found.*"))
+    }
+  }
 }