Posted to commits@kudu.apache.org by ad...@apache.org on 2019/01/23 22:33:21 UTC

[kudu] branch master updated (5b1d17f -> 9443a69)

This is an automated email from the ASF dual-hosted git repository.

adar pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from 5b1d17f  python: fix Python 3.4 based tests
     new 9ea5433  [spark] Add metrics to kudu-spark
     new 9443a69  [java] log warning if applying operation on a closed session

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/kudu/client/AsyncKuduClient.java    |  2 +-
 .../org/apache/kudu/client/AsyncKuduSession.java   |  5 +++
 .../org/apache/kudu/client/TestKuduClient.java     | 27 ++++++++++++-
 .../org/apache/kudu/spark/kudu/DefaultSource.scala |  6 +--
 .../org/apache/kudu/spark/kudu/KuduContext.scala   | 24 ++++++++++-
 .../scala/org/apache/kudu/spark/kudu/KuduRDD.scala | 14 ++++++-
 .../apache/kudu/spark/kudu/DefaultSourceTest.scala | 47 +++++++++++++++-------
 .../org/apache/kudu/spark/kudu/KuduRDDTest.scala   |  4 +-
 8 files changed, 104 insertions(+), 25 deletions(-)


[kudu] 01/02: [spark] Add metrics to kudu-spark

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 9ea54330645af044817894dbf1c0d5b8a5185822
Author: Will Berkeley <wd...@gmail.org>
AuthorDate: Fri Jan 18 17:00:26 2019 -0800

    [spark] Add metrics to kudu-spark
    
    This adds some basic accumulator metrics to kudu-spark. These
    accumulators appear for each stage involving a KuduContext, with values
    broken down by executor.
    
    Follow-ups will add more sophisticated metrics and use them to add
    additional logging.
    
    In addition to the unit tests, I tested this on a 3-node cluster and
    confirmed I was able to see the accumulator values for stages and
    individual tasks on the Spark application's web UI, and that they were
    correct.
    
    Change-Id: Ied81c890b3d4510f767d8f023963ff878f398140
    Reviewed-on: http://gerrit.cloudera.org:8080/12251
    Reviewed-by: Grant Henke <gr...@apache.org>
    Tested-by: Will Berkeley <wd...@gmail.com>
---
 .../org/apache/kudu/client/AsyncKuduClient.java    |  2 +-
 .../org/apache/kudu/spark/kudu/DefaultSource.scala |  6 +--
 .../org/apache/kudu/spark/kudu/KuduContext.scala   | 24 ++++++++++-
 .../scala/org/apache/kudu/spark/kudu/KuduRDD.scala | 14 ++++++-
 .../apache/kudu/spark/kudu/DefaultSourceTest.scala | 47 +++++++++++++++-------
 .../org/apache/kudu/spark/kudu/KuduRDDTest.scala   |  4 +-
 6 files changed, 74 insertions(+), 23 deletions(-)
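
Because the new counters are plain Spark accumulators registered on the
SparkContext, they show up per stage in the Spark UI and can also be observed
programmatically. As a minimal sketch (not part of this patch; the listener
class and its registration are hypothetical, only the accumulator names come
from the change below), a SparkListener can pick the kudu-spark values out of
each completed stage:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

    // Hypothetical listener: prints the kudu-spark accumulators attached to
    // each completed stage. Only the metric names are taken from this patch.
    class KuduMetricsListener extends SparkListener {
      private val kuduMetricNames = Set(
        "kudu.num_inserts", "kudu.num_upserts", "kudu.num_updates",
        "kudu.num_deletes", "kudu.rows_read")

      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
        val info = stageCompleted.stageInfo
        info.accumulables.values
          .filter(acc => acc.name.exists(kuduMetricNames.contains))
          .foreach(acc =>
            println(s"Stage ${info.stageId}: ${acc.name.get} = ${acc.value.getOrElse(0L)}"))
      }
    }

    // Driver-side registration, assuming an existing SparkContext `sc`:
    //   sc.addSparkListener(new KuduMetricsListener)

The accumulator values can also be read directly from within the
org.apache.kudu.spark.kudu package, which is how the updated DefaultSourceTest
and KuduRDDTest below verify them.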

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 4ed25f6..9b5b280 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -432,7 +432,7 @@ public class AsyncKuduClient implements AutoCloseable {
         new ServerInfo(getFakeMasterUuid(hostPort),
                        hostPort,
                        inetAddress,
-                       /*locaton=*/""),
+                       /* location= */""),
         credentialsPolicy);
   }
 
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index 7729eb9..388fac1 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -122,7 +122,7 @@ class DefaultSource
    * @param sqlContext
    * @param mode Only Append mode is supported. It will upsert or insert data
    *             to an existing table, depending on the upsert parameter
-   * @param parameters Necessary parameters for kudu.table, kudu.master, etc..
+   * @param parameters Necessary parameters for kudu.table, kudu.master, etc...
    * @param data Dataframe to save into kudu
    * @return returns populated base relation
    */
@@ -401,10 +401,10 @@ class KuduRelation(
   }
 
   /**
-   * Creates a new `IS NULL` predicate for the column.
+   * Creates a new `IS NOT NULL` predicate for the column.
    *
    * @param column the column name
-   * @return the `IS NULL` predicate
+   * @return the `IS NOT NULL` predicate
    */
   private def isNotNullPredicate(column: String): KuduPredicate = {
     KuduPredicate.newIsNotNullPredicate(table.getSchema.getColumn(column))
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index 5a12ad6..d6295f9 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -19,7 +19,6 @@ package org.apache.kudu.spark.kudu
 
 import java.security.AccessController
 import java.security.PrivilegedAction
-
 import javax.security.auth.Subject
 import javax.security.auth.login.AppConfigurationEntry
 import javax.security.auth.login.Configuration
@@ -27,6 +26,7 @@ import javax.security.auth.login.LoginContext
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+
 import org.apache.hadoop.util.ShutdownHookManager
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
@@ -39,6 +39,7 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.util.AccumulatorV2
+import org.apache.spark.util.LongAccumulator
 import org.apache.yetus.audience.InterfaceAudience
 import org.apache.yetus.audience.InterfaceStability
 import org.slf4j.Logger
@@ -65,6 +66,24 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
   def this(kuduMaster: String, sc: SparkContext) = this(kuduMaster, sc, None)
 
   /**
+   * A collection of accumulator metrics describing the usage of a KuduContext.
+   */
+  private[kudu] val numInserts: LongAccumulator = sc.longAccumulator("kudu.num_inserts")
+  private[kudu] val numUpserts: LongAccumulator = sc.longAccumulator("kudu.num_upserts")
+  private[kudu] val numUpdates: LongAccumulator = sc.longAccumulator("kudu.num_updates")
+  private[kudu] val numDeletes: LongAccumulator = sc.longAccumulator("kudu.num_deletes")
+
+  // Increments the appropriate metric given an OperationType and a count.
+  private def addForOperation(count: Long, opType: OperationType): Unit = {
+    opType match {
+      case Insert => numInserts.add(count)
+      case Upsert => numUpserts.add(count)
+      case Update => numUpdates.add(count)
+      case Delete => numDeletes.add(count)
+    }
+  }
+
+  /**
    * TimestampAccumulator accumulates the maximum value of client's
    * propagated timestamp of all executors and can only read by the
    * driver.
@@ -333,6 +352,7 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
     session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
     session.setIgnoreAllDuplicateRows(writeOptions.ignoreDuplicateRowErrors)
     val typeConverter = CatalystTypeConverters.createToScalaConverter(schema)
+    var numRows = 0
     try {
       for (internalRow <- rows) {
         val row = typeConverter(internalRow).asInstanceOf[Row]
@@ -377,12 +397,14 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
           }
         }
         session.apply(operation)
+        numRows += 1
       }
     } finally {
       session.close()
       // Update timestampAccumulator with the client's last propagated
       // timestamp on each executor.
       timestampAccumulator.add(syncClient.getLastPropagatedTimestamp)
+      addForOperation(numRows, operationType)
     }
     session.getPendingErrors
   }
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index 2deea16..356c290 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -17,13 +17,16 @@
 package org.apache.kudu.spark.kudu
 
 import scala.collection.JavaConverters._
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
 import org.apache.spark.Partition
 import org.apache.spark.SparkContext
 import org.apache.spark.TaskContext
+import org.apache.spark.util.LongAccumulator
 import org.apache.yetus.audience.InterfaceAudience
 import org.apache.yetus.audience.InterfaceStability
+
 import org.apache.kudu.client._
 import org.apache.kudu.Type
 import org.apache.kudu.client
@@ -47,6 +50,10 @@ class KuduRDD private[kudu] (
   // Defined here because the options are transient.
   private val keepAlivePeriodMs = options.keepAlivePeriodMs
 
+  // A metric for the rows read from Kudu for this RDD.
+  // TODO(wdberkeley): Add bytes read if it becomes available from the Java client.
+  private[kudu] val rowsRead = sc.longAccumulator("kudu.rows_read")
+
   override protected def getPartitions: Array[Partition] = {
     val builder = kuduContext.syncClient
       .newScanTokenBuilder(table)
@@ -93,7 +100,7 @@ class KuduRDD private[kudu] (
     val partition: KuduPartition = part.asInstanceOf[KuduPartition]
     val scanner =
       KuduScanToken.deserializeIntoScanner(partition.scanToken, client)
-    new RowIterator(scanner, kuduContext, keepAlivePeriodMs)
+    new RowIterator(scanner, kuduContext, keepAlivePeriodMs, rowsRead)
   }
 
   override def getPreferredLocations(partition: Partition): Seq[String] = {
@@ -115,11 +122,13 @@ private class KuduPartition(
  * @param scanner the wrapped scanner
  * @param kuduContext the kudu context
  * @param keepAlivePeriodMs the period in which to call the keepAlive on the scanners
+ * @param rowsRead an accumulator to track the number of rows read from Kudu
  */
 private class RowIterator(
     val scanner: KuduScanner,
     val kuduContext: KuduContext,
-    val keepAlivePeriodMs: Long)
+    val keepAlivePeriodMs: Long,
+    val rowsRead: LongAccumulator)
     extends Iterator[Row] {
 
   private var currentIterator: RowResultIterator = RowResultIterator.empty
@@ -145,6 +154,7 @@ private class RowIterator(
       // Update timestampAccumulator with the client's last propagated
       // timestamp on each executor.
       kuduContext.timestampAccumulator.add(kuduContext.syncClient.getLastPropagatedTimestamp)
+      rowsRead.add(currentIterator.getNumRows)
     }
     KeepKuduScannerAlive()
     currentIterator.hasNext
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index fe56e75..ad2c110 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -89,9 +89,11 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
       new CreateTableOptions()
         .setRangePartitionColumns(List("key").asJava)
         .setNumReplicas(1))
+    val insertsBefore = kuduContext.numInserts.value
     kuduContext.insertRows(df, tableName)
+    assertEquals(insertsBefore + df.count(), kuduContext.numInserts.value)
 
-    // now use new options to refer to the new table name
+    // Now use new options to refer to the new table name.
     val newOptions: Map[String, String] =
       Map("kudu.table" -> tableName, "kudu.master" -> harness.getMasterAddressesAsString)
     val checkDf = sqlContext.read.options(newOptions).format("kudu").load
@@ -127,7 +129,9 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
         .addRangePartition(lower, upper)
         .setNumReplicas(1)
     )
+    val insertsBefore = kuduContext.numInserts.value
     kuduContext.insertRows(df, tableName)
+    assertEquals(insertsBefore + df.count(), kuduContext.numInserts.value)
 
     // now use new options to refer to the new table name
     val newOptions: Map[String, String] =
@@ -144,12 +148,14 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
 
   @Test
   def testInsertion() {
+    val insertsBefore = kuduContext.numInserts.value
     val df = sqlContext.read.options(kuduOptions).format("kudu").load
     val changedDF = df
       .limit(1)
       .withColumn("key", df("key").plus(100))
       .withColumn("c2_s", lit("abc"))
     kuduContext.insertRows(changedDF, tableName)
+    assertEquals(insertsBefore + changedDF.count(), kuduContext.numInserts.value)
 
     val newDF = sqlContext.read.options(kuduOptions).format("kudu").load
     val collected = newDF.filter("key = 100").collect()
@@ -160,12 +166,14 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
 
   @Test
   def testInsertionMultiple() {
+    val insertsBefore = kuduContext.numInserts.value
     val df = sqlContext.read.options(kuduOptions).format("kudu").load
     val changedDF = df
       .limit(2)
       .withColumn("key", df("key").plus(100))
       .withColumn("c2_s", lit("abc"))
     kuduContext.insertRows(changedDF, tableName)
+    assertEquals(insertsBefore + changedDF.count(), kuduContext.numInserts.value)
 
     val newDF = sqlContext.read.options(kuduOptions).format("kudu").load
     val collected = newDF.filter("key = 100").collect()
@@ -181,28 +189,28 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
   @Test
   def testInsertionIgnoreRows() {
     val df = sqlContext.read.options(kuduOptions).format("kudu").load
-    val baseDF = df.limit(1) // filter down to just the first row
+    val baseDF = df.limit(1) // Filter down to just the first row.
 
     // change the c2 string to abc and insert
     val updateDF = baseDF.withColumn("c2_s", lit("abc"))
     val kuduWriteOptions = KuduWriteOptions(ignoreDuplicateRowErrors = true)
     kuduContext.insertRows(updateDF, tableName, kuduWriteOptions)
 
-    // change the key and insert
+    // Change the key and insert.
     val insertDF = df
       .limit(1)
       .withColumn("key", df("key").plus(100))
       .withColumn("c2_s", lit("def"))
     kuduContext.insertRows(insertDF, tableName, kuduWriteOptions)
 
-    // read the data back
+    // Read the data back.
     val newDF = sqlContext.read.options(kuduOptions).format("kudu").load
     val collectedUpdate = newDF.filter("key = 0").collect()
     assertEquals("0", collectedUpdate(0).getAs[String]("c2_s"))
     val collectedInsert = newDF.filter("key = 100").collect()
     assertEquals("def", collectedInsert(0).getAs[String]("c2_s"))
 
-    // restore the original state of the table
+    // Restore the original state of the table.
     deleteRow(100)
   }
 
@@ -299,28 +307,34 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
   @Test
   def testUpsertRows() {
     val df = sqlContext.read.options(kuduOptions).format("kudu").load
-    val baseDF = df.limit(1) // filter down to just the first row
+    val baseDF = df.limit(1) // Filter down to just the first row.
 
-    // change the c2 string to abc and update
-    val updateDF = baseDF.withColumn("c2_s", lit("abc"))
-    kuduContext.upsertRows(updateDF, tableName)
+    // Change the c2 string to abc and update.
+    val upsertDF = baseDF.withColumn("c2_s", lit("abc"))
+    kuduContext.upsertRows(upsertDF, tableName)
 
-    // change the key and insert
+    // Change the key and insert.
+    val upsertsBefore = kuduContext.numUpserts.value
     val insertDF = df
       .limit(1)
       .withColumn("key", df("key").plus(100))
       .withColumn("c2_s", lit("def"))
     kuduContext.upsertRows(insertDF, tableName)
+    assertEquals(upsertsBefore + insertDF.count(), kuduContext.numUpserts.value)
 
-    // read the data back
+    // Read the data back.
     val newDF = sqlContext.read.options(kuduOptions).format("kudu").load
     val collectedUpdate = newDF.filter("key = 0").collect()
     assertEquals("abc", collectedUpdate(0).getAs[String]("c2_s"))
     val collectedInsert = newDF.filter("key = 100").collect()
     assertEquals("def", collectedInsert(0).getAs[String]("c2_s"))
 
-    // restore the original state of the table
-    kuduContext.updateRows(baseDF.filter("key = 0").withColumn("c2_s", lit("0")), tableName)
+    // Restore the original state of the table, and test the numUpdates metric.
+    val updatesBefore = kuduContext.numUpdates.value
+    val updateDF = baseDF.filter("key = 0").withColumn("c2_s", lit("0"))
+    val updatesApplied = updateDF.count()
+    kuduContext.updateRows(updateDF, tableName)
+    assertEquals(updatesBefore + updatesApplied, kuduContext.numUpdates.value)
     deleteRow(100)
   }
 
@@ -392,14 +406,17 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
   def testDeleteRows() {
     val df = sqlContext.read.options(kuduOptions).format("kudu").load
     val deleteDF = df.filter("key = 0").select("key")
+    val deletesBefore = kuduContext.numDeletes.value
+    val deletesApplied = deleteDF.count()
     kuduContext.deleteRows(deleteDF, tableName)
+    assertEquals(deletesBefore + deletesApplied, kuduContext.numDeletes.value)
 
-    // read the data back
+    // Read the data back.
     val newDF = sqlContext.read.options(kuduOptions).format("kudu").load
     val collectedDelete = newDF.filter("key = 0").collect()
     assertEquals(0, collectedDelete.length)
 
-    // restore the original state of the table
+    // Restore the original state of the table.
     val insertDF = df.limit(1).filter("key = 0")
     kuduContext.insertRows(insertDF, tableName)
   }
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala
index 49bc15e..1904f95 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala
@@ -26,6 +26,7 @@ import org.apache.kudu.test.KuduTestHarness.TabletServerConfig
 import org.apache.spark.SparkException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
+import org.junit.Assert.assertEquals
 import org.junit.Test
 
 class KuduRDDTest extends KuduTestSuite {
@@ -34,7 +35,8 @@ class KuduRDDTest extends KuduTestSuite {
   def testCollectRows() {
     insertRows(table, 100)
     val rdd = kuduContext.kuduRDD(ss.sparkContext, tableName, List("key"))
-    assert(rdd.collect.length == 100)
+    assertEquals(100, rdd.collect().length)
+    assertEquals(100L, rdd.asInstanceOf[KuduRDD].rowsRead.value)
   }
 
   @Test


[kudu] 02/02: [java] log warning if applying operation on a closed session

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 9443a69b0d81c65a174874661e90a13e5792d55b
Author: abc549825 <ab...@163.com>
AuthorDate: Wed Jan 9 19:23:37 2019 +0800

    [java] log warning if applying operation on a closed session
    
    Closed sessions won't flush on another close() call, so it's somewhat
    dubious to apply() an operation to a closed session. This change adds a
    warning if apply() is used with a closed session.
    
    Originally this patch enforced the new behavior via a precondition, but
    that's a breaking change for clients that relied on the old broken
    behavior.
    
    Change-Id: I0fe221fedbb91959985f5ee374f1b691be2426a9
    Reviewed-on: http://gerrit.cloudera.org:8080/12237
    Tested-by: Kudu Jenkins
    Reviewed-by: Grant Henke <gr...@apache.org>
---
 .../org/apache/kudu/client/AsyncKuduSession.java   |  5 ++++
 .../org/apache/kudu/client/TestKuduClient.java     | 27 ++++++++++++++++++++--
 2 files changed, 30 insertions(+), 2 deletions(-)
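
The user-visible effect is that apply() on a closed session is still accepted
but now produces a warning in the client log. A minimal sketch of the scenario
from application code, assuming a reachable cluster and an existing table with
an INT32 "key" column (the master address and table name below are
placeholders, not part of this patch):

    import org.apache.kudu.client.{KuduClient, SessionConfiguration}

    // Placeholders for illustration only.
    val masterAddresses = "kudu-master-1:7051"
    val client = new KuduClient.KuduClientBuilder(masterAddresses).build()
    val table = client.openTable("my_table")

    val session = client.newSession()
    session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH)

    val insert = table.newInsert()
    insert.getRow.addInt("key", 0)
    session.apply(insert)
    session.close()  // flushes the pending insert and marks the session closed

    val lateInsert = table.newInsert()
    lateInsert.getRow.addInt("key", 1)
    // With this change the client logs "Applying an operation in a closed
    // session; this is unsafe" but still accepts the call, so existing
    // callers that relied on the old behavior keep working.
    session.apply(lateInsert)

The new TestKuduClient#testSessionOnceClosed test below captures the client log
and asserts that this warning is emitted.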

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
index 94ca290..7d649bd 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
@@ -532,6 +532,11 @@ public class AsyncKuduSession implements SessionConfiguration {
     Preconditions.checkArgument(operation.getTable().getAsyncClient() == client,
         "Applied operations must be created from a KuduTable instance opened " +
         "from the same client that opened this KuduSession");
+    if (closed) {
+      // Ideally this would be a precondition, but that may break existing
+      // clients who have grown to rely on this unsafe behavior.
+      LOG.warn("Applying an operation in a closed session; this is unsafe");
+    }
 
     // Freeze the row so that the client can not concurrently modify it while it is in flight.
     operation.getRow().freeze();
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index 97df3e0..4584890 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -1071,7 +1071,6 @@ public class TestKuduClient {
               session.apply(insert);
             }
             session.flush();
-            session.close();
 
             // Perform a bunch of READ_YOUR_WRITES scans to all the replicas
             // that count the rows. And verify that the count of the rows
@@ -1114,7 +1113,7 @@ public class TestKuduClient {
       future.get();
     }
   }
-  
+
   private void runTestCallDuringLeaderElection(String clientMethodName) throws Exception {
     // This bit of reflection helps us avoid duplicating test code.
     Method methodToInvoke = KuduClient.class.getMethod(clientMethodName);
@@ -1138,6 +1137,7 @@ public class TestKuduClient {
                     .build();
     try {
       methodToInvoke.invoke(cl);
+      fail();
     } catch (InvocationTargetException ex) {
       assertTrue(ex.getTargetException() instanceof KuduException);
       KuduException realEx = (KuduException)ex.getTargetException();
@@ -1154,6 +1154,7 @@ public class TestKuduClient {
   public void testGetHiveMetastoreConfigDuringLeaderElection() throws Exception {
     runTestCallDuringLeaderElection("getHiveMetastoreConfig");
   }
+
   /**
    * Test assignment of a location to the client.
    */
@@ -1173,4 +1174,26 @@ public class TestKuduClient {
     client.listTabletServers();
     assertEquals("/L0", client.getLocationString());
   }
+
+  @Test(timeout = 100000)
+  public void testSessionOnceClosed() throws Exception {
+    client.createTable(TABLE_NAME, basicSchema, getBasicCreateTableOptions());
+    KuduTable table = client.openTable(TABLE_NAME);
+    KuduSession session = client.newSession();
+
+    session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+    Insert insert = createBasicSchemaInsert(table, 0);
+    session.apply(insert);
+    session.close();
+    assertTrue(session.isClosed());
+
+    insert = createBasicSchemaInsert(table, 1);
+    CapturingLogAppender cla = new CapturingLogAppender();
+    try (Closeable c = cla.attach()) {
+      session.apply(insert);
+    }
+    String loggedText = cla.getAppendedText();
+    assertTrue("Missing warning:\n" + loggedText,
+               loggedText.contains("this is unsafe"));
+  }
 }