Posted to commits@kudu.apache.org by gr...@apache.org on 2018/10/05 02:30:11 UTC
[1/3] kudu git commit: docs: update docs for CFile checksum handling
Repository: kudu
Updated Branches:
refs/heads/master 131c5d7af -> cf1b1f42c
docs: update docs for CFile checksum handling
Notes that the behavior when encountering a CFile checksum error has
changed in 1.8.0. I've kept the manual steps, since they are still
valuable.
Change-Id: I11ecfe2739122f80894c5bbba13de853d962754a
Reviewed-on: http://gerrit.cloudera.org:8080/11581
Reviewed-by: Grant Henke <gr...@apache.org>
Tested-by: Grant Henke <gr...@apache.org>
Tested-by: Kudu Jenkins
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/75472c82
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/75472c82
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/75472c82
Branch: refs/heads/master
Commit: 75472c82b1c0a2a2a91472e405b3ee4639e26575
Parents: 131c5d7
Author: Andrew Wong <aw...@cloudera.com>
Authored: Thu Oct 4 12:52:47 2018 -0700
Committer: Andrew Wong <aw...@cloudera.com>
Committed: Thu Oct 4 20:44:48 2018 +0000
----------------------------------------------------------------------
docs/troubleshooting.adoc | 12 +++++++-----
1 file changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/75472c82/docs/troubleshooting.adoc
----------------------------------------------------------------------
diff --git a/docs/troubleshooting.adoc b/docs/troubleshooting.adoc
index 0f37913..1bebe07 100644
--- a/docs/troubleshooting.adoc
+++ b/docs/troubleshooting.adoc
@@ -703,11 +703,10 @@ hold no data. They must not be deleted.
[[cfile_corruption]]
=== Corruption: checksum error on CFile block
-If the data on disk becomes corrupt, users will encounter warnings containing
-"Corruption: checksum error on CFile block" in the tablet server logs and
-client side errors when trying to scan tablets with corrupt CFile blocks.
-Until link:https://issues.apache.org/jira/browse/KUDU-2469[KUDU-2469] is
-completed, fixing this corruption is a manual process.
+In versions prior to Kudu 1.8.0, if the data on disk becomes corrupt, users
+will encounter warnings containing "Corruption: checksum error on CFile block"
+in the tablet server logs and client side errors when trying to scan tablets
+with corrupt CFile blocks. Fixing this corruption is a manual process.
To fix the issue, users can first identify all the affected tablets by
running a checksum scan on the affected tables or tablets using the
@@ -739,3 +738,6 @@ with an empty one using the
----
sudo -u kudu kudu tablet unsafe_replace_tablet <master_addresses> <tablet_id>
----
+
+From versions 1.8.0 onwards, Kudu will mark the affected replicas as failed,
+leading to their automatic re-replication elsewhere.
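The version split described above can be captured in a few lines. The following is a minimal Scala sketch, not part of Kudu. The object and method names, the sample master addresses, and the tablet ID are hypothetical. It decides from a version string whether the manual procedure applies (releases before 1.8.0) and, if so, renders the `unsafe_replace_tablet` command from the documentation for each tablet that the checksum scan flagged:

```scala
// Hypothetical helper for planning the manual CFile repair; not part of Kudu.
object CFileRepairPlan {
  // Parses "major.minor.patch" (ignoring a suffix such as "-SNAPSHOT") into a tuple.
  def parseVersion(v: String): (Int, Int, Int) = {
    val parts = v.takeWhile(c => c.isDigit || c == '.')
      .split('.').filter(_.nonEmpty).map(_.toInt)
    def at(i: Int): Int = if (i < parts.length) parts(i) else 0
    (at(0), at(1), at(2))
  }

  // Before 1.8.0 the corruption must be repaired by hand; from 1.8.0 onwards
  // the affected replicas are marked failed and re-replicated automatically.
  def needsManualRepair(version: String): Boolean = {
    val (major, minor, _) = parseVersion(version)
    major < 1 || (major == 1 && minor < 8)
  }

  // Renders the documented replacement command for each affected tablet.
  def replaceCommands(masterAddresses: String, tabletIds: Seq[String]): Seq[String] =
    tabletIds.map(id => s"sudo -u kudu kudu tablet unsafe_replace_tablet $masterAddresses $id")

  def main(args: Array[String]): Unit = {
    // Hypothetical inputs for illustration only.
    val version = "1.7.1"
    val masters = "master-1:7051,master-2:7051,master-3:7051"
    val affected = Seq("0123456789abcdef0123456789abcdef")
    if (needsManualRepair(version)) replaceCommands(masters, affected).foreach(println)
    else println(s"Kudu $version re-replicates corrupt replicas automatically")
  }
}
```

The sketch only prints commands for an operator to review; the tablet IDs would still come from the checksum scan.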
[2/3] kudu git commit: docs: add a note about RAID-0
docs: add a note about RAID-0
Adds a note that a single RAID-0 device spanning multiple disks is not
preferable to specifying multiple fs data directories, one per disk.
Change-Id: Ibb17fae9459be04211bd0a7db6eca8e94e443f71
Reviewed-on: http://gerrit.cloudera.org:8080/11580
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/fb79f8fc
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/fb79f8fc
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/fb79f8fc
Branch: refs/heads/master
Commit: fb79f8fccf6c6000f2569afb7fa2e41b02fdfe00
Parents: 75472c8
Author: Andrew Wong <aw...@cloudera.com>
Authored: Thu Oct 4 12:40:44 2018 -0700
Committer: Andrew Wong <aw...@cloudera.com>
Committed: Thu Oct 4 22:58:49 2018 +0000
----------------------------------------------------------------------
docs/configuration.adoc | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/fb79f8fc/docs/configuration.adoc
----------------------------------------------------------------------
diff --git a/docs/configuration.adoc b/docs/configuration.adoc
index 6a0a7f0..ebc1636 100644
--- a/docs/configuration.adoc
+++ b/docs/configuration.adoc
@@ -74,7 +74,11 @@ the latency of Kudu writes.
The `--fs_data_dirs` configuration indicates where Kudu will write its data
blocks. This is a comma-separated list of directories; if multiple values are
specified, data will be striped across the directories. If not specified, data
-blocks will be placed in the directory specified by `--fs_wal_dir`.
+blocks will be placed in the directory specified by `--fs_wal_dir`. Note that
+while a single data directory backed by a RAID-0 array will outperform a single
+data directory backed by a single storage device, it is better to let Kudu
+manage its own striping over multiple devices rather than delegating the
+striping to a RAID-0 array.
Additionally, `--fs_wal_dir` and `--fs_metadata_dir` may be the same as _one
of_ the directories listed in `--fs_data_dirs`, but must not be sub-directories
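To make the configuration concrete, here is a minimal Scala sketch. The mount points and helper names are hypothetical. It builds a comma-separated `--fs_data_dirs` value with one directory per physical device, as the note above recommends over a single RAID-0-backed directory. It also checks that a proposed `--fs_wal_dir` either equals one of the data directories or lies outside all of them. The documentation sentence is cut off in this diff context, so the check assumes it forbids nesting under any listed data directory:

```scala
import java.nio.file.{Path, Paths}

// Hypothetical helpers for composing and sanity-checking directory flags.
object DataDirFlags {
  // Joins one directory per device into the comma-separated flag value.
  def fsDataDirsFlag(dirs: Seq[String]): String = s"--fs_data_dirs=${dirs.mkString(",")}"

  // A WAL (or metadata) dir may equal a data dir but must not be nested inside one.
  // Path.startsWith compares whole path components, so "/data/1/kudu2" is not
  // treated as being under "/data/1/kudu".
  def isValidWalDir(walDir: String, dataDirs: Seq[String]): Boolean = {
    val wal: Path = Paths.get(walDir).normalize()
    dataDirs.map(d => Paths.get(d).normalize()).forall(d => wal == d || !wal.startsWith(d))
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical mount points, one per physical disk.
    val dataDirs = Seq("/data/1/kudu", "/data/2/kudu", "/data/3/kudu")
    println(fsDataDirsFlag(dataDirs))
    println(isValidWalDir("/data/1/kudu", dataDirs))     // same as a data dir: allowed
    println(isValidWalDir("/data/1/kudu/wal", dataDirs)) // nested in a data dir: rejected
  }
}
```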
[3/3] kudu git commit: KUDU-2563: [spark] Use the scanner keepAlive API
KUDU-2563: [spark] Use the scanner keepAlive API
Adds scheduled keepAlive calls to the scanner in the
KuduRDD RowIterator. The period at which the calls
are made is configurable via keepAlivePeriodMs and
defaults to 15 seconds (one quarter of the default
60-second scanner TTL).
This implementation is similar to the Impala integration:
it checks whether a call to the keepAlive API is needed
as it processes each row. Compared to a background
thread, this has the downside of being less consistently
scheduled, and it cannot help when a single row takes
longer to process than the TTL. However, because the
scanner is not thread-safe, this is the most
straightforward solution and has been proven to work.
Change-Id: Ia7f26d6ab8deb24982055d247938a11e188c35db
Reviewed-on: http://gerrit.cloudera.org:8080/11571
Reviewed-by: Grant Henke <gr...@apache.org>
Tested-by: Grant Henke <gr...@apache.org>
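The per-row check described in the message above can be sketched independently of Kudu. In this minimal Scala sketch, `KeepAliveThrottle` and its injectable clock are hypothetical names introduced for illustration, and the two callbacks stand in for `KuduScanner.isClosed` and `KuduScanner.keepAlive()`. Each call compares the elapsed time with the period and issues a keep-alive only when the period has passed and the scanner is still open, so no extra thread ever touches the non-thread-safe scanner:

```scala
// Hypothetical helper mirroring the per-row keep-alive check; not part of kudu-spark.
final class KeepAliveThrottle(periodMs: Long,
                              clock: () => Long = () => System.currentTimeMillis()) {
  private var lastKeepAliveMs: Long = clock()

  /** Invokes `keepAlive` if at least `periodMs` has elapsed since the last one and
    * the scanner is still open. Returns true when a keep-alive was sent. */
  def maybeKeepAlive(isClosed: => Boolean)(keepAlive: () => Unit): Boolean = {
    val now = clock()
    if (now >= lastKeepAliveMs + periodMs && !isClosed) {
      keepAlive()
      lastKeepAliveMs = now
      true
    } else {
      false
    }
  }
}

object KeepAliveThrottleDemo {
  def main(args: Array[String]): Unit = {
    var fakeNow = 0L
    var sent = 0
    val throttle = new KeepAliveThrottle(15000L, () => fakeNow)
    // Simulate one row being processed every 4 seconds for one minute.
    for (t <- 0L to 60000L by 4000L) {
      fakeNow = t
      throttle.maybeKeepAlive(isClosed = false)(() => sent += 1)
    }
    println(s"keep-alives sent: $sent")
  }
}
```

Injecting the clock keeps the logic testable without sleeping; the real `RowIterator` reads `System.currentTimeMillis` directly.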
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/cf1b1f42
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/cf1b1f42
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/cf1b1f42
Branch: refs/heads/master
Commit: cf1b1f42cbcc3ee67477ddc44cd0ff5070f1caac
Parents: fb79f8f
Author: Grant Henke <gr...@apache.org>
Authored: Sun Sep 30 22:31:52 2018 -0500
Committer: Grant Henke <gr...@apache.org>
Committed: Fri Oct 5 02:24:04 2018 +0000
----------------------------------------------------------------------
.../apache/kudu/client/AsyncKuduScanner.java | 7 ++
.../org/apache/kudu/client/KuduScanner.java | 9 +-
.../apache/kudu/spark/kudu/DefaultSource.scala | 18 ++--
.../org/apache/kudu/spark/kudu/KuduRDD.scala | 25 +++++-
.../kudu/spark/kudu/KuduReadOptions.scala | 4 +
.../apache/kudu/spark/kudu/KuduRDDTest.scala | 87 ++++++++++++++++++++
6 files changed, 139 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 804978e..71b1146 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -605,6 +605,13 @@ public final class AsyncKuduScanner {
}
/**
+ * @return true if the scanner has been closed.
+ */
+ public boolean isClosed() {
+ return closed;
+ }
+
+ /**
* Closes this scanner (don't forget to call this when you're done with it!).
* <p>
* Closing a scanner already closed has no effect. The deferred returned
http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
index 209fada..f945d8f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
@@ -79,6 +79,13 @@ public class KuduScanner {
}
/**
+ * @return true if the scanner has been closed.
+ */
+ public boolean isClosed() {
+ return asyncScanner.isClosed();
+ }
+
+ /**
* Closes this scanner (don't forget to call this when you're done with it!).
* <p>
* Closing a scanner already closed has no effect.
@@ -135,7 +142,7 @@ public class KuduScanner {
* Returns the RemoteTablet currently being scanned, if any.
*/
@InterfaceAudience.LimitedPrivate("Test")
- RemoteTablet currentTablet() {
+ public RemoteTablet currentTablet() {
return asyncScanner.currentTablet();
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index 29635a3..890ecda 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -62,6 +62,7 @@ class DefaultSource
val SCAN_REQUEST_TIMEOUT_MS = "kudu.scanRequestTimeoutMs"
val SOCKET_READ_TIMEOUT_MS = "kudu.socketReadTimeoutMs"
val BATCH_SIZE = "kudu.batchSize"
+ val KEEP_ALIVE_PERIOD_MS = "kudu.keepAlivePeriodMs"
/**
* Construct a BaseRelation using the provided context and parameters.
@@ -77,13 +78,13 @@ class DefaultSource
}
/**
- * Construct a BaseRelation using the provided context, parameters and schema.
- *
- * @param sqlContext SparkSQL context
- * @param parameters parameters given to us from SparkSQL
- * @param schema the schema used to select columns for the relation
- * @return a BaseRelation Object
- */
+ * Construct a BaseRelation using the provided context, parameters and schema.
+ *
+ * @param sqlContext SparkSQL context
+ * @param parameters parameters given to us from SparkSQL
+ * @param schema the schema used to select columns for the relation
+ * @return a BaseRelation Object
+ */
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String],
@@ -141,11 +142,14 @@ class DefaultSource
parameters.get(SCAN_LOCALITY).map(getScanLocalityType).getOrElse(defaultScanLocality)
val scanRequestTimeoutMs = parameters.get(SCAN_REQUEST_TIMEOUT_MS).map(_.toLong)
val socketReadTimeoutMs = parameters.get(SOCKET_READ_TIMEOUT_MS).map(_.toLong)
+ val keepAlivePeriodMs =
+ parameters.get(KEEP_ALIVE_PERIOD_MS).map(_.toLong).getOrElse(defaultKeepAlivePeriodMs)
KuduReadOptions(
batchSize,
scanLocality,
faultTolerantScanner,
+ keepAlivePeriodMs,
scanRequestTimeoutMs,
socketReadTimeoutMs)
}
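The option handling above follows a simple pattern: look the key up in the string map that SparkSQL passes in, convert it, and fall back to the default. Below is a minimal Scala sketch of that pattern. The `KeepAliveOption` object is hypothetical, and the descriptive error is an addition: the original simply calls `_.toLong`, which throws `NumberFormatException` on malformed input.

```scala
import scala.util.Try

// Hypothetical helper; DefaultSource itself inlines this logic.
object KeepAliveOption {
  val KeepAlivePeriodMsKey = "kudu.keepAlivePeriodMs" // option key added by this commit
  val DefaultKeepAlivePeriodMs = 15000L               // KuduReadOptions.defaultKeepAlivePeriodMs

  /** Returns the configured keep-alive period, or the default when the key is absent.
    * Unlike the original `_.toLong`, a malformed value yields a descriptive error. */
  def keepAlivePeriodMs(parameters: Map[String, String]): Long =
    parameters.get(KeepAlivePeriodMsKey) match {
      case None => DefaultKeepAlivePeriodMs
      case Some(raw) =>
        Try(raw.trim.toLong).getOrElse(throw new IllegalArgumentException(
          s"$KeepAlivePeriodMsKey must be a whole number of milliseconds, got '$raw'"))
    }
}
```

From the DataFrame API, the value would presumably be passed as a string option alongside `kudu.master` and `kudu.table`, for example `.option("kudu.keepAlivePeriodMs", "10000")` on the reader.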
http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index 77dabcc..2deea16 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -24,7 +24,6 @@ import org.apache.spark.SparkContext
import org.apache.spark.TaskContext
import org.apache.yetus.audience.InterfaceAudience
import org.apache.yetus.audience.InterfaceStability
-
import org.apache.kudu.client._
import org.apache.kudu.Type
import org.apache.kudu.client
@@ -45,6 +44,9 @@ class KuduRDD private[kudu] (
@transient val sc: SparkContext)
extends RDD[Row](sc, Nil) {
+ // Defined here because the options are transient.
+ private val keepAlivePeriodMs = options.keepAlivePeriodMs
+
override protected def getPartitions: Array[Partition] = {
val builder = kuduContext.syncClient
.newScanTokenBuilder(table)
@@ -91,7 +93,7 @@ class KuduRDD private[kudu] (
val partition: KuduPartition = part.asInstanceOf[KuduPartition]
val scanner =
KuduScanToken.deserializeIntoScanner(partition.scanToken, client)
- new RowIterator(scanner, kuduContext)
+ new RowIterator(scanner, kuduContext, keepAlivePeriodMs)
}
override def getPreferredLocations(partition: Partition): Seq[String] = {
@@ -112,11 +114,27 @@ private class KuduPartition(
* A Spark SQL [[Row]] iterator which wraps a [[KuduScanner]].
* @param scanner the wrapped scanner
* @param kuduContext the kudu context
+ * @param keepAlivePeriodMs the period in which to call the keepAlive on the scanners
*/
-private class RowIterator(private val scanner: KuduScanner, private val kuduContext: KuduContext)
+private class RowIterator(
+ val scanner: KuduScanner,
+ val kuduContext: KuduContext,
+ val keepAlivePeriodMs: Long)
extends Iterator[Row] {
private var currentIterator: RowResultIterator = RowResultIterator.empty
+ private var lastKeepAliveTimeMs = System.currentTimeMillis()
+
+ /**
+ * Calls the keepAlive API on the current scanner if the keepAlivePeriodMs has passed.
+ */
+ private def KeepKuduScannerAlive(): Unit = {
+ val now = System.currentTimeMillis
+ if (now >= lastKeepAliveTimeMs + keepAlivePeriodMs && !scanner.isClosed) {
+ scanner.keepAlive()
+ lastKeepAliveTimeMs = now
+ }
+ }
override def hasNext: Boolean = {
while (!currentIterator.hasNext && scanner.hasMoreRows) {
@@ -128,6 +146,7 @@ private class RowIterator(private val scanner: KuduScanner, private val kuduCont
// timestamp on each executor.
kuduContext.timestampAccumulator.add(kuduContext.syncClient.getLastPropagatedTimestamp)
}
+ KeepKuduScannerAlive()
currentIterator.hasNext
}
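`RowIterator` flattens the scanner's batches into a single row iterator and runs the keep-alive check on every `hasNext` call. The same shape can be sketched with a hypothetical `BatchSource` trait standing in for `KuduScanner`. Its `hasMoreRows`/`nextRows` names mirror the scanner's, but the trait itself is an assumption for illustration. The `onHasNext` hook marks where `KeepKuduScannerAlive()` is called:

```scala
// Hypothetical stand-in for KuduScanner: yields rows in batches.
trait BatchSource[A] {
  def hasMoreRows: Boolean
  def nextRows(): Iterator[A]
}

// Flattens batches into one iterator, invoking `onHasNext` on every hasNext call,
// mirroring where RowIterator runs its keep-alive check.
final class FlatteningIterator[A](source: BatchSource[A], onHasNext: () => Unit)
    extends Iterator[A] {
  private var current: Iterator[A] = Iterator.empty

  override def hasNext: Boolean = {
    // Skip empty batches until a non-empty one is found or the source is exhausted.
    while (!current.hasNext && source.hasMoreRows) {
      current = source.nextRows()
    }
    onHasNext()
    current.hasNext
  }

  override def next(): A = {
    // Calling hasNext makes next() safe without a prior hasNext; it also runs the
    // (cheap) hook once more.
    if (!hasNext) throw new NoSuchElementException("no more rows")
    current.next()
  }
}

// A simple in-memory source for demonstration.
final class SeqBatchSource[A](batches: Seq[Seq[A]]) extends BatchSource[A] {
  private val it = batches.iterator
  def hasMoreRows: Boolean = it.hasNext
  def nextRows(): Iterator[A] = it.next().iterator
}
```

Because the hook runs on the caller's thread between rows, it cannot fire while a single row is being processed. This is the limitation noted in the commit message.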
http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
index 7c9b888..a1983b5 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
@@ -31,6 +31,8 @@ import org.apache.kudu.spark.kudu.KuduReadOptions._
* take place at the closest replica
* @param faultTolerantScanner scanner type to be used. Fault tolerant if true,
* otherwise, use non fault tolerant one
+ * @param keepAlivePeriodMs The period at which to send keep-alive requests to the tablet
+ * server to ensure that scanners do not time out
* @param scanRequestTimeoutMs Maximum time allowed per scan request, in milliseconds
* @param socketReadTimeoutMs Maximum time allowed when waiting on data from a socket
*/
@@ -40,6 +42,7 @@ case class KuduReadOptions(
batchSize: Int = defaultBatchSize,
scanLocality: ReplicaSelection = defaultScanLocality,
faultTolerantScanner: Boolean = defaultFaultTolerantScanner,
+ keepAlivePeriodMs: Long = defaultKeepAlivePeriodMs,
scanRequestTimeoutMs: Option[Long] = None,
socketReadTimeoutMs: Option[Long] = None)
@@ -47,4 +50,5 @@ object KuduReadOptions {
val defaultBatchSize: Int = 1024 * 1024 * 20 // TODO: Understand/doc this setting?
val defaultScanLocality: ReplicaSelection = ReplicaSelection.CLOSEST_REPLICA
val defaultFaultTolerantScanner: Boolean = false
+ val defaultKeepAlivePeriodMs: Long = 15000 // 25% of the default scanner ttl.
}
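The default of 15000 ms is one quarter of the tablet server's default scanner TTL of 60 seconds (the `--scanner_ttl_ms` flag). Because `keepAlivePeriodMs` is inserted in the middle of the parameter list, positional callers such as `DefaultSource` must be updated, while callers that use named arguments, as the test does, are unaffected. A minimal mirror of that shape follows. The `ReadOptionsSketch` name and the `keepAliveFitsTtl` check are hypothetical, and the real class has more fields:

```scala
// Hypothetical companion holding the defaults.
object ReadOptionsSketch {
  val DefaultScannerTtlMs: Long = 60000L                       // tserver default for --scanner_ttl_ms
  val DefaultKeepAlivePeriodMs: Long = DefaultScannerTtlMs / 4 // 15000, as in KuduReadOptions
}

// Hypothetical, trimmed-down mirror of KuduReadOptions.
case class ReadOptionsSketch(
    batchSize: Int = 1024 * 1024 * 20,
    faultTolerantScanner: Boolean = false,
    keepAlivePeriodMs: Long = ReadOptionsSketch.DefaultKeepAlivePeriodMs,
    scanRequestTimeoutMs: Option[Long] = None) {
  // The period should sit comfortably below the TTL; this only rejects
  // periods at or above it, which can never keep the scanner alive.
  def keepAliveFitsTtl(scannerTtlMs: Long): Boolean = keepAlivePeriodMs < scannerTtlMs
}
```

With named arguments, `ReadOptionsSketch(batchSize = 100, keepAlivePeriodMs = 1250L)` keeps compiling even if further fields are inserted later.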
http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala
index f0fb4a0..49bc15e 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala
@@ -17,6 +17,15 @@
package org.apache.kudu.spark.kudu
+import scala.collection.JavaConverters._
+import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
+import org.apache.kudu.client.CreateTableOptions
+import org.apache.kudu.Schema
+import org.apache.kudu.Type
+import org.apache.kudu.test.KuduTestHarness.TabletServerConfig
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
import org.junit.Test
class KuduRDDTest extends KuduTestSuite {
@@ -27,4 +36,82 @@ class KuduRDDTest extends KuduTestSuite {
val rdd = kuduContext.kuduRDD(ss.sparkContext, tableName, List("key"))
assert(rdd.collect.length == 100)
}
+
+ @Test
+ @TabletServerConfig(
+ // Hard coded values because Scala doesn't handle array constants in annotations.
+ flags = Array(
+ "--scanner_ttl_ms=5000",
+ "--scanner_gc_check_interval_us=500000" // 10% of the TTL.
+ ))
+ def testKeepAlive() {
+ val rowCount = 500
+ val shortScannerTtlMs = 5000
+
+ // Create a simple table with a single partition.
+ val tableName = "testKeepAlive"
+ val tableSchema = {
+ val columns = List(
+ new ColumnSchemaBuilder("key", Type.INT32).key(true).build(),
+ new ColumnSchemaBuilder("val", Type.INT32).build()).asJava
+ new Schema(columns)
+ }
+ val tableOptions = new CreateTableOptions()
+ .setRangePartitionColumns(List("key").asJava)
+ .setNumReplicas(1)
+ val table = kuduClient.createTable(tableName, tableSchema, tableOptions)
+
+ val session = kuduClient.newSession()
+ Range(0, rowCount).map { i =>
+ val insert = table.newInsert
+ val row = insert.getRow
+ row.addInt(0, i)
+ row.addInt(1, i)
+ session.apply(insert)
+ }
+ session.flush()
+
+ def processRDD(rdd: RDD[Row]): Unit = {
+ // Ensure reading takes longer than the scanner ttl.
+ var i = 0
+ rdd.foreach { row =>
+ // Sleep for half the ttl for the first few rows. This ensures
+ // we are on the same tablet and will go past the ttl without
+ // a new scan request. It also ensures a single row doesn't go
+ // longer than the ttl.
+ if (i < 5) {
+ Thread.sleep(shortScannerTtlMs / 2) // Sleep for half the ttl.
+ i = i + 1
+ }
+ }
+ }
+
+ // Test that a keepAlivePeriodMs less than the scanner ttl is successful.
+ val goodRdd = kuduContext.kuduRDD(
+ ss.sparkContext,
+ tableName,
+ List("key"),
+ KuduReadOptions(
+ batchSize = 100, // Set a small batch size so the first scan doesn't read all the rows.
+ keepAlivePeriodMs = shortScannerTtlMs / 4)
+ )
+ processRDD(goodRdd)
+
+ // Test that a keepAlivePeriodMs greater than the scanner ttl fails.
+ val badRdd = kuduContext.kuduRDD(
+ ss.sparkContext,
+ tableName,
+ List("key"),
+ KuduReadOptions(
+ batchSize = 100, // Set a small batch size so the first scan doesn't read all the rows.
+ keepAlivePeriodMs = shortScannerTtlMs * 2)
+ )
+ try {
+ processRDD(badRdd)
+ fail("Should throw a scanner not found exception")
+ } catch {
+ case ex: SparkException =>
+ assert(ex.getMessage.matches("(?s).*Scanner .* not found.*"))
+ }
+ }
}
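The test's timing argument can be checked without a cluster. The hypothetical `survives` function below models a single scanner under simplified assumptions. The tablet server is assumed to expire the scanner once more than `ttlMs` passes without any request. The client checks the keep-alive period only between rows, as `RowIterator` does, and issues no scan request until the listed rows are done. The model ignores the GC check interval and network latency, so it approximates the test rather than reproducing it:

```scala
// Hypothetical timing model of the keep-alive test; not part of kudu-spark.
object KeepAliveSimulation {
  /** Returns true if the scanner survives processing rows with the given per-row
    * processing times (no scan request in between), followed by one more scan request. */
  def survives(rowDurationsMs: Seq[Long], keepAlivePeriodMs: Long, ttlMs: Long): Boolean = {
    var now = 0L           // time of the initial scan request
    var lastContact = 0L   // last time the server heard from this scanner
    var lastKeepAlive = 0L // last time the client sent a keep-alive

    // A request fails if the scanner has already been idle for longer than the TTL.
    def contact(): Boolean = {
      val alive = now - lastContact <= ttlMs
      lastContact = now
      alive
    }

    val rowsOk = rowDurationsMs.forall { d =>
      // hasNext: send a keep-alive if the period has elapsed.
      val ok =
        if (now >= lastKeepAlive + keepAlivePeriodMs) { lastKeepAlive = now; contact() }
        else true
      now += d // process the row
      ok
    }
    rowsOk && contact() // the next scan request after these rows
  }

  def main(args: Array[String]): Unit = {
    val ttl = 5000L
    val slowRows = Seq.fill(5)(ttl / 2) // like the test: half the TTL per row
    println(survives(slowRows, keepAlivePeriodMs = ttl / 4, ttlMs = ttl))
    println(survives(slowRows, keepAlivePeriodMs = ttl * 2, ttlMs = ttl))
    println(survives(Seq(ttl + 1000L), keepAlivePeriodMs = ttl / 4, ttlMs = ttl))
  }
}
```

With the test's numbers, the first call returns true and the second returns false, matching the good and bad RDDs. The third call also returns false: a single row slower than the TTL defeats the per-row check, as the commit message warns.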