You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/06/15 11:50:24 UTC
[19/42] carbondata git commit: close dictionary server on application
end
close dictionary server on application end
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/43e06b65
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/43e06b65
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/43e06b65
Branch: refs/heads/branch-1.1
Commit: 43e06b65a7fbeaf35dced6ece4f8014015960ba2
Parents: 50da524
Author: kunal642 <ku...@knoldus.in>
Authored: Sun May 21 23:12:59 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Jun 15 12:58:11 2017 +0530
----------------------------------------------------------------------
.../carbondata/core/dictionary/server/DictionaryServer.java | 4 +---
.../spark/sql/execution/command/carbonTableSchema.scala | 6 ++++++
.../spark/sql/execution/command/carbonTableSchema.scala | 7 +++++++
3 files changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/43e06b65/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java b/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
index f86cd6b..84f2a0d 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
@@ -135,11 +135,9 @@ public class DictionaryServer {
* @throws Exception
*/
public void shutdown() throws Exception {
+ LOGGER.info("Shutting down dictionary server");
worker.shutdownGracefully();
boss.shutdownGracefully();
- // Wait until all threads are terminated.
- boss.terminationFuture().sync();
- worker.terminationFuture().sync();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/43e06b65/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 494beff..7258511 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import scala.language.implicitConversions
import org.apache.commons.lang3.StringUtils
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
@@ -525,6 +526,11 @@ case class LoadTable(
val dictionaryServer = DictionaryServer
.getInstance(dictionaryServerPort.toInt)
carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
+ sqlContext.sparkContext.addSparkListener(new SparkListener() {
+ override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
+ dictionaryServer.shutdown()
+ }
+ })
Some(dictionaryServer)
} else {
None
http://git-wip-us.apache.org/repos/asf/carbondata/blob/43e06b65/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 09824d8..5dd6832 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import scala.language.implicitConversions
import org.apache.commons.lang3.StringUtils
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
@@ -533,10 +534,16 @@ case class LoadTable(
val dictionaryServer = DictionaryServer
.getInstance(dictionaryServerPort.toInt)
carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
+ sparkSession.sparkContext.addSparkListener(new SparkListener() {
+ override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
+ dictionaryServer.shutdown()
+ }
+ })
Some(dictionaryServer)
} else {
None
}
+
CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
carbonLoadModel,
relation.tableMeta.storePath,