You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/10/18 03:30:12 UTC

[kudu] branch master updated (24aebba -> 03cbd75)

This is an automated email from the ASF dual-hosted git repository.

adar pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from 24aebba  [nvm_cache] set --nvm_cache_allocation_retry_count to 0
     new 930d544  [spark] Separate out DefaultSourceTests
     new 5ca01ce  webserver: enable HTTP keep-alive
     new 03cbd75  www: miscellaneous mustache updates

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/kudu/spark/kudu/DefaultSourceTest.scala | 497 -------------------
 .../org/apache/kudu/spark/kudu/KuduTestSuite.scala |  14 +
 .../org/apache/kudu/spark/kudu/SparkSQLTest.scala  | 534 +++++++++++++++++++++
 src/kudu/master/master_path_handlers.cc            |  10 +-
 src/kudu/server/default_path_handlers.cc           |  14 +-
 src/kudu/server/pprof_path_handlers.cc             |  18 +-
 src/kudu/server/rpcz-path-handler.cc               |   2 +-
 src/kudu/server/tracing_path_handlers.cc           |   4 +-
 src/kudu/server/webserver-test.cc                  |  21 +
 src/kudu/server/webserver.cc                       | 231 +++++----
 src/kudu/server/webserver.h                        |  14 +-
 src/kudu/tserver/tserver_path_handlers.cc          |  70 +--
 src/kudu/tserver/tserver_path_handlers.h           |   6 +-
 src/kudu/util/curl_util.cc                         |  12 +-
 src/kudu/util/curl_util.h                          |   7 +
 src/kudu/util/thread.cc                            | 119 ++---
 src/kudu/util/web_callback_registry.h              |   9 +-
 www/{home.mustache => dashboards.mustache}         |  15 +-
 www/kudu.js                                        |  27 +-
 www/tablets.mustache                               |  10 +-
 www/threadz.mustache                               |  68 +++
 21 files changed, 919 insertions(+), 783 deletions(-)
 create mode 100644 java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/SparkSQLTest.scala
 copy www/{home.mustache => dashboards.mustache} (58%)
 create mode 100644 www/threadz.mustache


[kudu] 01/03: [spark] Separate out DefaultSourceTests

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 930d54483a8ae850175ef1f1aff94a8f4342705f
Author: Grant Henke <gr...@apache.org>
AuthorDate: Thu Oct 17 14:13:57 2019 -0500

    [spark] Separate out DefaultSourceTests
    
    We have seen flaky test failures due to timeouts of DefaultSourceTest.
    This is primarily due to the sheer number of tests in that class.
    
    This patch break out the SQL based tests, ones using `sqlContext.sql(…)`
    into their own class. There is no change in test methods or coverage.
    
    The result is 22 DefaultSourceTests and 22 SparkSQLTests.
    
    Change-Id: I54aa0327ffb5254c03fcfe8a0a08dba230360a40
    Reviewed-on: http://gerrit.cloudera.org:8080/14491
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Reviewed-by: Hao Hao <ha...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 .../apache/kudu/spark/kudu/DefaultSourceTest.scala | 497 -------------------
 .../org/apache/kudu/spark/kudu/KuduTestSuite.scala |  14 +
 .../org/apache/kudu/spark/kudu/SparkSQLTest.scala  | 534 +++++++++++++++++++++
 3 files changed, 548 insertions(+), 497 deletions(-)

diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index 7dc96d2..521c83a 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -18,8 +18,6 @@ package org.apache.kudu.spark.kudu
 
 import scala.collection.JavaConverters._
 import scala.collection.immutable.IndexedSeq
-import scala.util.control.NonFatal
-import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.functions._
@@ -28,16 +26,10 @@ import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.types.StructType
 import org.junit.Assert._
 import org.scalatest.Matchers
-import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
 import org.apache.kudu.client.CreateTableOptions
-import org.apache.kudu.Schema
-import org.apache.kudu.Type
 import org.apache.kudu.test.RandomUtils
 import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobTaskCounter
 import org.apache.kudu.test.KuduTestHarness.MasterServerConfig
-import org.apache.kudu.test.KuduTestHarness.TabletServerConfig
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
 import org.junit.Before
 import org.junit.Test
 
@@ -505,380 +497,6 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
   }
 
   @Test
-  def testTableNonFaultTolerantScan() {
-    val results = sqlContext.sql(s"SELECT * FROM $tableName").collectAsList()
-    assert(results.size() == rowCount)
-
-    assert(!results.get(0).isNullAt(2))
-    assert(results.get(1).isNullAt(2))
-  }
-
-  @Test
-  def testTableFaultTolerantScan() {
-    kuduOptions = Map(
-      "kudu.table" -> tableName,
-      "kudu.master" -> harness.getMasterAddressesAsString,
-      "kudu.faultTolerantScan" -> "true")
-
-    val table = "faultTolerantScanTest"
-    sqlContext.read.options(kuduOptions).format("kudu").load.createOrReplaceTempView(table)
-    val results = sqlContext.sql(s"SELECT * FROM $table").collectAsList()
-    assert(results.size() == rowCount)
-
-    assert(!results.get(0).isNullAt(2))
-    assert(results.get(1).isNullAt(2))
-  }
-
-  @Test
-  def testTableScanWithProjection() {
-    assertEquals(10, sqlContext.sql(s"""SELECT key FROM $tableName""").count())
-  }
-
-  @Test
-  def testTableScanWithProjectionAndPredicateDouble() {
-    assertEquals(
-      rows.count { case (key, i, s, ts) => i > 5 },
-      sqlContext
-        .sql(s"""SELECT key, c3_double FROM $tableName where c3_double > "5.0"""")
-        .count())
-  }
-
-  @Test
-  def testTableScanWithProjectionAndPredicateLong() {
-    assertEquals(
-      rows.count { case (key, i, s, ts) => i > 5 },
-      sqlContext
-        .sql(s"""SELECT key, c4_long FROM $tableName where c4_long > "5"""")
-        .count())
-  }
-
-  @Test
-  def testTableScanWithProjectionAndPredicateBool() {
-    assertEquals(
-      rows.count { case (key, i, s, ts) => i % 2 == 0 },
-      sqlContext
-        .sql(s"""SELECT key, c5_bool FROM $tableName where c5_bool = true""")
-        .count())
-  }
-
-  @Test
-  def testTableScanWithProjectionAndPredicateShort() {
-    assertEquals(
-      rows.count { case (key, i, s, ts) => i > 5 },
-      sqlContext
-        .sql(s"""SELECT key, c6_short FROM $tableName where c6_short > 5""")
-        .count())
-
-  }
-
-  @Test
-  def testTableScanWithProjectionAndPredicateFloat() {
-    assertEquals(
-      rows.count { case (key, i, s, ts) => i > 5 },
-      sqlContext
-        .sql(s"""SELECT key, c7_float FROM $tableName where c7_float > 5""")
-        .count())
-
-  }
-
-  @Test
-  def testTableScanWithProjectionAndPredicateDecimal32() {
-    assertEquals(
-      rows.count { case (key, i, s, ts) => i > 5 },
-      sqlContext
-        .sql(s"""SELECT key, c11_decimal32 FROM $tableName where c11_decimal32 > 5""")
-        .count())
-  }
-
-  @Test
-  def testTableScanWithProjectionAndPredicateDecimal64() {
-    assertEquals(
-      rows.count { case (key, i, s, ts) => i > 5 },
-      sqlContext
-        .sql(s"""SELECT key, c12_decimal64 FROM $tableName where c12_decimal64 > 5""")
-        .count())
-  }
-
-  @Test
-  def testTableScanWithProjectionAndPredicateDecimal128() {
-    assertEquals(
-      rows.count { case (key, i, s, ts) => i > 5 },
-      sqlContext
-        .sql(s"""SELECT key, c13_decimal128 FROM $tableName where c13_decimal128 > 5""")
-        .count())
-  }
-
-  @Test
-  def testTableScanWithProjectionAndPredicate() {
-    assertEquals(
-      rows.count { case (key, i, s, ts) => s != null && s > "5" },
-      sqlContext
-        .sql(s"""SELECT key FROM $tableName where c2_s > "5"""")
-        .count())
-
-    assertEquals(
-      rows.count { case (key, i, s, ts) => s != null },
-      sqlContext
-        .sql(s"""SELECT key, c2_s FROM $tableName where c2_s IS NOT NULL""")
-        .count())
-  }
-
-  @Test
-  def testBasicSparkSQL() {
-    val results = sqlContext.sql("SELECT * FROM " + tableName).collectAsList()
-    assert(results.size() == rowCount)
-
-    assert(results.get(1).isNullAt(2))
-    assert(!results.get(0).isNullAt(2))
-  }
-
-  @Test
-  def testBasicSparkSQLWithProjection() {
-    val results = sqlContext.sql("SELECT key FROM " + tableName).collectAsList()
-    assert(results.size() == rowCount)
-    assert(results.get(0).size.equals(1))
-    assert(results.get(0).getInt(0).equals(0))
-  }
-
-  @Test
-  def testBasicSparkSQLWithPredicate() {
-    val results = sqlContext
-      .sql("SELECT key FROM " + tableName + " where key=1")
-      .collectAsList()
-    assert(results.size() == 1)
-    assert(results.get(0).size.equals(1))
-    assert(results.get(0).getInt(0).equals(1))
-
-  }
-
-  @Test
-  def testBasicSparkSQLWithTwoPredicates() {
-    val results = sqlContext
-      .sql("SELECT key FROM " + tableName + " where key=2 and c2_s='2'")
-      .collectAsList()
-    assert(results.size() == 1)
-    assert(results.get(0).size.equals(1))
-    assert(results.get(0).getInt(0).equals(2))
-  }
-
-  @Test
-  def testBasicSparkSQLWithInListPredicate() {
-    val keys = Array(1, 5, 7)
-    val results = sqlContext
-      .sql(s"SELECT key FROM $tableName where key in (${keys.mkString(", ")})")
-      .collectAsList()
-    assert(results.size() == keys.length)
-    keys.zipWithIndex.foreach {
-      case (v, i) =>
-        assert(results.get(i).size.equals(1))
-        assert(results.get(i).getInt(0).equals(v))
-    }
-  }
-
-  @Test
-  def testBasicSparkSQLWithInListPredicateOnString() {
-    val keys = Array(1, 4, 6)
-    val results = sqlContext
-      .sql(s"SELECT key FROM $tableName where c2_s in (${keys.mkString("'", "', '", "'")})")
-      .collectAsList()
-    assert(results.size() == keys.count(_ % 2 == 0))
-    keys.filter(_ % 2 == 0).zipWithIndex.foreach {
-      case (v, i) =>
-        assert(results.get(i).size.equals(1))
-        assert(results.get(i).getInt(0).equals(v))
-    }
-  }
-
-  @Test
-  def testBasicSparkSQLWithInListAndComparisonPredicate() {
-    val keys = Array(1, 5, 7)
-    val results = sqlContext
-      .sql(s"SELECT key FROM $tableName where key>2 and key in (${keys.mkString(", ")})")
-      .collectAsList()
-    assert(results.size() == keys.count(_ > 2))
-    keys.filter(_ > 2).zipWithIndex.foreach {
-      case (v, i) =>
-        assert(results.get(i).size.equals(1))
-        assert(results.get(i).getInt(0).equals(v))
-    }
-  }
-
-  @Test
-  def testBasicSparkSQLWithTwoPredicatesNegative() {
-    val results = sqlContext
-      .sql("SELECT key FROM " + tableName + " where key=1 and c2_s='2'")
-      .collectAsList()
-    assert(results.size() == 0)
-  }
-
-  @Test
-  def testBasicSparkSQLWithTwoPredicatesIncludingString() {
-    val results = sqlContext
-      .sql("SELECT key FROM " + tableName + " where c2_s='2'")
-      .collectAsList()
-    assert(results.size() == 1)
-    assert(results.get(0).size.equals(1))
-    assert(results.get(0).getInt(0).equals(2))
-  }
-
-  @Test
-  def testBasicSparkSQLWithTwoPredicatesAndProjection() {
-    val results = sqlContext
-      .sql("SELECT key, c2_s FROM " + tableName + " where c2_s='2'")
-      .collectAsList()
-    assert(results.size() == 1)
-    assert(results.get(0).size.equals(2))
-    assert(results.get(0).getInt(0).equals(2))
-    assert(results.get(0).getString(1).equals("2"))
-  }
-
-  @Test
-  def testBasicSparkSQLWithTwoPredicatesGreaterThan() {
-    val results = sqlContext
-      .sql("SELECT key, c2_s FROM " + tableName + " where c2_s>='2'")
-      .collectAsList()
-    assert(results.size() == 4)
-    assert(results.get(0).size.equals(2))
-    assert(results.get(0).getInt(0).equals(2))
-    assert(results.get(0).getString(1).equals("2"))
-  }
-
-  @Test
-  def testSparkSQLStringStartsWithFilters() {
-    // This test requires a special table.
-    val testTableName = "startswith"
-    val schema = new Schema(
-      List(new ColumnSchemaBuilder("key", Type.STRING).key(true).build()).asJava)
-    val tableOptions = new CreateTableOptions()
-      .setRangePartitionColumns(List("key").asJava)
-      .setNumReplicas(1)
-    val testTable = kuduClient.createTable(testTableName, schema, tableOptions)
-
-    val kuduSession = kuduClient.newSession()
-    val chars = List('a', 'b', '乕', Char.MaxValue, '\u0000')
-    val keys = for {
-      x <- chars
-      y <- chars
-      z <- chars
-      w <- chars
-    } yield Array(x, y, z, w).mkString
-    keys.foreach { key =>
-      val insert = testTable.newInsert
-      val row = insert.getRow
-      val r = Array(1, 2, 3)
-      row.addString(0, key)
-      kuduSession.apply(insert)
-    }
-    val options: Map[String, String] =
-      Map("kudu.table" -> testTableName, "kudu.master" -> harness.getMasterAddressesAsString)
-    sqlContext.read.options(options).format("kudu").load.createOrReplaceTempView(testTableName)
-
-    val checkPrefixCount = { prefix: String =>
-      val results = sqlContext.sql(s"SELECT key FROM $testTableName WHERE key LIKE '$prefix%'")
-      assertEquals(keys.count(k => k.startsWith(prefix)), results.count())
-    }
-    // empty string
-    checkPrefixCount("")
-    // one character
-    for (x <- chars) {
-      checkPrefixCount(Array(x).mkString)
-    }
-    // all two character combos
-    for {
-      x <- chars
-      y <- chars
-    } {
-      checkPrefixCount(Array(x, y).mkString)
-    }
-  }
-
-  @Test
-  def testSparkSQLIsNullPredicate() {
-    var results = sqlContext
-      .sql("SELECT key FROM " + tableName + " where c2_s IS NULL")
-      .collectAsList()
-    assert(results.size() == 5)
-
-    results = sqlContext
-      .sql("SELECT key FROM " + tableName + " where key IS NULL")
-      .collectAsList()
-    assert(results.isEmpty())
-  }
-
-  @Test
-  def testSparkSQLIsNotNullPredicate() {
-    var results = sqlContext
-      .sql("SELECT key FROM " + tableName + " where c2_s IS NOT NULL")
-      .collectAsList()
-    assert(results.size() == 5)
-
-    results = sqlContext
-      .sql("SELECT key FROM " + tableName + " where key IS NOT NULL")
-      .collectAsList()
-    assert(results.size() == 10)
-  }
-
-  @Test
-  def testSQLInsertInto() {
-    val insertTable = "insertintotest"
-
-    // read 0 rows just to get the schema
-    val df = sqlContext.sql(s"SELECT * FROM $tableName LIMIT 0")
-    kuduContext.createTable(
-      insertTable,
-      df.schema,
-      Seq("key"),
-      new CreateTableOptions()
-        .setRangePartitionColumns(List("key").asJava)
-        .setNumReplicas(1))
-
-    val newOptions: Map[String, String] =
-      Map("kudu.table" -> insertTable, "kudu.master" -> harness.getMasterAddressesAsString)
-    sqlContext.read
-      .options(newOptions)
-      .format("kudu")
-      .load
-      .createOrReplaceTempView(insertTable)
-
-    sqlContext.sql(s"INSERT INTO TABLE $insertTable SELECT * FROM $tableName")
-    val results =
-      sqlContext.sql(s"SELECT key FROM $insertTable").collectAsList()
-    assertEquals(10, results.size())
-  }
-
-  @Test
-  def testSQLInsertOverwriteUnsupported() {
-    val insertTable = "insertoverwritetest"
-
-    // read 0 rows just to get the schema
-    val df = sqlContext.sql(s"SELECT * FROM $tableName LIMIT 0")
-    kuduContext.createTable(
-      insertTable,
-      df.schema,
-      Seq("key"),
-      new CreateTableOptions()
-        .setRangePartitionColumns(List("key").asJava)
-        .setNumReplicas(1))
-
-    val newOptions: Map[String, String] =
-      Map("kudu.table" -> insertTable, "kudu.master" -> harness.getMasterAddressesAsString)
-    sqlContext.read
-      .options(newOptions)
-      .format("kudu")
-      .load
-      .createOrReplaceTempView(insertTable)
-
-    try {
-      sqlContext.sql(s"INSERT OVERWRITE TABLE $insertTable SELECT * FROM $tableName")
-      fail("insert overwrite should throw UnsupportedOperationException")
-    } catch {
-      case _: UnsupportedOperationException => // good
-      case NonFatal(_) =>
-        fail("insert overwrite should throw UnsupportedOperationException")
-    }
-  }
-
-  @Test
   def testWriteUsingDefaultSource() {
     val df = sqlContext.read.options(kuduOptions).format("kudu").load
 
@@ -935,22 +553,6 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
     }.getMessage should include("Unknown column: foo")
   }
 
-  @Test
-  def testScanLocality() {
-    kuduOptions = Map(
-      "kudu.table" -> tableName,
-      "kudu.master" -> harness.getMasterAddressesAsString,
-      "kudu.scanLocality" -> "closest_replica")
-
-    val table = "scanLocalityTest"
-    sqlContext.read.options(kuduOptions).format("kudu").load.createOrReplaceTempView(table)
-    val results = sqlContext.sql(s"SELECT * FROM $table").collectAsList()
-    assert(results.size() == rowCount)
-
-    assert(!results.get(0).isNullAt(2))
-    assert(results.get(1).isNullAt(2))
-  }
-
   // Verify that the propagated timestamp is properly updated inside
   // the same client.
   @Test
@@ -1000,18 +602,6 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
   }
 
   /**
-   * Assuming that the only part of the logical plan is a Kudu scan, this
-   * function extracts the KuduRelation from the passed DataFrame for
-   * testing purposes.
-   */
-  def kuduRelationFromDataFrame(dataFrame: DataFrame) = {
-    val logicalPlan = dataFrame.queryExecution.logical
-    val logicalRelation = logicalPlan.asInstanceOf[LogicalRelation]
-    val baseRelation = logicalRelation.relation
-    baseRelation.asInstanceOf[KuduRelation]
-  }
-
-  /**
    * Verify that the kudu.scanRequestTimeoutMs parameter is parsed by the
    * DefaultSource and makes it into the KuduRelation as a configuration
    * parameter.
@@ -1028,35 +618,6 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
   }
 
   @Test
-  @TabletServerConfig(
-    flags = Array(
-      "--flush_threshold_mb=1",
-      "--flush_threshold_secs=1",
-      // Disable rowset compact to prevent DRSs being merged because they are too small.
-      "--enable_rowset_compaction=false"
-    ))
-  def testScanWithKeyRange() {
-    upsertRowsWithRowDataSize(table, rowCount * 100, 32 * 1024)
-
-    // Wait for mrs flushed
-    Thread.sleep(5 * 1000)
-
-    kuduOptions = Map(
-      "kudu.table" -> tableName,
-      "kudu.master" -> harness.getMasterAddressesAsString,
-      "kudu.splitSizeBytes" -> "1024")
-
-    // count the number of tasks that end.
-    val actualNumTasks = withJobTaskCounter(ss.sparkContext) { () =>
-      val t = "scanWithKeyRangeTest"
-      sqlContext.read.options(kuduOptions).format("kudu").load.createOrReplaceTempView(t)
-      val results = sqlContext.sql(s"SELECT * FROM $t").collectAsList()
-      assertEquals(rowCount * 100, results.size())
-    }
-    assert(actualNumTasks > 2)
-  }
-
-  @Test
   @MasterServerConfig(
     flags = Array(
       "--mock_table_metrics_for_testing=true",
@@ -1068,62 +629,4 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
     val kuduRelation = kuduRelationFromDataFrame(dataFrame)
     assert(kuduRelation.sizeInBytes == 1024)
   }
-
-  @Test
-  @MasterServerConfig(
-    flags = Array(
-      "--mock_table_metrics_for_testing=true",
-      "--on_disk_size_for_testing=1024",
-      "--live_row_count_for_testing=100"
-    ))
-  def testJoinWithTableStatistics(): Unit = {
-    val df = sqlContext.read.options(kuduOptions).format("kudu").load
-
-    // 1. Create two tables.
-    val table1 = "table1"
-    kuduContext.createTable(
-      table1,
-      df.schema,
-      Seq("key"),
-      new CreateTableOptions()
-        .setRangePartitionColumns(List("key").asJava)
-        .setNumReplicas(1))
-    var options1: Map[String, String] =
-      Map("kudu.table" -> table1, "kudu.master" -> harness.getMasterAddressesAsString)
-    df.write.options(options1).mode("append").format("kudu").save
-    val df1 = sqlContext.read.options(options1).format("kudu").load
-    df1.createOrReplaceTempView(table1)
-
-    val table2 = "table2"
-    kuduContext.createTable(
-      table2,
-      df.schema,
-      Seq("key"),
-      new CreateTableOptions()
-        .setRangePartitionColumns(List("key").asJava)
-        .setNumReplicas(1))
-    var options2: Map[String, String] =
-      Map("kudu.table" -> table2, "kudu.master" -> harness.getMasterAddressesAsString)
-    df.write.options(options2).mode("append").format("kudu").save
-    val df2 = sqlContext.read.options(options2).format("kudu").load
-    df2.createOrReplaceTempView(table2)
-
-    // 2. Get the table statistics of each table and verify.
-    val relation1 = kuduRelationFromDataFrame(df1)
-    val relation2 = kuduRelationFromDataFrame(df2)
-    assert(relation1.sizeInBytes == relation2.sizeInBytes)
-    assert(relation1.sizeInBytes == 1024)
-
-    // 3. Test join with table size should be able to broadcast.
-    val sqlStr = s"SELECT * FROM $table1 JOIN $table2 ON $table1.key = $table2.key"
-    var physical = sqlContext.sql(sqlStr).queryExecution.sparkPlan
-    var operators = physical.collect {
-      case j: BroadcastHashJoinExec => j
-    }
-    assert(operators.size == 1)
-
-    // Verify result.
-    var results = sqlContext.sql(sqlStr).collectAsList()
-    assert(results.size() == rowCount)
-  }
 }
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
index 54f0ffa..61e1069 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
@@ -31,6 +31,8 @@ import org.apache.kudu.Schema
 import org.apache.kudu.Type
 import org.apache.kudu.test.KuduTestHarness
 import org.apache.kudu.util.DecimalUtil
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.SparkSession
 import org.junit.After
 import org.junit.Before
@@ -229,4 +231,16 @@ trait KuduTestSuite extends JUnitSuite {
     }
     rows
   }
+
+  /**
+   * Assuming that the only part of the logical plan is a Kudu scan, this
+   * function extracts the KuduRelation from the passed DataFrame for
+   * testing purposes.
+   */
+  def kuduRelationFromDataFrame(dataFrame: DataFrame) = {
+    val logicalPlan = dataFrame.queryExecution.logical
+    val logicalRelation = logicalPlan.asInstanceOf[LogicalRelation]
+    val baseRelation = logicalRelation.relation
+    baseRelation.asInstanceOf[KuduRelation]
+  }
 }
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/SparkSQLTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/SparkSQLTest.scala
new file mode 100644
index 0000000..a6aaf41
--- /dev/null
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/SparkSQLTest.scala
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kudu.spark.kudu
+
+import scala.collection.JavaConverters._
+import scala.collection.immutable.IndexedSeq
+import scala.util.control.NonFatal
+import org.apache.spark.sql.SQLContext
+import org.junit.Assert._
+import org.scalatest.Matchers
+import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
+import org.apache.kudu.client.CreateTableOptions
+import org.apache.kudu.Schema
+import org.apache.kudu.Type
+import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobTaskCounter
+import org.apache.kudu.test.KuduTestHarness.MasterServerConfig
+import org.apache.kudu.test.KuduTestHarness.TabletServerConfig
+import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
+import org.junit.Before
+import org.junit.Test
+
+class SparkSQLTest extends KuduTestSuite with Matchers {
+  val rowCount = 10
+  var sqlContext: SQLContext = _
+  var rows: IndexedSeq[(Int, Int, String, Long)] = _
+  var kuduOptions: Map[String, String] = _
+
+  @Before
+  def setUp(): Unit = {
+    rows = insertRows(table, rowCount)
+
+    sqlContext = ss.sqlContext
+
+    kuduOptions =
+      Map("kudu.table" -> tableName, "kudu.master" -> harness.getMasterAddressesAsString)
+
+    sqlContext.read
+      .options(kuduOptions)
+      .format("kudu")
+      .load()
+      .createOrReplaceTempView(tableName)
+  }
+
+  @Test
+  def testBasicSparkSQL() {
+    val results = sqlContext.sql("SELECT * FROM " + tableName).collectAsList()
+    assert(results.size() == rowCount)
+
+    assert(results.get(1).isNullAt(2))
+    assert(!results.get(0).isNullAt(2))
+  }
+
+  @Test
+  def testBasicSparkSQLWithProjection() {
+    val results = sqlContext.sql("SELECT key FROM " + tableName).collectAsList()
+    assert(results.size() == rowCount)
+    assert(results.get(0).size.equals(1))
+    assert(results.get(0).getInt(0).equals(0))
+  }
+
+  @Test
+  def testBasicSparkSQLWithPredicate() {
+    val results = sqlContext
+      .sql("SELECT key FROM " + tableName + " where key=1")
+      .collectAsList()
+    assert(results.size() == 1)
+    assert(results.get(0).size.equals(1))
+    assert(results.get(0).getInt(0).equals(1))
+
+  }
+
+  @Test
+  def testBasicSparkSQLWithTwoPredicates() {
+    val results = sqlContext
+      .sql("SELECT key FROM " + tableName + " where key=2 and c2_s='2'")
+      .collectAsList()
+    assert(results.size() == 1)
+    assert(results.get(0).size.equals(1))
+    assert(results.get(0).getInt(0).equals(2))
+  }
+
+  @Test
+  def testBasicSparkSQLWithInListPredicate() {
+    val keys = Array(1, 5, 7)
+    val results = sqlContext
+      .sql(s"SELECT key FROM $tableName where key in (${keys.mkString(", ")})")
+      .collectAsList()
+    assert(results.size() == keys.length)
+    keys.zipWithIndex.foreach {
+      case (v, i) =>
+        assert(results.get(i).size.equals(1))
+        assert(results.get(i).getInt(0).equals(v))
+    }
+  }
+
+  @Test
+  def testBasicSparkSQLWithInListPredicateOnString() {
+    val keys = Array(1, 4, 6)
+    val results = sqlContext
+      .sql(s"SELECT key FROM $tableName where c2_s in (${keys.mkString("'", "', '", "'")})")
+      .collectAsList()
+    assert(results.size() == keys.count(_ % 2 == 0))
+    keys.filter(_ % 2 == 0).zipWithIndex.foreach {
+      case (v, i) =>
+        assert(results.get(i).size.equals(1))
+        assert(results.get(i).getInt(0).equals(v))
+    }
+  }
+
+  @Test
+  def testBasicSparkSQLWithInListAndComparisonPredicate() {
+    val keys = Array(1, 5, 7)
+    val results = sqlContext
+      .sql(s"SELECT key FROM $tableName where key>2 and key in (${keys.mkString(", ")})")
+      .collectAsList()
+    assert(results.size() == keys.count(_ > 2))
+    keys.filter(_ > 2).zipWithIndex.foreach {
+      case (v, i) =>
+        assert(results.get(i).size.equals(1))
+        assert(results.get(i).getInt(0).equals(v))
+    }
+  }
+
+  @Test
+  def testBasicSparkSQLWithTwoPredicatesNegative() {
+    val results = sqlContext
+      .sql("SELECT key FROM " + tableName + " where key=1 and c2_s='2'")
+      .collectAsList()
+    assert(results.size() == 0)
+  }
+
+  @Test
+  def testBasicSparkSQLWithTwoPredicatesIncludingString() {
+    val results = sqlContext
+      .sql("SELECT key FROM " + tableName + " where c2_s='2'")
+      .collectAsList()
+    assert(results.size() == 1)
+    assert(results.get(0).size.equals(1))
+    assert(results.get(0).getInt(0).equals(2))
+  }
+
+  @Test
+  def testBasicSparkSQLWithTwoPredicatesAndProjection() {
+    val results = sqlContext
+      .sql("SELECT key, c2_s FROM " + tableName + " where c2_s='2'")
+      .collectAsList()
+    assert(results.size() == 1)
+    assert(results.get(0).size.equals(2))
+    assert(results.get(0).getInt(0).equals(2))
+    assert(results.get(0).getString(1).equals("2"))
+  }
+
+  @Test
+  def testBasicSparkSQLWithTwoPredicatesGreaterThan() {
+    val results = sqlContext
+      .sql("SELECT key, c2_s FROM " + tableName + " where c2_s>='2'")
+      .collectAsList()
+    assert(results.size() == 4)
+    assert(results.get(0).size.equals(2))
+    assert(results.get(0).getInt(0).equals(2))
+    assert(results.get(0).getString(1).equals("2"))
+  }
+
+  @Test
+  def testSparkSQLStringStartsWithFilters() {
+    // This test requires a special table.
+    val testTableName = "startswith"
+    val schema = new Schema(
+      List(new ColumnSchemaBuilder("key", Type.STRING).key(true).build()).asJava)
+    val tableOptions = new CreateTableOptions()
+      .setRangePartitionColumns(List("key").asJava)
+      .setNumReplicas(1)
+    val testTable = kuduClient.createTable(testTableName, schema, tableOptions)
+
+    val kuduSession = kuduClient.newSession()
+    val chars = List('a', 'b', '乕', Char.MaxValue, '\u0000')
+    val keys = for {
+      x <- chars
+      y <- chars
+      z <- chars
+      w <- chars
+    } yield Array(x, y, z, w).mkString
+    keys.foreach { key =>
+      val insert = testTable.newInsert
+      val row = insert.getRow
+      val r = Array(1, 2, 3)
+      row.addString(0, key)
+      kuduSession.apply(insert)
+    }
+    val options: Map[String, String] =
+      Map("kudu.table" -> testTableName, "kudu.master" -> harness.getMasterAddressesAsString)
+    sqlContext.read.options(options).format("kudu").load.createOrReplaceTempView(testTableName)
+
+    val checkPrefixCount = { prefix: String =>
+      val results = sqlContext.sql(s"SELECT key FROM $testTableName WHERE key LIKE '$prefix%'")
+      assertEquals(keys.count(k => k.startsWith(prefix)), results.count())
+    }
+    // empty string
+    checkPrefixCount("")
+    // one character
+    for (x <- chars) {
+      checkPrefixCount(Array(x).mkString)
+    }
+    // all two character combos
+    for {
+      x <- chars
+      y <- chars
+    } {
+      checkPrefixCount(Array(x, y).mkString)
+    }
+  }
+
+  @Test
+  def testSparkSQLIsNullPredicate() {
+    var results = sqlContext
+      .sql("SELECT key FROM " + tableName + " where c2_s IS NULL")
+      .collectAsList()
+    assert(results.size() == 5)
+
+    results = sqlContext
+      .sql("SELECT key FROM " + tableName + " where key IS NULL")
+      .collectAsList()
+    assert(results.isEmpty())
+  }
+
+  @Test
+  def testSparkSQLIsNotNullPredicate() {
+    var results = sqlContext
+      .sql("SELECT key FROM " + tableName + " where c2_s IS NOT NULL")
+      .collectAsList()
+    assert(results.size() == 5)
+
+    results = sqlContext
+      .sql("SELECT key FROM " + tableName + " where key IS NOT NULL")
+      .collectAsList()
+    assert(results.size() == 10)
+  }
+
+  @Test
+  def testSQLInsertInto() {
+    val insertTable = "insertintotest"
+
+    // read 0 rows just to get the schema
+    val df = sqlContext.sql(s"SELECT * FROM $tableName LIMIT 0")
+    kuduContext.createTable(
+      insertTable,
+      df.schema,
+      Seq("key"),
+      new CreateTableOptions()
+        .setRangePartitionColumns(List("key").asJava)
+        .setNumReplicas(1))
+
+    val newOptions: Map[String, String] =
+      Map("kudu.table" -> insertTable, "kudu.master" -> harness.getMasterAddressesAsString)
+    sqlContext.read
+      .options(newOptions)
+      .format("kudu")
+      .load
+      .createOrReplaceTempView(insertTable)
+
+    sqlContext.sql(s"INSERT INTO TABLE $insertTable SELECT * FROM $tableName")
+    val results =
+      sqlContext.sql(s"SELECT key FROM $insertTable").collectAsList()
+    assertEquals(10, results.size())
+  }
+
+  @Test
+  def testSQLInsertOverwriteUnsupported() {
+    val insertTable = "insertoverwritetest"
+
+    // read 0 rows just to get the schema
+    val df = sqlContext.sql(s"SELECT * FROM $tableName LIMIT 0")
+    kuduContext.createTable(
+      insertTable,
+      df.schema,
+      Seq("key"),
+      new CreateTableOptions()
+        .setRangePartitionColumns(List("key").asJava)
+        .setNumReplicas(1))
+
+    val newOptions: Map[String, String] =
+      Map("kudu.table" -> insertTable, "kudu.master" -> harness.getMasterAddressesAsString)
+    sqlContext.read
+      .options(newOptions)
+      .format("kudu")
+      .load
+      .createOrReplaceTempView(insertTable)
+
+    try {
+      sqlContext.sql(s"INSERT OVERWRITE TABLE $insertTable SELECT * FROM $tableName")
+      fail("insert overwrite should throw UnsupportedOperationException")
+    } catch {
+      case _: UnsupportedOperationException => // good
+      case NonFatal(_) =>
+        fail("insert overwrite should throw UnsupportedOperationException")
+    }
+  }
+
+  @Test
+  def testTableScanWithProjection() {
+    assertEquals(10, sqlContext.sql(s"""SELECT key FROM $tableName""").count())
+  }
+
+  @Test
+  def testTableScanWithProjectionAndPredicateDouble() {
+    assertEquals(
+      rows.count { case (key, i, s, ts) => i > 5 },
+      sqlContext
+        .sql(s"""SELECT key, c3_double FROM $tableName where c3_double > "5.0"""")
+        .count())
+  }
+
+  @Test
+  def testTableScanWithProjectionAndPredicateLong() {
+    assertEquals(
+      rows.count { case (key, i, s, ts) => i > 5 },
+      sqlContext
+        .sql(s"""SELECT key, c4_long FROM $tableName where c4_long > "5"""")
+        .count())
+  }
+
+  @Test
+  def testTableScanWithProjectionAndPredicateBool() {
+    assertEquals(
+      rows.count { case (key, i, s, ts) => i % 2 == 0 },
+      sqlContext
+        .sql(s"""SELECT key, c5_bool FROM $tableName where c5_bool = true""")
+        .count())
+  }
+
+  @Test
+  def testTableScanWithProjectionAndPredicateShort() {
+    assertEquals(
+      rows.count { case (key, i, s, ts) => i > 5 },
+      sqlContext
+        .sql(s"""SELECT key, c6_short FROM $tableName where c6_short > 5""")
+        .count())
+
+  }
+
+  @Test
+  def testTableScanWithProjectionAndPredicateFloat() {
+    assertEquals(
+      rows.count { case (key, i, s, ts) => i > 5 },
+      sqlContext
+        .sql(s"""SELECT key, c7_float FROM $tableName where c7_float > 5""")
+        .count())
+
+  }
+
+  @Test
+  def testTableScanWithProjectionAndPredicateDecimal32() {
+    assertEquals(
+      rows.count { case (key, i, s, ts) => i > 5 },
+      sqlContext
+        .sql(s"""SELECT key, c11_decimal32 FROM $tableName where c11_decimal32 > 5""")
+        .count())
+  }
+
+  @Test
+  def testTableScanWithProjectionAndPredicateDecimal64() {
+    assertEquals(
+      rows.count { case (key, i, s, ts) => i > 5 },
+      sqlContext
+        .sql(s"""SELECT key, c12_decimal64 FROM $tableName where c12_decimal64 > 5""")
+        .count())
+  }
+
+  @Test
+  def testTableScanWithProjectionAndPredicateDecimal128() {
+    assertEquals(
+      rows.count { case (key, i, s, ts) => i > 5 },
+      sqlContext
+        .sql(s"""SELECT key, c13_decimal128 FROM $tableName where c13_decimal128 > 5""")
+        .count())
+  }
+
+  @Test
+  def testTableScanWithProjectionAndPredicate() {
+    assertEquals(
+      rows.count { case (key, i, s, ts) => s != null && s > "5" },
+      sqlContext
+        .sql(s"""SELECT key FROM $tableName where c2_s > "5"""")
+        .count())
+
+    assertEquals(
+      rows.count { case (key, i, s, ts) => s != null },
+      sqlContext
+        .sql(s"""SELECT key, c2_s FROM $tableName where c2_s IS NOT NULL""")
+        .count())
+  }
+
+  @Test
+  def testScanLocality() {
+    kuduOptions = Map(
+      "kudu.table" -> tableName,
+      "kudu.master" -> harness.getMasterAddressesAsString,
+      "kudu.scanLocality" -> "closest_replica")
+
+    val table = "scanLocalityTest"
+    sqlContext.read.options(kuduOptions).format("kudu").load.createOrReplaceTempView(table)
+    val results = sqlContext.sql(s"SELECT * FROM $table").collectAsList()
+    assert(results.size() == rowCount)
+
+    assert(!results.get(0).isNullAt(2))
+    assert(results.get(1).isNullAt(2))
+  }
+
+  @Test
+  def testTableNonFaultTolerantScan() {
+    val results = sqlContext.sql(s"SELECT * FROM $tableName").collectAsList()
+    assert(results.size() == rowCount)
+
+    assert(!results.get(0).isNullAt(2))
+    assert(results.get(1).isNullAt(2))
+  }
+
+  @Test
+  def testTableFaultTolerantScan() {
+    kuduOptions = Map(
+      "kudu.table" -> tableName,
+      "kudu.master" -> harness.getMasterAddressesAsString,
+      "kudu.faultTolerantScan" -> "true")
+
+    val table = "faultTolerantScanTest"
+    sqlContext.read.options(kuduOptions).format("kudu").load.createOrReplaceTempView(table)
+    val results = sqlContext.sql(s"SELECT * FROM $table").collectAsList()
+    assert(results.size() == rowCount)
+
+    assert(!results.get(0).isNullAt(2))
+    assert(results.get(1).isNullAt(2))
+  }
+
+  @Test
+  @TabletServerConfig(
+    flags = Array(
+      "--flush_threshold_mb=1",
+      "--flush_threshold_secs=1",
+      // Disable rowset compact to prevent DRSs being merged because they are too small.
+      "--enable_rowset_compaction=false"
+    ))
+  def testScanWithKeyRange() {
+    upsertRowsWithRowDataSize(table, rowCount * 100, 32 * 1024)
+
+    // Wait for mrs flushed
+    Thread.sleep(5 * 1000)
+
+    kuduOptions = Map(
+      "kudu.table" -> tableName,
+      "kudu.master" -> harness.getMasterAddressesAsString,
+      "kudu.splitSizeBytes" -> "1024")
+
+    // count the number of tasks that end.
+    val actualNumTasks = withJobTaskCounter(ss.sparkContext) { () =>
+      val t = "scanWithKeyRangeTest"
+      sqlContext.read.options(kuduOptions).format("kudu").load.createOrReplaceTempView(t)
+      val results = sqlContext.sql(s"SELECT * FROM $t").collectAsList()
+      assertEquals(rowCount * 100, results.size())
+    }
+    assert(actualNumTasks > 2)
+  }
+
+  @Test
+  @MasterServerConfig(
+    flags = Array(
+      "--mock_table_metrics_for_testing=true",
+      "--on_disk_size_for_testing=1024",
+      "--live_row_count_for_testing=100"
+    ))
+  def testJoinWithTableStatistics(): Unit = {
+    val df = sqlContext.read.options(kuduOptions).format("kudu").load
+
+    // 1. Create two tables.
+    val table1 = "table1"
+    kuduContext.createTable(
+      table1,
+      df.schema,
+      Seq("key"),
+      new CreateTableOptions()
+        .setRangePartitionColumns(List("key").asJava)
+        .setNumReplicas(1))
+    var options1: Map[String, String] =
+      Map("kudu.table" -> table1, "kudu.master" -> harness.getMasterAddressesAsString)
+    df.write.options(options1).mode("append").format("kudu").save
+    val df1 = sqlContext.read.options(options1).format("kudu").load
+    df1.createOrReplaceTempView(table1)
+
+    val table2 = "table2"
+    kuduContext.createTable(
+      table2,
+      df.schema,
+      Seq("key"),
+      new CreateTableOptions()
+        .setRangePartitionColumns(List("key").asJava)
+        .setNumReplicas(1))
+    var options2: Map[String, String] =
+      Map("kudu.table" -> table2, "kudu.master" -> harness.getMasterAddressesAsString)
+    df.write.options(options2).mode("append").format("kudu").save
+    val df2 = sqlContext.read.options(options2).format("kudu").load
+    df2.createOrReplaceTempView(table2)
+
+    // 2. Get the table statistics of each table and verify.
+    val relation1 = kuduRelationFromDataFrame(df1)
+    val relation2 = kuduRelationFromDataFrame(df2)
+    assert(relation1.sizeInBytes == relation2.sizeInBytes)
+    assert(relation1.sizeInBytes == 1024)
+
+    // 3. Test join with table size should be able to broadcast.
+    val sqlStr = s"SELECT * FROM $table1 JOIN $table2 ON $table1.key = $table2.key"
+    var physical = sqlContext.sql(sqlStr).queryExecution.sparkPlan
+    var operators = physical.collect {
+      case j: BroadcastHashJoinExec => j
+    }
+    assert(operators.size == 1)
+
+    // Verify result.
+    var results = sqlContext.sql(sqlStr).collectAsList()
+    assert(results.size() == rowCount)
+  }
+}


[kudu] 02/03: webserver: enable HTTP keep-alive

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 5ca01ce0ccc3f4d15b8d9aff41dd5c6bd00ec1a7
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Mon Oct 14 18:18:57 2019 -0700

    webserver: enable HTTP keep-alive
    
    This is a port of IMPALA-8869. The motivation is, to quote from IMPALA-8869:
    "...we mishandle HTTP keep-alive semantics when returning a 401 because we
    close the connection but don't return a 'Connection: close' header, even
    though we're using HTTP/1.1 where keep-alive is assumed, which can cause
    clients to incorrectly believe that the connection has remained open."
    
    More tangibly, this leads to issues when proxying via Apache Knox.
    Specifically, every other request fails because the browser expected its
    connection to remain open, but Kudu closed it.
    
    The main challenge is that now we need to buffer up all parts of the
    response to avoid triggering a combination of Nagle's algorithm and TCP
    delayed acks, which add ~40ms to the RTT. To be safe, I refactored all
    response handling to go through a single function. Since I was knee deep in
    refactoring, I took the opportunity to fix the "SPNEGO header not returned
    with 200 response" issue.
    
    Change-Id: Ic08ef5a268fdf6dea6a8c428b4ab8dac27418dd6
    Reviewed-on: http://gerrit.cloudera.org:8080/14440
    Tested-by: Adar Dembo <ad...@cloudera.com>
    Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/master/master_path_handlers.cc   |  10 +-
 src/kudu/server/default_path_handlers.cc  |  14 +-
 src/kudu/server/pprof_path_handlers.cc    |  18 +--
 src/kudu/server/rpcz-path-handler.cc      |   2 +-
 src/kudu/server/tracing_path_handlers.cc  |   4 +-
 src/kudu/server/webserver-test.cc         |  21 +++
 src/kudu/server/webserver.cc              | 231 ++++++++++++++----------------
 src/kudu/server/webserver.h               |  14 +-
 src/kudu/tserver/tserver_path_handlers.cc |  30 ++--
 src/kudu/util/curl_util.cc                |  12 +-
 src/kudu/util/curl_util.h                 |   7 +
 src/kudu/util/thread.cc                   |   2 +-
 src/kudu/util/web_callback_registry.h     |   9 +-
 13 files changed, 200 insertions(+), 174 deletions(-)

diff --git a/src/kudu/master/master_path_handlers.cc b/src/kudu/master/master_path_handlers.cc
index f261312..a92315d 100644
--- a/src/kudu/master/master_path_handlers.cc
+++ b/src/kudu/master/master_path_handlers.cc
@@ -111,7 +111,7 @@ MasterPathHandlers::~MasterPathHandlers() {
 
 void MasterPathHandlers::HandleTabletServers(const Webserver::WebRequest& /*req*/,
                                              Webserver::WebResponse* resp) {
-  EasyJson* output = resp->output;
+  EasyJson* output = &resp->output;
   vector<shared_ptr<TSDescriptor>> descs;
   master_->ts_manager()->GetAllDescriptors(&descs);
 
@@ -234,7 +234,7 @@ int ExtractRedirectsFromRequest(const Webserver::WebRequest& req) {
 
 void MasterPathHandlers::HandleCatalogManager(const Webserver::WebRequest& req,
                                               Webserver::WebResponse* resp) {
-  EasyJson* output = resp->output;
+  EasyJson* output = &resp->output;
   CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager());
   if (!l.catalog_status().ok()) {
     (*output)["error"] = Substitute("Master is not ready: $0",  l.catalog_status().ToString());
@@ -306,7 +306,7 @@ bool CompareByRole(const pair<TabletDetailPeerInfo, RaftPeerPB::Role>& a,
 
 void MasterPathHandlers::HandleTablePage(const Webserver::WebRequest& req,
                                          Webserver::WebResponse* resp) {
-  EasyJson* output = resp->output;
+  EasyJson* output = &resp->output;
   // Parse argument.
   string table_id;
   if (!FindCopy(req.parsed_args, "id", &table_id)) {
@@ -542,7 +542,7 @@ void MasterPathHandlers::HandleTablePage(const Webserver::WebRequest& req,
 
 void MasterPathHandlers::HandleMasters(const Webserver::WebRequest& /*req*/,
                                        Webserver::WebResponse* resp) {
-  EasyJson* output = resp->output;
+  EasyJson* output = &resp->output;
   vector<ServerEntryPB> masters;
   Status s = master_->ListMasters(&masters);
   if (!s.ok()) {
@@ -688,7 +688,7 @@ void JsonError(const Status& s, ostringstream* out) {
 
 void MasterPathHandlers::HandleDumpEntities(const Webserver::WebRequest& /*req*/,
                                             Webserver::PrerenderedWebResponse* resp) {
-  ostringstream* output = resp->output;
+  ostringstream* output = &resp->output;
   Status s = master_->catalog_manager()->CheckOnline();
   if (!s.ok()) {
     JsonError(s, output);
diff --git a/src/kudu/server/default_path_handlers.cc b/src/kudu/server/default_path_handlers.cc
index 0397144..b139bd9 100644
--- a/src/kudu/server/default_path_handlers.cc
+++ b/src/kudu/server/default_path_handlers.cc
@@ -113,7 +113,7 @@ struct Tags {
 // Writes the last FLAGS_web_log_bytes of the INFO logfile to a webpage
 // Note to get best performance, set GLOG_logbuflevel=-1 to prevent log buffering
 static void LogsHandler(const Webserver::WebRequest& req, Webserver::WebResponse* resp) {
-  EasyJson* output = resp->output;
+  EasyJson* output = &resp->output;
   (*output)["raw"] = (req.parsed_args.find("raw") != req.parsed_args.end());
   string logfile;
   GetFullLogFilename(google::INFO, &logfile);
@@ -141,7 +141,7 @@ static void LogsHandler(const Webserver::WebRequest& req, Webserver::WebResponse
 // escaped if in the raw text mode, e.g. "/varz?raw".
 static void FlagsHandler(const Webserver::WebRequest& req,
                          Webserver::PrerenderedWebResponse* resp) {
-  ostringstream* output = resp->output;
+  ostringstream* output = &resp->output;
   bool as_text = (req.parsed_args.find("raw") != req.parsed_args.end());
   Tags tags(as_text);
 
@@ -156,7 +156,7 @@ static void FlagsHandler(const Webserver::WebRequest& req,
 // Prints out the current stack trace of all threads in the process.
 static void StacksHandler(const Webserver::WebRequest& /*req*/,
                           Webserver::PrerenderedWebResponse* resp) {
-  ostringstream* output = resp->output;
+  ostringstream* output = &resp->output;
 
   StackTraceSnapshot snap;
   auto start = MonoTime::Now();
@@ -189,7 +189,7 @@ static void StacksHandler(const Webserver::WebRequest& /*req*/,
 // Registered to handle "/memz", and prints out memory allocation statistics.
 static void MemUsageHandler(const Webserver::WebRequest& req,
                             Webserver::PrerenderedWebResponse* resp) {
-  ostringstream* output = resp->output;
+  ostringstream* output = &resp->output;
   bool as_text = (req.parsed_args.find("raw") != req.parsed_args.end());
   Tags tags(as_text);
 
@@ -210,7 +210,7 @@ static void MemUsageHandler(const Webserver::WebRequest& req,
 // Registered to handle "/mem-trackers", and prints out memory tracker information.
 static void MemTrackersHandler(const Webserver::WebRequest& /*req*/,
                                Webserver::PrerenderedWebResponse* resp) {
-  ostringstream* output = resp->output;
+  ostringstream* output = &resp->output;
   int64_t current_consumption = process_memory::CurrentConsumption();
   int64_t hard_limit = process_memory::HardLimit();
   *output << "<h1>Process memory usage</h1>\n";
@@ -269,7 +269,7 @@ static void MemTrackersHandler(const Webserver::WebRequest& /*req*/,
 
 static void ConfigurationHandler(const Webserver::WebRequest& /* req */,
                                  Webserver::WebResponse* resp) {
-  EasyJson* output = resp->output;
+  EasyJson* output = &resp->output;
   EasyJson security_configs = output->Set("security_configs", EasyJson::kArray);
 
   EasyJson rpc_encryption = security_configs.PushBack(EasyJson::kObject);
@@ -370,7 +370,7 @@ static void WriteMetricsAsJson(const MetricRegistry* const metrics,
     resp->status_code = HttpStatusCode::BadRequest;
     WARN_NOT_OK(Status::InvalidArgument(""), "The parameter of 'attributes' is wrong");
   } else {
-    JsonWriter writer(resp->output, json_mode);
+    JsonWriter writer(&resp->output, json_mode);
     WARN_NOT_OK(metrics->WriteAsJson(&writer, opts), "Couldn't write JSON metrics over HTTP");
   }
 }
diff --git a/src/kudu/server/pprof_path_handlers.cc b/src/kudu/server/pprof_path_handlers.cc
index 9d725d0..971eb29 100644
--- a/src/kudu/server/pprof_path_handlers.cc
+++ b/src/kudu/server/pprof_path_handlers.cc
@@ -80,15 +80,15 @@ static void PprofCmdLineHandler(const Webserver::WebRequest& /*req*/,
   string executable_path;
   Env* env = Env::Default();
   WARN_NOT_OK(env->GetExecutablePath(&executable_path), "Failed to get executable path");
-  *resp->output << executable_path;
+  resp->output << executable_path;
 }
 
 // pprof asks for the url /pprof/heap to get heap information. This should be implemented
 // by calling HeapProfileStart(filename), continue to do work, and then, some number of
 // seconds later, call GetHeapProfile() followed by HeapProfilerStop().
-static void PprofHeapHandler(const Webserver::WebRequest& req,
+static void PprofHeapHandler(const Webserver::WebRequest& /*req*/,
                              Webserver::PrerenderedWebResponse* resp) {
-  ostringstream* output = resp->output;
+  ostringstream* output = &resp->output;
 #ifndef TCMALLOC_ENABLED
   *output << "%warn Heap profiling is not available without tcmalloc.\n";
 #else
@@ -125,7 +125,7 @@ static void PprofHeapHandler(const Webserver::WebRequest& req,
 // and then, XX seconds later, calling ProfilerStop().
 static void PprofCpuProfileHandler(const Webserver::WebRequest& req,
                                    Webserver::PrerenderedWebResponse* resp) {
-  ostringstream* output = resp->output;
+  ostringstream* output = &resp->output;
 #ifndef TCMALLOC_ENABLED
   *output << "%warn CPU profiling is not available without tcmalloc.\n";
 #else
@@ -155,11 +155,11 @@ static void PprofCpuProfileHandler(const Webserver::WebRequest& req,
 static void PprofGrowthHandler(const Webserver::WebRequest& /*req*/,
                                Webserver::PrerenderedWebResponse* resp) {
 #ifndef TCMALLOC_ENABLED
-  *resp->output << "%warn Growth profiling is not available without tcmalloc.\n";
+  resp->output << "%warn Growth profiling is not available without tcmalloc.\n";
 #else
   string heap_growth_stack;
   MallocExtension::instance()->GetHeapGrowthStacks(&heap_growth_stack);
-  *resp->output << heap_growth_stack;
+  resp->output << heap_growth_stack;
 #endif
 }
 
@@ -180,7 +180,7 @@ static void PprofContentionHandler(const Webserver::WebRequest& req,
   StopSynchronizationProfiling();
   FlushSynchronizationProfile(&profile, &discarded_samples);
 
-  ostringstream* output = resp->output;
+  ostringstream* output = &resp->output;
   *output << "--- contention:" << endl;
   *output << "sampling period = 1" << endl;
   *output << "cycles/second = " << static_cast<int64_t>(base::CyclesPerSecond()) << endl;
@@ -220,7 +220,7 @@ static void PprofSymbolHandler(const Webserver::WebRequest& req,
   if (req.request_method == "GET") {
     // Per the above comment, pprof doesn't expect to know the actual number of symbols.
     // Any non-zero value indicates that we support symbol lookup.
-    *resp->output << "num_symbols: 1";
+    resp->output << "num_symbols: 1";
     return;
   }
 
@@ -242,7 +242,7 @@ static void PprofSymbolHandler(const Webserver::WebRequest& req,
     }
     char symbol_buf[1024];
     if (google::Symbolize(reinterpret_cast<void*>(addr), symbol_buf, sizeof(symbol_buf))) {
-      *resp->output << p << "\t" << symbol_buf << std::endl;
+      resp->output << p << "\t" << symbol_buf << std::endl;
     } else {
       missing_symbols++;
     }
diff --git a/src/kudu/server/rpcz-path-handler.cc b/src/kudu/server/rpcz-path-handler.cc
index 97f8faa..1313aaf 100644
--- a/src/kudu/server/rpcz-path-handler.cc
+++ b/src/kudu/server/rpcz-path-handler.cc
@@ -64,7 +64,7 @@ void RpczPathHandler(const shared_ptr<Messenger>& messenger,
     messenger->rpcz_store()->DumpPB(dump_req, &sampled_rpcs);
   }
 
-  JsonWriter writer(resp->output, JsonWriter::PRETTY);
+  JsonWriter writer(&resp->output, JsonWriter::PRETTY);
   writer.StartObject();
   writer.String("running");
   writer.Protobuf(running_rpcs);
diff --git a/src/kudu/server/tracing_path_handlers.cc b/src/kudu/server/tracing_path_handlers.cc
index 51fb343..a7eed20 100644
--- a/src/kudu/server/tracing_path_handlers.cc
+++ b/src/kudu/server/tracing_path_handlers.cc
@@ -245,14 +245,14 @@ Status DoHandleRequest(Handler handler,
 void HandleRequest(Handler handler,
                    const Webserver::WebRequest& req,
                    Webserver::PrerenderedWebResponse* resp) {
-  Status s = DoHandleRequest(handler, req, resp->output);
+  Status s = DoHandleRequest(handler, req, &resp->output);
   if (!s.ok()) {
     LOG(WARNING) << "Tracing error for handler " << handler << ": "
                  << s.ToString();
     // The trace-viewer JS expects '##ERROR##' to indicate that an error
     // occurred. TODO: change the JS to bubble up the actual error message
     // to the user.
-    *resp->output << "##ERROR##";
+    resp->output << "##ERROR##";
   }
 }
 } // anonymous namespace
diff --git a/src/kudu/server/webserver-test.cc b/src/kudu/server/webserver-test.cc
index 82ee16b..9e02cdf 100644
--- a/src/kudu/server/webserver-test.cc
+++ b/src/kudu/server/webserver-test.cc
@@ -94,6 +94,7 @@ class WebserverTest : public KuduTest {
     WebserverOptions opts;
     opts.port = 0;
     opts.doc_root = static_dir_;
+    opts.enable_doc_root = enable_doc_root();
     if (use_ssl()) SetSslOptions(&opts);
     if (use_htpasswd()) SetHTPasswdOptions(&opts);
     MaybeSetupSpnego(&opts);
@@ -122,6 +123,7 @@ class WebserverTest : public KuduTest {
   virtual void MaybeSetupSpnego(WebserverOptions* /*opts*/) {}
 
   // Overridden by subclasses.
+  virtual bool enable_doc_root() const { return true; }
   virtual bool use_ssl() const { return false; }
   virtual bool use_htpasswd() const { return false; }
 
@@ -463,11 +465,30 @@ TEST_F(WebserverTest, TestStaticFiles) {
   ASSERT_EQ("Remote error: HTTP 403", s.ToString());
 }
 
+class DisabledDocRootWebserverTest : public WebserverTest {
+ protected:
+  bool enable_doc_root() const override { return false; }
+};
+
+TEST_F(DisabledDocRootWebserverTest, TestHandlerNotFound) {
+  Status s = curl_.FetchURL(Substitute("$0/foo", url_), &buf_);
+  ASSERT_EQ("Remote error: HTTP 404", s.ToString());
+  ASSERT_STR_CONTAINS(buf_.ToString(), "No handler for URI /foo");
+}
+
 // Test that HTTP OPTIONS requests are permitted.
 TEST_F(WebserverTest, TestHttpOptions) {
   NO_FATALS(RunTestOptions());
 }
 
+// Test that we're able to reuse connections for subsequent fetches.
+TEST_F(WebserverTest, TestConnectionReuse) {
+  ASSERT_OK(curl_.FetchURL(url_, &buf_));
+  ASSERT_EQ(1, curl_.num_connects());
+  ASSERT_OK(curl_.FetchURL(url_, &buf_));
+  ASSERT_EQ(0, curl_.num_connects());
+}
+
 class WebserverAdvertisedAddressesTest : public KuduTest {
  public:
   void SetUp() override {
diff --git a/src/kudu/server/webserver.cc b/src/kudu/server/webserver.cc
index 5efcf1d..4a7684e 100644
--- a/src/kudu/server/webserver.cc
+++ b/src/kudu/server/webserver.cc
@@ -72,6 +72,7 @@ using mustache::RenderTemplate;
 using std::ostringstream;
 using std::stringstream;
 using std::string;
+using std::unordered_set;
 using std::vector;
 using strings::Substitute;
 
@@ -102,6 +103,8 @@ string HttpStatusCodeToString(kudu::HttpStatusCode code) {
       return "200 OK";
     case kudu::HttpStatusCode::BadRequest:
       return "400 Bad Request";
+    case kudu::HttpStatusCode::AuthenticationRequired:
+      return "401 Authentication Required";
     case kudu::HttpStatusCode::NotFound:
       return "404 Not Found";
     case kudu::HttpStatusCode::LengthRequired:
@@ -116,19 +119,6 @@ string HttpStatusCodeToString(kudu::HttpStatusCode code) {
   LOG(FATAL) << "Unexpected HTTP response code";
 }
 
-void SendPlainResponse(struct sq_connection* connection,
-                       const string& response_code_line,
-                       const string& content,
-                       const vector<string>& header_lines) {
-  sq_printf(connection, "HTTP/1.1 %s\r\n", response_code_line.c_str());
-  for (const auto& h : header_lines) {
-    sq_printf(connection, "%s\r\n", h.c_str());
-  }
-  sq_printf(connection, "Content-Type: text/plain\r\n");
-  sq_printf(connection, "Content-Length: %zd\r\n\r\n", content.size());
-  sq_printf(connection, "%s", content.c_str());
-}
-
 // Return the address of the remote user from the squeasel request info.
 Sockaddr GetRemoteAddress(const struct sq_request_info* req) {
   struct sockaddr_in addr;
@@ -146,16 +136,18 @@ Sockaddr GetRemoteAddress(const struct sq_request_info* req) {
 // returned (and the other out-parameters left untouched). Otherwise, the
 // out-parameters will be written to, and the function will return either OK or
 // Incomplete depending on whether additional SPNEGO steps are required.
-Status RunSpnegoStep(const char* authz_header, string* resp_header,
+Status RunSpnegoStep(const char* authz_header,
+                     WebCallbackRegistry::HttpResponseHeaders* resp_headers,
                      string* authn_user) {
-  static const char* const kNegotiateStr = "WWW-Authenticate: Negotiate";
+  static const char* const kNegotiateHdrName = "WWW-Authenticate";
+  static const char* const kNegotiateHdrValue = "Negotiate";
   static const Status kIncomplete = Status::Incomplete("authn incomplete");
 
   VLOG(2) << "Handling Authorization header "
           << (authz_header ? KUDU_REDACT(authz_header) : "<null>");
 
   if (!authz_header) {
-    *resp_header = kNegotiateStr;
+    EmplaceOrDie(resp_headers, kNegotiateHdrName, kNegotiateHdrValue);
     return kIncomplete;
   }
 
@@ -171,7 +163,8 @@ Status RunSpnegoStep(const char* authz_header, string* resp_header,
   VLOG(2) << "SPNEGO step complete, response token: " << KUDU_REDACT(resp_token_b64);
 
   if (!resp_token_b64.empty()) {
-    *resp_header = Substitute("$0 $1", kNegotiateStr, resp_token_b64);
+    EmplaceOrDie(resp_headers, kNegotiateHdrName,
+                 Substitute("$0 $1", kNegotiateHdrValue, resp_token_b64));
   }
   return is_complete ? Status::OK() : kIncomplete;
 }
@@ -192,7 +185,7 @@ Webserver::~Webserver() {
 
 void Webserver::RootHandler(const Webserver::WebRequest& /* args */,
                             Webserver::WebResponse* resp) {
-  EasyJson path_handlers = resp->output->Set("path_handlers", EasyJson::kArray);
+  EasyJson path_handlers = resp->output.Set("path_handlers", EasyJson::kArray);
   for (const PathHandlerMap::value_type& handler : path_handlers_) {
     if (handler.second->is_on_nav_bar()) {
       EasyJson path_handler = path_handlers.PushBack(EasyJson::kObject);
@@ -200,7 +193,7 @@ void Webserver::RootHandler(const Webserver::WebRequest& /* args */,
       path_handler["alias"] = handler.second->alias();
     }
   }
-  (*resp->output)["version_info"] = EscapeForHtmlToString(VersionInfo::GetAllVersionInfo());
+  resp->output["version_info"] = EscapeForHtmlToString(VersionInfo::GetAllVersionInfo());
 }
 
 void Webserver::BuildArgumentMap(const string& args, ArgumentMap* output) {
@@ -326,6 +319,9 @@ Status Webserver::Start() {
   options.emplace_back("num_threads");
   options.push_back(std::to_string(opts_.num_worker_threads));
 
+  options.emplace_back("enable_keep_alive");
+  options.emplace_back("yes");
+
   // mongoose ignores SIGCHLD and we need it to run kinit. This means that since
   // mongoose does not reap its own children CGI programs must be avoided.
   // Save the signal handler so we can restore it after mongoose sets it to be ignored.
@@ -465,14 +461,17 @@ sq_callback_result_t Webserver::BeginRequestCallback(
     return SQ_CONTINUE_HANDLING;
   }
 
+  // The last SPNEGO step in a successful authentication may include a response
+  // header (e.g. when using mutual authentication).
+  PrerenderedWebResponse resp;
   if (opts_.require_spnego) {
     const char* authz_header = sq_get_header(connection, "Authorization");
-    string resp_header, authn_princ;
-    Status s = RunSpnegoStep(authz_header, &resp_header, &authn_princ);
+    string authn_princ;
+    Status s = RunSpnegoStep(authz_header, &resp.response_headers, &authn_princ);
     if (s.IsIncomplete()) {
-      SendPlainResponse(connection, "401 Authentication Required",
-                         "Must authenticate with SPNEGO.",
-                         { resp_header });
+      resp.output << "Must authenticate with SPNEGO.";
+      resp.status_code = HttpStatusCode::AuthenticationRequired;
+      SendResponse(connection, &resp);
       return SQ_HANDLED_OK;
     }
     if (s.ok() && authn_princ.empty()) {
@@ -488,10 +487,11 @@ sq_callback_result_t Webserver::BeginRequestCallback(
       LOG(WARNING) << "Failed to authenticate request from "
                    << GetRemoteAddress(request_info).ToString()
                    << " via SPNEGO: " << s.ToString();
-      const char* http_status = s.IsNotAuthorized() ? "401 Authentication Required" :
-          "500 Internal Server Error";
-
-      SendPlainResponse(connection, http_status, s.ToString(), {});
+      resp.output << s.ToString();
+      resp.status_code = s.IsNotAuthorized() ?
+                           HttpStatusCode::AuthenticationRequired :
+                           HttpStatusCode::InternalServerError;
+      SendResponse(connection, &resp);
       return SQ_HANDLED_OK;
     }
 
@@ -500,51 +500,6 @@ sq_callback_result_t Webserver::BeginRequestCallback(
     }
 
     request_info->remote_user = strdup(authn_princ.c_str());
-
-    // NOTE: According to the SPNEGO RFC (https://tools.ietf.org/html/rfc4559) it
-    // is possible that a non-empty token will be returned along with the HTTP 200
-    // response:
-    //
-    //     A status code 200 status response can also carry a "WWW-Authenticate"
-    //     response header containing the final leg of an authentication.  In
-    //     this case, the gssapi-data will be present.  Before using the
-    //     contents of the response, the gssapi-data should be processed by
-    //     gss_init_security_context to determine the state of the security
-    //     context.  If this function indicates success, the response can be
-    //     used by the application.  Otherwise, an appropriate action, based on
-    //     the authentication status, should be taken.
-    //
-    //     For example, the authentication could have failed on the final leg if
-    //     mutual authentication was requested and the server was not able to
-    //     prove its identity.  In this case, the returned results are suspect.
-    //     It is not always possible to mutually authenticate the server before
-    //     the HTTP operation.  POST methods are in this category.
-    //
-    // In fact, from inspecting the MIT krb5 source code, it appears that this
-    // only happens when the client requests mutual authentication by passing
-    // 'GSS_C_MUTUAL_FLAG' when establishing its side of the protocol. In practice,
-    // this seems to be widely unimplemented:
-    //
-    // - curl has some source code to support GSS_C_MUTUAL_FLAG, but in order to
-    //   enable it, you have to modify a FALSE constant to TRUE and recompile curl.
-    //   In fact, it was broken for all of 2015 without anyone noticing (see curl
-    //   commit 73f1096335d468b5be7c3cc99045479c3314f433)
-    //
-    // - Chrome doesn't support mutual auth at all -- see DelegationTypeToFlag(...)
-    //   in src/net/http/http_auth_gssapi_posix.cc.
-    //
-    // In practice, users depend on TLS to authenticate the server, and SPNEGO
-    // is used to authenticate the client.
-    //
-    // Given this, and because actually sending back the token on an OK response
-    // would require significant code restructuring (eg buffering the header until
-    // after the response handler has run) we just ignore any response token, but
-    // log a periodic warning just in case it turns out we're wrong about the above.
-    if (!resp_header.empty()) {
-      KLOG_EVERY_N_SECS(WARNING, 5) << "ignoring SPNEGO token on HTTP 200 response "
-                                    << "for user " << authn_princ << " at host "
-                                    << GetRemoteAddress(request_info).ToString();
-    }
   }
 
   PathHandler* handler;
@@ -556,24 +511,26 @@ sq_callback_result_t Webserver::BeginRequestCallback(
       // to the default handler which will serve files.
       if (!opts_.doc_root.empty() && opts_.enable_doc_root) {
         VLOG(2) << "HTTP File access: " << request_info->uri;
+        // TODO(adar): if using SPNEGO, do we need to somehow send the
+        // authentication response header here?
         return SQ_CONTINUE_HANDLING;
       }
-      sq_printf(connection,
-                "HTTP/1.1 %s\r\nContent-Type: text/plain\r\n\r\n",
-                HttpStatusCodeToString(HttpStatusCode::NotFound).c_str());
-      sq_printf(connection, "No handler for URI %s\r\n\r\n", request_info->uri);
+      resp.output << Substitute("No handler for URI $0\r\n\r\n", request_info->uri);
+      resp.status_code = HttpStatusCode::NotFound;
+      SendResponse(connection, &resp);
       return SQ_HANDLED_OK;
     }
     handler = it->second;
   }
 
-  return RunPathHandler(*handler, connection, request_info);
+  return RunPathHandler(*handler, connection, request_info, &resp);
 }
 
 sq_callback_result_t Webserver::RunPathHandler(
     const PathHandler& handler,
     struct sq_connection* connection,
-    struct sq_request_info* request_info) {
+    struct sq_request_info* request_info,
+    PrerenderedWebResponse* resp) {
   // Should we render with css styles?
   bool use_style = true;
 
@@ -588,18 +545,16 @@ sq_callback_result_t Webserver::RunPathHandler(
     int32_t content_len = 0;
     if (content_len_str == nullptr ||
         !safe_strto32(content_len_str, &content_len)) {
-      sq_printf(connection,
-                "HTTP/1.1 %s\r\n",
-                HttpStatusCodeToString(HttpStatusCode::LengthRequired).c_str());
+      resp->status_code = HttpStatusCode::LengthRequired;
+      SendResponse(connection, resp);
       return SQ_HANDLED_CLOSE_CONNECTION;
     }
     if (content_len > FLAGS_webserver_max_post_length_bytes) {
       // TODO(wdb): for this and other HTTP requests, we should log the
       // remote IP, etc.
       LOG(WARNING) << "Rejected POST with content length " << content_len;
-      sq_printf(connection,
-                "HTTP/1.1 %s\r\n",
-                HttpStatusCodeToString(HttpStatusCode::RequestEntityTooLarge).c_str());
+      resp->status_code = HttpStatusCode::RequestEntityTooLarge;
+      SendResponse(connection, resp);
       return SQ_HANDLED_CLOSE_CONNECTION;
     }
 
@@ -611,9 +566,8 @@ sq_callback_result_t Webserver::RunPathHandler(
         LOG(WARNING) << "error reading POST data: expected "
                      << content_len << " bytes but only read "
                      << req.post_data.size();
-        sq_printf(connection,
-                  "HTTP/1.1 %s\r\n",
-                  HttpStatusCodeToString(HttpStatusCode::InternalServerError).c_str());
+        resp->status_code = HttpStatusCode::InternalServerError;
+        SendResponse(connection, resp);
         return SQ_HANDLED_CLOSE_CONNECTION;
       }
 
@@ -626,37 +580,47 @@ sq_callback_result_t Webserver::RunPathHandler(
     use_style = false;
   }
 
-  ostringstream content;
-  PrerenderedWebResponse resp { HttpStatusCode::Ok, HttpResponseHeaders{}, &content };
   // Enable or disable redaction from the web UI based on the setting of --redact.
   // This affects operations like default value and scan predicate pretty printing.
   if (kudu::g_should_redact == kudu::RedactContext::ALL) {
-    handler.callback()(req, &resp);
+    handler.callback()(req, resp);
   } else {
     ScopedDisableRedaction s;
-    handler.callback()(req, &resp);
+    handler.callback()(req, resp);
   }
 
-  string full_content;
-  if (use_style) {
-    stringstream output;
-    RenderMainTemplate(content.str(), &output);
-    full_content = output.str();
-  } else {
-    full_content = content.str();
+  SendResponse(connection, resp, use_style ? StyleMode::STYLED : StyleMode::UNSTYLED);
+  return SQ_HANDLED_OK;
+}
+
+void Webserver::SendResponse(struct sq_connection* connection,
+                             PrerenderedWebResponse* resp,
+                             StyleMode mode) {
+  // If styling was requested, rerender and replace the prerendered output.
+  if (mode == StyleMode::STYLED) {
+    stringstream ss;
+    RenderMainTemplate(resp->output.str(), &ss);
+    resp->output.str(ss.str());
   }
 
-  // Check if the gzip compression is accepted by the caller. If so, compress the content.
+  // Check if gzip compression is accepted by the caller. If so, compress the
+  // content and replace the prerendered output.
   const char* accept_encoding_str = sq_get_header(connection, "Accept-Encoding");
   bool is_compressed = false;
   vector<string> encodings = strings::Split(accept_encoding_str, ",");
   for (string& encoding : encodings) {
     StripWhiteSpace(&encoding);
     if (encoding == "gzip") {
+      // Don't bother compressing empty content.
+      string uncompressed = resp->output.str();
+      if (uncompressed.empty()) {
+        break;
+      }
+
       ostringstream oss;
-      Status s = zlib::Compress(Slice(full_content), &oss);
+      Status s = zlib::Compress(uncompressed, &oss);
       if (s.ok()) {
-        full_content = oss.str();
+        resp->output.str(oss.str());
         is_compressed = true;
       } else {
         LOG(WARNING) << "Could not compress output: " << s.ToString();
@@ -665,42 +629,61 @@ sq_callback_result_t Webserver::RunPathHandler(
     }
   }
 
-  ostringstream headers_stream;
-  headers_stream << Substitute("HTTP/1.1 $0\r\n", HttpStatusCodeToString(resp.status_code));
-  headers_stream << Substitute("Content-Type: $0\r\n", use_style ? "text/html" : "text/plain");
-  headers_stream << Substitute("Content-Length: $0\r\n", full_content.length());
-  if (is_compressed) headers_stream << "Content-Encoding: gzip\r\n";
-  headers_stream << Substitute("X-Frame-Options: $0\r\n", FLAGS_webserver_x_frame_options);
-  std::unordered_set<string> invalid_headers{"Content-Type", "Content-Length", "X-Frame-Options"};
-  for (const auto& entry : resp.response_headers) {
+  // We've deferred constructing the content for as long as possible; we must
+  // do so now so that we can determine the content length.
+  string body = resp->output.str();
+
+  // Buffers up the headers and content as follows:
+  //
+  // <header 1>
+  // <header 2>
+  // ...
+  // <header N>
+  // <body>
+  ostringstream oss;
+
+  // Write the headers to the buffer first, then write the body.
+  oss << Substitute("HTTP/1.1 $0\r\n", HttpStatusCodeToString(resp->status_code));
+  oss << Substitute("Content-Type: $0\r\n",
+                    mode == StyleMode::STYLED ? "text/html" : "text/plain");
+  oss << Substitute("Content-Length: $0\r\n", body.length());
+  if (is_compressed) oss << "Content-Encoding: gzip\r\n";
+  oss << Substitute("X-Frame-Options: $0\r\n", FLAGS_webserver_x_frame_options);
+  static const unordered_set<string> kInvalidHeaders = {
+    "Content-Length",
+    "Content-Type",
+    "X-Frame-Options"
+  };
+  for (const auto& entry : resp->response_headers) {
     // It's forbidden to override the above headers.
-    if (ContainsKey(invalid_headers, entry.first)) {
-      LOG(FATAL) << "Reserved header " << entry.first << " was overridden "
-          "by handler for " << handler.alias();
+    if (ContainsKey(kInvalidHeaders, entry.first)) {
+      LOG(FATAL) << Substitute("Reserved header $0 was overridden by handler",
+                               entry.first);
     }
-    headers_stream << Substitute("$0: $1\r\n", entry.first, entry.second);
+    oss << Substitute("$0: $1\r\n", entry.first, entry.second);
   }
-  headers_stream << "\r\n";
-  string headers = headers_stream.str();
+  oss << "\r\n";
+  oss << body;
 
-  // Make sure to use sq_write for printing the body; sq_printf truncates at 8KB.
-  sq_write(connection, headers.c_str(), headers.length());
-  sq_write(connection, full_content.c_str(), full_content.length());
-  return SQ_HANDLED_OK;
+  // Send the buffered response to Squeasel in one go to avoid the latency hit
+  // of Nagle's algorithm + delayed TCP acknowledgements.
+  //
+  // Make sure to use sq_write; sq_printf truncates at 8KB.
+  string complete_response = oss.str();
+  sq_write(connection, complete_response.c_str(), complete_response.length());
 }
 
 void Webserver::RegisterPathHandler(const string& path, const string& alias,
     const PathHandlerCallback& callback, bool is_styled, bool is_on_nav_bar) {
   string render_path = (path == "/") ? "/home" : path;
   auto wrapped_cb = [=](const WebRequest& args, PrerenderedWebResponse* rendered_resp) {
-    EasyJson ej;
-    WebResponse resp { HttpStatusCode::Ok, HttpResponseHeaders{}, &ej };
+    WebResponse resp;
     callback(args, &resp);
     stringstream out;
-    Render(render_path, ej, is_styled, &out);
+    Render(render_path, resp.output, is_styled, &out);
     rendered_resp->status_code = resp.status_code;
     rendered_resp->response_headers = std::move(resp.response_headers);
-    *rendered_resp->output << out.rdbuf();
+    rendered_resp->output << out.rdbuf();
   };
   RegisterPrerenderedPathHandler(path, alias, wrapped_cb, is_styled, is_on_nav_bar);
 }
diff --git a/src/kudu/server/webserver.h b/src/kudu/server/webserver.h
index 728b539..68568a6 100644
--- a/src/kudu/server/webserver.h
+++ b/src/kudu/server/webserver.h
@@ -144,7 +144,8 @@ class Webserver : public WebCallbackRegistry {
   sq_callback_result_t RunPathHandler(
       const PathHandler& handler,
       struct sq_connection* connection,
-      struct sq_request_info* request_info);
+      struct sq_request_info* request_info,
+      PrerenderedWebResponse* resp);
 
   // Callback to funnel mongoose logs through glog.
   static int LogMessageCallbackStatic(const struct sq_connection* connection,
@@ -158,6 +159,17 @@ class Webserver : public WebCallbackRegistry {
   // key, it is entered into the map as (key, "").
   void BuildArgumentMap(const std::string& args, ArgumentMap* output);
 
+  // Sends a response back thru 'connection'.
+  //
+  // If 'mode' is STYLED, includes page styling elements like CSS, navigation bar, etc.
+  enum class StyleMode {
+    STYLED,
+    UNSTYLED,
+  };
+  void SendResponse(struct sq_connection* connection,
+                    PrerenderedWebResponse* resp,
+                    StyleMode mode = StyleMode::UNSTYLED);
+
   const WebserverOptions opts_;
 
   // Lock guarding the path_handlers_ map and footer_html.
diff --git a/src/kudu/tserver/tserver_path_handlers.cc b/src/kudu/tserver/tserver_path_handlers.cc
index d0e7951..8030054 100644
--- a/src/kudu/tserver/tserver_path_handlers.cc
+++ b/src/kudu/tserver/tserver_path_handlers.cc
@@ -148,7 +148,7 @@ bool GetTabletID(const Webserver::WebRequest& req,
                  Webserver::WebResponse* resp) {
   if (!FindCopy(req.parsed_args, "id", id)) {
     resp->status_code = HttpStatusCode::BadRequest;
-    resp->output->Set("error", "Request missing 'id' argument");
+    resp->output.Set("error", "Request missing 'id' argument");
     return false;
   }
   return true;
@@ -161,8 +161,8 @@ bool GetTabletReplica(TabletServer* tserver,
                       Webserver::WebResponse* resp) {
   if (!tserver->tablet_manager()->LookupTablet(tablet_id, replica)) {
     resp->status_code = HttpStatusCode::NotFound;
-    resp->output->Set("error",
-                      Substitute("Tablet $0 not found", tablet_id));
+    resp->output.Set("error",
+                     Substitute("Tablet $0 not found", tablet_id));
     return false;
   }
   return true;
@@ -173,8 +173,8 @@ bool TabletBootstrapping(const scoped_refptr<TabletReplica>& replica,
                          Webserver::WebResponse* resp) {
   if (replica->state() == tablet::BOOTSTRAPPING) {
     resp->status_code = HttpStatusCode::ServiceUnavailable;
-    resp->output->Set("error",
-                      Substitute("Tablet $0 is still bootstrapping", tablet_id));
+    resp->output.Set("error",
+                     Substitute("Tablet $0 is still bootstrapping", tablet_id));
     return true;
   }
   return false;
@@ -239,7 +239,7 @@ Status TabletServerPathHandlers::Register(Webserver* server) {
 
 void TabletServerPathHandlers::HandleTransactionsPage(const Webserver::WebRequest& req,
                                                       Webserver::PrerenderedWebResponse* resp) {
-  ostringstream* output = resp->output;
+  ostringstream* output = &resp->output;
   bool as_text = ContainsKey(req.parsed_args, "raw");
 
   vector<scoped_refptr<TabletReplica> > replicas;
@@ -302,7 +302,7 @@ void TabletServerPathHandlers::HandleTransactionsPage(const Webserver::WebReques
 
 void TabletServerPathHandlers::HandleTabletsPage(const Webserver::WebRequest& /*req*/,
                                                  Webserver::WebResponse* resp) {
-  EasyJson* output = resp->output;
+  EasyJson* output = &resp->output;
   vector<scoped_refptr<TabletReplica>> replicas;
   tserver_->tablet_manager()->GetTabletReplicas(&replicas);
 
@@ -406,7 +406,7 @@ void TabletServerPathHandlers::HandleTabletPage(const Webserver::WebRequest& req
     role = consensus->role();
   }
 
-  EasyJson* output = resp->output;
+  EasyJson* output = &resp->output;
   output->Set("tablet_id", tablet_id);
   output->Set("state", replica->HumanReadableState());
   output->Set("role", RaftPeerPB::Role_Name(role));
@@ -433,7 +433,7 @@ void TabletServerPathHandlers::HandleTabletSVGPage(const Webserver::WebRequest&
   scoped_refptr<TabletReplica> replica;
   if (!LoadTablet(tserver_, req, &tablet_id, &replica, resp)) return;
   shared_ptr<Tablet> tablet = replica->shared_tablet();
-  auto* output = resp->output;
+  auto* output = &resp->output;
   if (!tablet) {
     output->Set("error", Substitute("Tablet $0 is not running", tablet_id));
     return;
@@ -451,7 +451,7 @@ void TabletServerPathHandlers::HandleLogAnchorsPage(const Webserver::WebRequest&
   scoped_refptr<TabletReplica> replica;
   if (!LoadTablet(tserver_, req, &tablet_id, &replica, resp)) return;
 
-  auto* output = resp->output;
+  auto* output = &resp->output;
   output->Set("tablet_id", tablet_id);
   output->Set("log_anchors", replica->log_anchor_registry()->DumpAnchorInfo());
 }
@@ -462,7 +462,7 @@ void TabletServerPathHandlers::HandleConsensusStatusPage(const Webserver::WebReq
   scoped_refptr<TabletReplica> replica;
   if (!LoadTablet(tserver_, req, &tablet_id, &replica, resp)) return;
   shared_ptr<consensus::RaftConsensus> consensus = replica->shared_consensus();
-  auto* output = resp->output;
+  auto* output = &resp->output;
   if (!consensus) {
     output->Set("error", Substitute("Tablet $0 not initialized", tablet_id));
     return;
@@ -577,8 +577,8 @@ const char* kLongTimingTitle = "wall time, user cpu time, and system cpu time "
 
 void TabletServerPathHandlers::HandleScansPage(const Webserver::WebRequest& /*req*/,
                                                Webserver::WebResponse* resp) {
-  resp->output->Set("timing_title", kLongTimingTitle);
-  EasyJson scans = resp->output->Set("scans", EasyJson::kArray);
+  resp->output.Set("timing_title", kLongTimingTitle);
+  EasyJson scans = resp->output.Set("scans", EasyJson::kArray);
   vector<ScanDescriptor> descriptors = tserver_->scanner_manager()->ListScans();
 
   for (const auto& descriptor : descriptors) {
@@ -589,7 +589,7 @@ void TabletServerPathHandlers::HandleScansPage(const Webserver::WebRequest& /*re
 
 void TabletServerPathHandlers::HandleDashboardsPage(const Webserver::WebRequest& /*req*/,
                                                     Webserver::PrerenderedWebResponse* resp) {
-  ostringstream* output = resp->output;
+  ostringstream* output = &resp->output;
   *output << "<h3>Dashboards</h3>\n";
   *output << "<table class='table table-striped'>\n";
   *output << "  <thead><tr><th>Dashboard</th><th>Description</th></tr></thead>\n";
@@ -615,7 +615,7 @@ string TabletServerPathHandlers::GetDashboardLine(const std::string& link,
 
 void TabletServerPathHandlers::HandleMaintenanceManagerPage(const Webserver::WebRequest& req,
                                                             Webserver::WebResponse* resp) {
-  EasyJson* output = resp->output;
+  EasyJson* output = &resp->output;
   MaintenanceManager* manager = tserver_->maintenance_manager();
   MaintenanceManagerStatusPB pb;
   manager->GetMaintenanceManagerStatusDump(&pb);
diff --git a/src/kudu/util/curl_util.cc b/src/kudu/util/curl_util.cc
index 14e247c..aeb61dd 100644
--- a/src/kudu/util/curl_util.cc
+++ b/src/kudu/util/curl_util.cc
@@ -156,12 +156,14 @@ Status EasyCurl::DoRequest(const string& url,
         timeout_.ToMilliseconds())));
   }
   RETURN_NOT_OK(TranslateError(curl_easy_perform(curl_)));
-  long rc; // NOLINT(*) curl wants a long
-  RETURN_NOT_OK(TranslateError(curl_easy_getinfo(curl_, CURLINFO_RESPONSE_CODE, &rc)));
-  if (rc != 200) {
-    return Status::RemoteError(strings::Substitute("HTTP $0", rc));
-  }
+  long val; // NOLINT(*) curl wants a long
+  RETURN_NOT_OK(TranslateError(curl_easy_getinfo(curl_, CURLINFO_NUM_CONNECTS, &val)));
+  num_connects_ = val;
 
+  RETURN_NOT_OK(TranslateError(curl_easy_getinfo(curl_, CURLINFO_RESPONSE_CODE, &val)));
+  if (val != 200) {
+    return Status::RemoteError(strings::Substitute("HTTP $0", val));
+  }
   return Status::OK();
 }
 
diff --git a/src/kudu/util/curl_util.h b/src/kudu/util/curl_util.h
index 86f01da..2289e84 100644
--- a/src/kudu/util/curl_util.h
+++ b/src/kudu/util/curl_util.h
@@ -83,6 +83,11 @@ class EasyCurl {
     custom_method_ = std::move(m);
   }
 
+  // Returns the number of new connections created to achieve the previous transfer.
+  int num_connects() const {
+    return num_connects_;
+  }
+
  private:
   // Do a request. If 'post_data' is non-NULL, does a POST.
   // Otherwise, does a GET.
@@ -106,6 +111,8 @@ class EasyCurl {
 
   MonoDelta timeout_;
 
+  int num_connects_ = 0;
+
   DISALLOW_COPY_AND_ASSIGN(EasyCurl);
 };
 
diff --git a/src/kudu/util/thread.cc b/src/kudu/util/thread.cc
index 22106da..1c16763 100644
--- a/src/kudu/util/thread.cc
+++ b/src/kudu/util/thread.cc
@@ -384,7 +384,7 @@ void ThreadMgr::PrintThreadDescriptorRow(const ThreadDescriptor& desc,
 void ThreadMgr::ThreadPathHandler(
     const WebCallbackRegistry::WebRequest& req,
     WebCallbackRegistry::PrerenderedWebResponse* resp) const {
-  ostringstream& output = *(resp->output);
+  ostringstream& output = resp->output;
   vector<ThreadDescriptor> descriptors_to_print;
   const auto category_name = req.parsed_args.find("group");
   if (category_name != req.parsed_args.end()) {
diff --git a/src/kudu/util/web_callback_registry.h b/src/kudu/util/web_callback_registry.h
index 600bddd..f48a628 100644
--- a/src/kudu/util/web_callback_registry.h
+++ b/src/kudu/util/web_callback_registry.h
@@ -30,6 +30,7 @@ namespace kudu {
 enum class HttpStatusCode {
   Ok, // 200
   BadRequest, // 400
+  AuthenticationRequired, // 401
   NotFound, // 404
   LengthRequired, // 411
   RequestEntityTooLarge, // 413
@@ -68,25 +69,25 @@ class WebCallbackRegistry {
   // A response to an HTTP request whose body is rendered by template.
   struct WebResponse {
     // Determines the status code of the HTTP response.
-    HttpStatusCode status_code;
+    HttpStatusCode status_code = HttpStatusCode::Ok;
 
     // Additional headers added to the HTTP response.
     HttpResponseHeaders response_headers;
 
     // A JSON object to be rendered to HTML by a mustache template.
-    EasyJson* output;
+    EasyJson output;
   };
 
   // A response to an HTTP request.
   struct PrerenderedWebResponse {
     // Determines the status code of the HTTP response.
-    HttpStatusCode status_code;
+    HttpStatusCode status_code = HttpStatusCode::Ok;
 
     // Additional headers added to the HTTP response.
     HttpResponseHeaders response_headers;
 
     // The fully-rendered HTML response body.
-    std::ostringstream* output;
+    std::ostringstream output;
   };
 
   // A function that handles an HTTP request where the response body will be rendered


[kudu] 03/03: www: miscellaneous mustache updates

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 03cbd7559cf24e916924ddd747438f8f7e362cde
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Wed Oct 16 16:44:38 2019 -0700

    www: miscellaneous mustache updates
    
    1. Converted /dashboards and /threadz into mustache templates. In /threadz,
       I tried to defer as much work as possible outside the lock, since it must
       be taken in write mode to create a new thread. While I was there I added
       "fancy tables" to /threadz; /dashboards remains visually unchanged.
    2. In /tablets, changed the handling of tablet links. In doing so, all links
       (except for the main template) are now in templates instead of in code.
    
    Screenshots: https://imgur.com/a/AOTQuH4
    
    Change-Id: Idb97a9e3bbefb8ee607638af6e069959c5354225
    Reviewed-on: http://gerrit.cloudera.org:8080/14473
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/tserver/tserver_path_handlers.cc |  42 ++---------
 src/kudu/tserver/tserver_path_handlers.h  |   6 +-
 src/kudu/util/thread.cc                   | 119 +++++++++++++++---------------
 www/dashboards.mustache                   |  27 +++++++
 www/kudu.js                               |  27 +++++--
 www/tablets.mustache                      |  10 ++-
 www/threadz.mustache                      |  68 +++++++++++++++++
 7 files changed, 192 insertions(+), 107 deletions(-)

diff --git a/src/kudu/tserver/tserver_path_handlers.cc b/src/kudu/tserver/tserver_path_handlers.cc
index 8030054..d51f27f 100644
--- a/src/kudu/tserver/tserver_path_handlers.cc
+++ b/src/kudu/tserver/tserver_path_handlers.cc
@@ -104,12 +104,6 @@ bool CompareByMemberType(const RaftPeerPB& a, const RaftPeerPB& b) {
   return a.member_type() < b.member_type();
 }
 
-string TabletLink(const string& id) {
-  return Substitute("<a href=\"/tablet?id=$0\">$1</a>",
-                    UrlEncodeToString(id),
-                    EscapeForHtmlToString(id));
-}
-
 bool IsTombstoned(const scoped_refptr<TabletReplica>& replica) {
   return replica->data_state() == tablet::TABLET_DATA_TOMBSTONED;
 }
@@ -225,7 +219,7 @@ Status TabletServerPathHandlers::Register(Webserver* server) {
     "/log-anchors", "",
     boost::bind(&TabletServerPathHandlers::HandleLogAnchorsPage, this, _1, _2),
     true /* styled */, false /* is_on_nav_bar */);
-  server->RegisterPrerenderedPathHandler(
+  server->RegisterPathHandler(
     "/dashboards", "Dashboards",
     boost::bind(&TabletServerPathHandlers::HandleDashboardsPage, this, _1, _2),
     true /* styled */, true /* is_on_nav_bar */);
@@ -342,15 +336,15 @@ void TabletServerPathHandlers::HandleTabletsPage(const Webserver::WebRequest& /*
     EasyJson details_json = replicas_json->Set("replicas", EasyJson::kArray);
     for (const scoped_refptr<TabletReplica>& replica : replicas) {
       EasyJson replica_json = details_json.PushBack(EasyJson::kObject);
-      const auto* tablet = replica->tablet();
       const auto& tmeta = replica->tablet_metadata();
       TabletStatusPB status;
       replica->GetTabletStatusPB(&status);
       replica_json["table_name"] = status.table_name();
-      if (tablet != nullptr) {
-        replica_json["id_or_link"] = TabletLink(status.tablet_id());
-      } else {
-        replica_json["id_or_link"] = status.tablet_id();
+      replica_json["id"] = status.tablet_id();
+      if (replica->tablet() != nullptr) {
+        EasyJson link_json = replica_json.Set("link", EasyJson::kObject);
+        link_json["id"] = status.tablet_id();
+        link_json["url"] = Substitute("/tablet?id=$0", UrlEncodeToString(status.tablet_id()));
       }
       replica_json["partition"] =
           tmeta->partition_schema().PartitionDebugString(tmeta->partition(),
@@ -588,29 +582,7 @@ void TabletServerPathHandlers::HandleScansPage(const Webserver::WebRequest& /*re
 }
 
 void TabletServerPathHandlers::HandleDashboardsPage(const Webserver::WebRequest& /*req*/,
-                                                    Webserver::PrerenderedWebResponse* resp) {
-  ostringstream* output = &resp->output;
-  *output << "<h3>Dashboards</h3>\n";
-  *output << "<table class='table table-striped'>\n";
-  *output << "  <thead><tr><th>Dashboard</th><th>Description</th></tr></thead>\n";
-  *output << "  <tbody\n";
-  *output << GetDashboardLine("scans", "Scans", "List of currently running and recently "
-                                                "completed scans.");
-  *output << GetDashboardLine("transactions", "Transactions", "List of transactions that are "
-                                                              "currently running.");
-  *output << GetDashboardLine("maintenance-manager", "Maintenance Manager",
-                              "List of operations that are currently running and those "
-                              "that are registered.");
-  *output << "</tbody></table>\n";
-}
-
-string TabletServerPathHandlers::GetDashboardLine(const std::string& link,
-                                                  const std::string& text,
-                                                  const std::string& desc) {
-  return Substitute("  <tr><td><a href=\"$0\">$1</a></td><td>$2</td></tr>\n",
-                    EscapeForHtmlToString(link),
-                    EscapeForHtmlToString(text),
-                    EscapeForHtmlToString(desc));
+                                                    Webserver::WebResponse* /*resp*/) {
 }
 
 void TabletServerPathHandlers::HandleMaintenanceManagerPage(const Webserver::WebRequest& req,
diff --git a/src/kudu/tserver/tserver_path_handlers.h b/src/kudu/tserver/tserver_path_handlers.h
index 7b7beed..7cf370b 100644
--- a/src/kudu/tserver/tserver_path_handlers.h
+++ b/src/kudu/tserver/tserver_path_handlers.h
@@ -17,8 +17,6 @@
 #ifndef KUDU_TSERVER_TSERVER_PATH_HANDLERS_H
 #define KUDU_TSERVER_TSERVER_PATH_HANDLERS_H
 
-#include <string>
-
 #include "kudu/gutil/macros.h"
 #include "kudu/server/webserver.h"
 #include "kudu/util/status.h"
@@ -54,11 +52,9 @@ class TabletServerPathHandlers {
   void HandleConsensusStatusPage(const Webserver::WebRequest& req,
                                  Webserver::WebResponse* resp);
   void HandleDashboardsPage(const Webserver::WebRequest& req,
-                            Webserver::PrerenderedWebResponse* resp);
+                            Webserver::WebResponse* resp);
   void HandleMaintenanceManagerPage(const Webserver::WebRequest& req,
                                     Webserver::WebResponse* resp);
-  std::string GetDashboardLine(const std::string& link,
-                               const std::string& text, const std::string& desc);
 
   TabletServer* tserver_;
 
diff --git a/src/kudu/util/thread.cc b/src/kudu/util/thread.cc
index 1c16763..9f5eb01 100644
--- a/src/kudu/util/thread.cc
+++ b/src/kudu/util/thread.cc
@@ -28,7 +28,6 @@
 #include <algorithm>
 #include <cerrno>
 #include <cstring>
-#include <map>
 #include <memory>
 #include <mutex>
 #include <sstream>
@@ -50,6 +49,7 @@
 #include "kudu/gutil/once.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/easy_json.h"
 #include "kudu/util/env.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/kernel_stack_watchdog.h"
@@ -66,8 +66,8 @@
 
 using boost::bind;
 using boost::mem_fn;
-using std::map;
 using std::ostringstream;
+using std::pair;
 using std::shared_ptr;
 using std::string;
 using std::vector;
@@ -200,6 +200,12 @@ class ThreadMgr {
     const string& category() const { return category_; }
     int64_t thread_id() const { return thread_id_; }
 
+    struct Comparator {
+      bool operator()(const ThreadDescriptor& rhs, const ThreadDescriptor& lhs) const {
+        return rhs.name() < lhs.name();
+      }
+    };
+
    private:
     string name_;
     string category_;
@@ -241,9 +247,9 @@ class ThreadMgr {
 
   // Webpage callback; prints all threads by category.
   void ThreadPathHandler(const WebCallbackRegistry::WebRequest& req,
-                         WebCallbackRegistry::PrerenderedWebResponse* resp) const;
-  void PrintThreadDescriptorRow(const ThreadDescriptor& desc,
-                                ostringstream* output) const;
+                         WebCallbackRegistry::WebResponse* resp) const;
+  void SummarizeThreadDescriptor(const ThreadDescriptor& desc,
+                                 EasyJson* output) const;
 };
 
 void ThreadMgr::SetThreadName(const string& name, int64_t tid) {
@@ -295,11 +301,11 @@ Status ThreadMgr::StartInstrumentation(const scoped_refptr<MetricEntity>& metric
         Bind(&GetInVoluntaryContextSwitches)));
 
   if (web) {
-    WebCallbackRegistry::PrerenderedPathHandlerCallback thread_callback =
-        bind<void>(mem_fn(&ThreadMgr::ThreadPathHandler), this, _1, _2);
-    DCHECK_NOTNULL(web)->RegisterPrerenderedPathHandler("/threadz", "Threads", thread_callback,
-                                                        true /* is_styled*/,
-                                                        true /* is_on_nav_bar */);
+    auto thread_callback = bind<void>(mem_fn(&ThreadMgr::ThreadPathHandler),
+                                      this, _1, _2);
+    DCHECK_NOTNULL(web)->RegisterPathHandler("/threadz", "Threads", thread_callback,
+                                             /* is_styled= */ true,
+                                             /* is_on_nav_bar= */ true);
   }
   return Status::OK();
 }
@@ -367,84 +373,81 @@ void ThreadMgr::RemoveThread(const pthread_t& pthread_id, const string& category
   ANNOTATE_IGNORE_READS_AND_WRITES_END();
 }
 
-void ThreadMgr::PrintThreadDescriptorRow(const ThreadDescriptor& desc,
-                                         ostringstream* output) const {
+void ThreadMgr::SummarizeThreadDescriptor(const ThreadDescriptor& desc,
+                                          EasyJson* output) const {
   ThreadStats stats;
   Status status = GetThreadStats(desc.thread_id(), &stats);
   if (!status.ok()) {
     KLOG_EVERY_N(INFO, 100) << "Could not get per-thread statistics: "
                             << status.ToString();
   }
-  (*output) << "<tr><td>" << desc.name() << "</td><td>"
-            << (static_cast<double>(stats.user_ns) / 1e9) << "</td><td>"
-            << (static_cast<double>(stats.kernel_ns) / 1e9) << "</td><td>"
-            << (static_cast<double>(stats.iowait_ns) / 1e9) << "</td></tr>";
+  EasyJson thr = output->PushBack(EasyJson::kObject);
+  thr["thread_name"] = desc.name();
+  thr["user_sec"] = static_cast<double>(stats.user_ns) / 1e9;
+  thr["kernel_sec"] = static_cast<double>(stats.kernel_ns) / 1e9;
+  thr["iowait_sec"] = static_cast<double>(stats.iowait_ns) / 1e9;
 }
 
-void ThreadMgr::ThreadPathHandler(
-    const WebCallbackRegistry::WebRequest& req,
-    WebCallbackRegistry::PrerenderedWebResponse* resp) const {
-  ostringstream& output = resp->output;
-  vector<ThreadDescriptor> descriptors_to_print;
-  const auto category_name = req.parsed_args.find("group");
-  if (category_name != req.parsed_args.end()) {
-    const auto& group = category_name->second;
-    const auto& group_esc = EscapeForHtmlToString(group);
-    output << "<h2>Thread Group: " << group_esc << "</h2>";
-    if (group != "all") {
+void ThreadMgr::ThreadPathHandler(const WebCallbackRegistry::WebRequest& req,
+                                  WebCallbackRegistry::WebResponse* resp) const {
+  EasyJson& output = resp->output;
+  const auto* category_name = FindOrNull(req.parsed_args, "group");
+  if (category_name) {
+    // List all threads belonging to the desired thread group.
+    bool requested_all = *category_name == "all";
+    EasyJson rtg = output.Set("requested_thread_group", EasyJson::kObject);
+    rtg["group_name"] = EscapeForHtmlToString(*category_name);
+    rtg["requested_all"] = requested_all;
+
+    // The critical section is as short as possible so as to minimize the delay
+    // imposed on new threads that acquire the lock in write mode.
+    vector<ThreadDescriptor> descriptors_to_print;
+    if (!requested_all) {
       shared_lock<decltype(lock_)> l(lock_);
-      const auto it = thread_categories_.find(group);
-      if (it == thread_categories_.end()) {
-        output << "Thread group '" << group_esc << "' not found";
+      const auto* category = FindOrNull(thread_categories_, *category_name);
+      if (!category) {
         return;
       }
-      for (const auto& elem : it->second) {
-        descriptors_to_print.push_back(elem.second);
+      for (const auto& elem : *category) {
+        descriptors_to_print.emplace_back(elem.second);
       }
-      output << "<h3>" << it->first << " : " << it->second.size() << "</h3>";
     } else {
       shared_lock<decltype(lock_)> l(lock_);
       for (const auto& category : thread_categories_) {
         for (const auto& elem : category.second) {
-          descriptors_to_print.push_back(elem.second);
+          descriptors_to_print.emplace_back(elem.second);
         }
       }
-      output << "<h3>All Threads : </h3>";
     }
-    output << "<table class='table table-hover table-border'>"
-              "<thead><tr><th>Thread name</th><th>Cumulative User CPU(s)</th>"
-              "<th>Cumulative Kernel CPU(s)</th>"
-              "<th>Cumulative IO-wait(s)</th></tr></thead>"
-              "<tbody>\n";
-    // Sort the entries in the table by the name of a thread.
-    // TODO(aserbin): use "mustache + fancy table" instead.
-    std::sort(descriptors_to_print.begin(), descriptors_to_print.end(),
-              [](const ThreadDescriptor& lhs, const ThreadDescriptor& rhs) {
-                return lhs.name() < rhs.name();
-              });
+
+    EasyJson found = rtg.Set("found", EasyJson::kObject);
+    EasyJson threads = found.Set("threads", EasyJson::kArray);
     for (const auto& desc : descriptors_to_print) {
-      PrintThreadDescriptorRow(desc, &output);
+      SummarizeThreadDescriptor(desc, &threads);
     }
-    output << "</tbody></table>";
   } else {
-    // Using the tree map (std::map) to have the list of the thread categories
-    // at the '/threadz' page sorted alphabetically.
-    // TODO(aserbin): use "mustache + fancy table" instead.
-    map<string, size_t> thread_categories_info;
+    // List all thread groups and the number of threads running in each.
+    vector<pair<string, size_t>> thread_categories_info;
+    uint64_t running;
     {
+      // See comment above regarding short critical sections.
       shared_lock<decltype(lock_)> l(lock_);
-      output << "<h2>Thread Groups</h2>"
-                "<h4>" << threads_running_metric_ << " thread(s) running"
-                "<a href='/threadz?group=all'><h3>All Threads</h3>";
+      running = threads_running_metric_;
+      thread_categories_info.reserve(thread_categories_.size());
       for (const auto& category : thread_categories_) {
-        thread_categories_info.emplace(category.first, category.second.size());
+        thread_categories_info.emplace_back(category.first, category.second.size());
       }
     }
+
+    output["total_threads_running"] = running;
+    EasyJson groups = output.Set("groups", EasyJson::kArray);
     for (const auto& elem : thread_categories_info) {
       string category_arg;
       UrlEncode(elem.first, &category_arg);
-      output << "<a href='/threadz?group=" << category_arg << "'><h3>"
-             << elem.first << " : " << elem.second << "</h3></a>";
+      EasyJson g = groups.PushBack(EasyJson::kObject);
+      g["encoded_group_name"] = category_arg;
+      g["group_name"] = elem.first;
+      g["threads_running"] = elem.second;
     }
   }
 }
diff --git a/www/dashboards.mustache b/www/dashboards.mustache
new file mode 100644
index 0000000..d86ae27
--- /dev/null
+++ b/www/dashboards.mustache
@@ -0,0 +1,27 @@
+{{!
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+}}
+
+<h3>Dashboards</h3>
+<table class='table table-striped'>
+  <thead><tr><th>Dashboard</th><th>Description</th></tr></thead>
+  <tbody
+  <tr><td><a href="/scans">Scans</a></td><td>List of currently running and recently completed scans.</td></tr>
+  <tr><td><a href="/transactions">Transactions</a></td><td>List of transactions that are currently running.</td></tr>
+  <tr><td><a href="/maintenance-manager">Maintenance Manager</a></td><td>List of operations that are currently running and those that are registered.</td></tr>
+</tbody></table>
diff --git a/www/kudu.js b/www/kudu.js
index 2e7df97..9f02320 100644
--- a/www/kudu.js
+++ b/www/kudu.js
@@ -67,8 +67,21 @@ function bytesSorter(left, right) {
   return 0;
 }
 
+// A comparison function for floating point numbers.
+function floatsSorter(left, right) {
+  left_float = parseFloat(left)
+  right_float = parseFloat(right)
+  if (left_float < right_float) {
+    return -1;
+  }
+  if (left_float > right_float) {
+    return 1;
+  }
+  return 0;
+}
+
 // Converts numeric strings to numbers and then compares them.
-function compareNumericStrings(left, right) {
+function numericStringsSorter(left, right) {
   left_num = parseInt(left, 10);
   right_num = parseInt(right, 10);
   if (left_num < right_num) {
@@ -103,32 +116,32 @@ function timesSorter(left, right) {
   }
 
   // Year.
-  var ret = compareNumericStrings(left.substr(0, 4), right.substr(0, 4));
+  var ret = numericStringsSorter(left.substr(0, 4), right.substr(0, 4));
   if (ret != 0) {
     return ret;
   }
   // Month.
-  ret = compareNumericStrings(left.substr(5, 2), right.substr(5, 2));
+  ret = numericStringsSorter(left.substr(5, 2), right.substr(5, 2));
   if (ret != 0) {
     return ret;
   }
   // Day.
-  ret = compareNumericStrings(left.substr(8, 2), right.substr(8, 2));
+  ret = numericStringsSorter(left.substr(8, 2), right.substr(8, 2));
   if (ret != 0) {
     return ret;
   }
   // Hour.
-  ret = compareNumericStrings(left.substr(11, 2), right.substr(11, 2));
+  ret = numericStringsSorter(left.substr(11, 2), right.substr(11, 2));
   if (ret != 0) {
     return ret;
   }
   // Minute.
-  ret = compareNumericStrings(left.substr(14, 2), right.substr(14, 2));
+  ret = numericStringsSorter(left.substr(14, 2), right.substr(14, 2));
   if (ret != 0) {
     return ret;
   }
   // Second.
-  ret = compareNumericStrings(left.substr(17, 2), right.substr(17, 2));
+  ret = numericStringsSorter(left.substr(17, 2), right.substr(17, 2));
   if (ret != 0) {
     return ret;
   }
diff --git a/www/tablets.mustache b/www/tablets.mustache
index d7eee69..2a54ada 100644
--- a/www/tablets.mustache
+++ b/www/tablets.mustache
@@ -47,7 +47,10 @@ There are no tablet replicas.
     {{#replicas}}
       <tr>
         <td>{{table_name}}</td>
-        <td>{{{id_or_link}}}</td>
+        <td>
+          {{#link}}<a href="{{url}}">{{id}}</a>{{/link}}
+          {{^link}}{{id}}{{/link}}
+        </td>
         <td>{{partition}}</td>
         <td>{{state}}</td>
         <td>{{n_bytes}}</td>
@@ -90,7 +93,10 @@ There are no tablet replicas.
     {{#replicas}}
       <tr>
         <td>{{table_name}}</td>
-        <td>{{{id_or_link}}}</td>
+        <td>
+          {{#link}}<a href="{{url}}">{{id}}</a>{{/link}}
+          {{^link}}{{id}}{{/link}}
+        </td>
         <td>{{partition}}</td>
         <td>{{state}}</td>
         <td>{{n_bytes}}</td>
diff --git a/www/threadz.mustache b/www/threadz.mustache
new file mode 100644
index 0000000..efabdd7
--- /dev/null
+++ b/www/threadz.mustache
@@ -0,0 +1,68 @@
+{{!
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+}}
+
+{{#requested_thread_group}}
+<h2>Thread Group: {{group_name}}</h2>
+{{#requested_all}}<h3>All Threads : </h3>{{/requested_all}}
+{{#found}}
+<table class='table table-hover' data-sort-name='name' data-toggle='table'>
+  <thead>
+    <tr>
+      <th data-field='name' data-sortable='true' data-sorter='stringsSorter'>Thread name</th>
+      <th data-sortable='true' data-sorter='floatsSorter'>Cumulative User CPU (s)</th>
+      <th data-sortable='true' data-sorter='floatsSorter'>Cumulative Kernel CPU (s)</th>
+      <th data-sortable='true' data-sorter='floatsSorter'>Cumulative IO-wait (s)</th>
+    </tr>
+  </thead>
+  <tbody>
+    {{#threads}}
+    <tr>
+      <td>{{thread_name}}</td>
+      <td>{{user_sec}}</td>
+      <td>{{kernel_sec}}</td>
+      <td>{{iowait_sec}}</td>
+    </tr>
+    {{/threads}}
+  </tbody>
+</table>
+{{/found}}
+{{^found}}Thread group {{group_name}} not found{{/found}}
+{{/requested_thread_group}}
+
+{{^requested_thread_group}}
+<h2>Thread Groups</h2>
+<h4>{{total_threads_running}} thread(s) running</h4>
+<a href='/threadz?group=all'><h3>All Threads</h3></a>
+<table class='table table-hover' data-sort-name='group' data-toggle='table'>
+  <thead>
+    <tr>
+      <th data-field='group' data-sortable='true' data-sorter='stringsSorter'>Group</th>
+      <th data-sortable='true' data-sorter='numericStringsSorter'>Threads running</th>
+    </tr>
+  </thead>
+  <tbody>
+    {{#groups}}
+    <tr>
+      <td><a href='/threadz?group={{encoded_group_name}}'>{{group_name}}</a></td>
+      <td>{{threads_running}}</td>
+    </tr>
+    {{/groups}}
+  </tbody>
+</table>
+{{/requested_thread_group}}