Posted to commits@kudu.apache.org by da...@apache.org on 2016/03/17 22:30:31 UTC

incubator-kudu git commit: Fix Spark shutdown

Repository: incubator-kudu
Updated Branches:
  refs/heads/master d7de21009 -> 64f007c76


Fix Spark shutdown

The Kudu client was creating Netty timers with non-daemon threads, which
prevented the Spark shell JVM from beginning the shutdown process; the client
now builds its timer with a daemon thread factory and shares it with the
socket channel factory. This commit also adds a shutdown hook to the Spark
connector so that when the shell process exits it properly shuts down the
Kudu client, preventing batched but unflushed writes from being lost.

See
http://apache-spark-developers-list.1001551.n3.nabble.com/graceful-shutdown-in-external-data-sources-td16684.html
for more discussion.
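
For illustration, a minimal, self-contained Java sketch of the daemon-timer
idea follows. The class and variable names are invented for this example and
are not part of the patch; it assumes Netty 3 and Guava on the classpath, as
the Kudu Java client already does.

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioWorkerPool;
import org.jboss.netty.util.HashedWheelTimer;

public class DaemonTimerSketch {
  public static void main(String[] args) {
    // A timer whose worker thread is a daemon cannot keep the JVM alive, so
    // the Spark shell is free to begin its shutdown sequence.
    ThreadFactory daemonFactory = new ThreadFactoryBuilder().setDaemon(true).build();
    HashedWheelTimer timer = new HashedWheelTimer(daemonFactory, 20, MILLISECONDS);

    // Hand the same timer to the channel factory; otherwise Netty creates an
    // internal HashedWheelTimer backed by a non-daemon thread.
    Executor boss = Executors.newCachedThreadPool();
    Executor worker = Executors.newCachedThreadPool();
    NioClientSocketChannelFactory channelFactory =
        new NioClientSocketChannelFactory(boss, 1, new NioWorkerPool(worker, 2), timer);

    channelFactory.releaseExternalResources();
    timer.stop();
  }
}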

Change-Id: I30a64ec5eb30d70361204646523c9947d88c251f
Reviewed-on: http://gerrit.cloudera.org:8080/2571
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/64f007c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/64f007c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/64f007c7

Branch: refs/heads/master
Commit: 64f007c76226f9040afc7c555c42c9aede238022
Parents: d7de210
Author: Dan Burkert <da...@cloudera.com>
Authored: Wed Mar 16 18:48:29 2016 -0700
Committer: Dan Burkert <da...@cloudera.com>
Committed: Thu Mar 17 21:29:50 2016 +0000

----------------------------------------------------------------------
 .../java/org/kududb/client/AsyncKuduClient.java | 13 +++++++--
 .../scala/org/kududb/spark/KuduContext.scala    | 30 ++++++++++++++++++--
 2 files changed, 38 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/64f007c7/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
index 1a396b1..51a6dd2 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
@@ -38,6 +38,7 @@ import com.stumbleupon.async.Callback;
 import com.stumbleupon.async.Deferred;
 
 import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.socket.nio.NioWorkerPool;
 import org.kududb.Common;
 import org.kududb.Schema;
 import org.kududb.annotations.InterfaceAudience;
@@ -197,7 +198,7 @@ public class AsyncKuduClient implements AutoCloseable {
   final KuduTable masterTable;
   private final List<HostAndPort> masterAddresses;
 
-  private final HashedWheelTimer timer = new HashedWheelTimer(20, MILLISECONDS);
+  private final HashedWheelTimer timer;
 
   /**
    * Timestamp required for HybridTime external consistency through timestamp
@@ -238,6 +239,7 @@ public class AsyncKuduClient implements AutoCloseable {
     this.defaultOperationTimeoutMs = b.defaultOperationTimeoutMs;
     this.defaultAdminOperationTimeoutMs = b.defaultAdminOperationTimeoutMs;
     this.defaultSocketReadTimeoutMs = b.defaultSocketReadTimeoutMs;
+    this.timer = b.timer;
   }
 
   /**
@@ -1986,6 +1988,8 @@ public class AsyncKuduClient implements AutoCloseable {
     private long defaultOperationTimeoutMs = DEFAULT_OPERATION_TIMEOUT_MS;
     private long defaultSocketReadTimeoutMs = DEFAULT_SOCKET_READ_TIMEOUT_MS;
 
+    private final HashedWheelTimer timer =
+        new HashedWheelTimer(new ThreadFactoryBuilder().setDaemon(true).build(), 20, MILLISECONDS);
     private Executor bossExecutor;
     private Executor workerExecutor;
     private int bossCount = DEFAULT_BOSS_COUNT;
@@ -2118,7 +2122,12 @@ public class AsyncKuduClient implements AutoCloseable {
         if (boss == null) boss = defaultExec;
         if (worker == null) worker = defaultExec;
       }
-      return new NioClientSocketChannelFactory(boss, worker, bossCount, workerCount);
+      // Share the timer with the socket channel factory so that it does not
+      // create an internal timer with a non-daemon thread.
+      return new NioClientSocketChannelFactory(boss,
+                                               bossCount,
+                                               new NioWorkerPool(worker, workerCount),
+                                               timer);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/64f007c7/java/kudu-spark/src/main/scala/org/kududb/spark/KuduContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/kududb/spark/KuduContext.scala b/java/kudu-spark/src/main/scala/org/kududb/spark/KuduContext.scala
index a034099..c7bfe3e 100644
--- a/java/kudu-spark/src/main/scala/org/kududb/spark/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/kududb/spark/KuduContext.scala
@@ -19,8 +19,9 @@ package org.kududb.spark
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.NullWritable
-import org.apache.spark.rdd.RDD
+import org.apache.hadoop.util.ShutdownHookManager
 import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
 import org.kududb.annotations.InterfaceStability
 import org.kududb.client.{AsyncKuduClient, KuduClient, RowResult}
 import org.kududb.mapreduce.KuduTableInputFormat
@@ -34,8 +35,31 @@ import org.kududb.mapreduce.KuduTableInputFormat
   */
 @InterfaceStability.Unstable
 class KuduContext(kuduMaster: String) extends Serializable {
-  @transient lazy val syncClient = new KuduClient.KuduClientBuilder(kuduMaster).build()
-  @transient lazy val asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster).build()
+
+  /**
+    * Set to
+    * [[org.apache.spark.util.ShutdownHookManager.DEFAULT_SHUTDOWN_PRIORITY]].
+    * The client instances are closed through the JVM shutdown hook
+    * mechanism in order to make sure that any unflushed writes are cleaned up
+    * properly. Spark has no way of notifying the [[DefaultSource]] on shutdown.
+    */
+  private val ShutdownHookPriority = 100
+
+  @transient lazy val syncClient = {
+    val syncClient = new KuduClient.KuduClientBuilder(kuduMaster).build()
+    ShutdownHookManager.get().addShutdownHook(new Runnable {
+      override def run() = syncClient.close()
+    }, ShutdownHookPriority)
+    syncClient
+  }
+  @transient lazy val asyncClient = {
+    val asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster).build()
+    ShutdownHookManager.get().addShutdownHook(
+      new Runnable {
+        override def run() = asyncClient.close()
+      }, ShutdownHookPriority)
+    asyncClient
+  }
 
   /**
     * Create an RDD from a Kudu table.
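
As a companion to the Scala connector change above, here is a minimal Java
sketch of the same shutdown-hook pattern; the class and method names are
illustrative and not part of the connector. Hadoop's ShutdownHookManager runs
the registered hook during JVM shutdown, so close() can flush pending writes
before the process exits.

import org.apache.hadoop.util.ShutdownHookManager;
import org.kududb.client.KuduClient;

public class KuduShutdownHookSketch {
  // The patch uses priority 100, matching Spark's default shutdown priority.
  private static final int SHUTDOWN_HOOK_PRIORITY = 100;

  /** Builds a client and arranges for it to be closed when the JVM exits. */
  public static KuduClient clientWithShutdownHook(String kuduMaster) {
    final KuduClient client = new KuduClient.KuduClientBuilder(kuduMaster).build();
    ShutdownHookManager.get().addShutdownHook(new Runnable() {
      @Override
      public void run() {
        try {
          // close() flushes batched but unflushed writes before the JVM exits.
          client.close();
        } catch (Exception e) {
          // Best effort only; the JVM is already shutting down.
        }
      }
    }, SHUTDOWN_HOOK_PRIORITY);
    return client;
  }
}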