You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/03/13 18:31:37 UTC
spark git commit: [SPARK-23547][SQL] Cleanup the .pipeout file when
the Hive Session closed
Repository: spark
Updated Branches:
refs/heads/master 9ddd1e2ce -> 918fb9bee
[SPARK-23547][SQL] Cleanup the .pipeout file when the Hive Session closed
## What changes were proposed in this pull request?
![2018-03-07_121010](https://user-images.githubusercontent.com/24823338/37073232-922e10d2-2200-11e8-8172-6e03aa984b39.png)
When the Hive session is closed, we should also clean up the .pipeout file.
## How was this patch tested?
Added test cases.
Author: zuotingbing <zu...@zte.com.cn>
Closes #20702 from zuotingbing/SPARK-23547.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/918fb9be
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/918fb9be
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/918fb9be
Branch: refs/heads/master
Commit: 918fb9beee6a2fd499b8f18dfe0d460f078f5290
Parents: 9ddd1e2
Author: zuotingbing <zu...@zte.com.cn>
Authored: Tue Mar 13 11:31:32 2018 -0700
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Mar 13 11:31:32 2018 -0700
----------------------------------------------------------------------
.../service/cli/session/HiveSessionImpl.java | 18 +++++++++++
.../thriftserver/HiveThriftServer2Suites.scala | 32 +++++++++++++++++++-
2 files changed, 49 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/918fb9be/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index fc818bc..f59cdcd 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -641,6 +641,8 @@ public class HiveSessionImpl implements HiveSession {
opHandleSet.clear();
// Cleanup session log directory.
cleanupSessionLogDir();
+ // Cleanup pipeout file.
+ cleanupPipeoutFile();
HiveHistory hiveHist = sessionState.getHiveHistory();
if (null != hiveHist) {
hiveHist.closeStream();
@@ -665,6 +667,22 @@ public class HiveSessionImpl implements HiveSession {
}
}
+ /**
+  * Deletes this session's ".pipeout" files from the local scratch directory.
+  *
+  * The directory is read from ConfVars.LOCALSCRATCHDIR and the session id from
+  * ConfVars.HIVESESSIONID. A file is removed when its name starts with the session id and
+  * ends with ".pipeout". A failure to list the directory or to delete a file is logged
+  * rather than thrown.
+  */
+ private void cleanupPipeoutFile() {
+   String lScratchDir = hiveConf.getVar(ConfVars.LOCALSCRATCHDIR);
+   String sessionID = hiveConf.getVar(ConfVars.HIVESESSIONID);
+
+   File[] fileAry = new File(lScratchDir).listFiles(
+       (dir, name) -> name.startsWith(sessionID) && name.endsWith(".pipeout"));
+
+   // File.listFiles returns null (not an empty array) when the path is not a directory or an
+   // I/O error occurs; iterating over null would throw a NullPointerException.
+   if (fileAry == null) {
+     LOG.error("Unable to access pipeout files in " + lScratchDir);
+     return;
+   }
+
+   for (File file : fileAry) {
+     try {
+       FileUtils.forceDelete(file);
+     } catch (Exception e) {
+       // Best effort: keep going so one undeletable file does not block the others.
+       LOG.error("Failed to cleanup pipeout file: " + file, e);
+     }
+   }
+ }
+
private void cleanupSessionLogDir() {
if (isOperationLogEnabled) {
try {
http://git-wip-us.apache.org/repos/asf/spark/blob/918fb9be/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index b32c547..192f33a 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -17,10 +17,11 @@
package org.apache.spark.sql.hive.thriftserver
-import java.io.File
+import java.io.{File, FilenameFilter}
import java.net.URL
import java.nio.charset.StandardCharsets
import java.sql.{Date, DriverManager, SQLException, Statement}
+import java.util.UUID
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -613,6 +614,28 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
bufferSrc.close()
}
}
+
+ test("SPARK-23547 Cleanup the .pipeout file when the Hive Session closed") {
+   // Lists the files in the local scratch directory whose names start with the given
+   // session id and end with ".pipeout".
+   def pipeoutFileList(sessionID: UUID): Array[File] = {
+     val matched = lScratchDir.listFiles(new FilenameFilter {
+       override def accept(dir: File, name: String): Boolean = {
+         name.startsWith(sessionID.toString) && name.endsWith(".pipeout")
+       }
+     })
+     // File.listFiles returns null when the directory does not exist or cannot be read.
+     // Treat that as "no files" so the assertions below fail with a readable message
+     // instead of a NullPointerException.
+     Option(matched).getOrElse(Array.empty[File])
+   }
+
+   withCLIServiceClient { client =>
+     val user = System.getProperty("user.name")
+     val sessionHandle = client.openSession(user, "")
+     val sessionID = sessionHandle.getSessionId
+
+     // Opening the session is expected to leave exactly one pipeout file behind.
+     assert(pipeoutFileList(sessionID).length == 1)
+
+     client.closeSession(sessionHandle)
+
+     // Closing the session must remove it again.
+     assert(pipeoutFileList(sessionID).length == 0)
+   }
+ }
}
class SingleSessionSuite extends HiveThriftJdbcTest {
@@ -807,6 +830,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
private val pidDir: File = Utils.createTempDir(namePrefix = "thriftserver-pid")
protected var logPath: File = _
protected var operationLogPath: File = _
+ protected var lScratchDir: File = _
private var logTailingProcess: Process = _
private var diagnosisBuffer: ArrayBuffer[String] = ArrayBuffer.empty[String]
@@ -844,6 +868,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
| --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=localhost
| --hiveconf ${ConfVars.HIVE_SERVER2_TRANSPORT_MODE}=$mode
| --hiveconf ${ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION}=$operationLogPath
+ | --hiveconf ${ConfVars.LOCALSCRATCHDIR}=$lScratchDir
| --hiveconf $portConf=$port
| --driver-class-path $driverClassPath
| --driver-java-options -Dlog4j.debug
@@ -873,6 +898,8 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
metastorePath.delete()
operationLogPath = Utils.createTempDir()
operationLogPath.delete()
+ lScratchDir = Utils.createTempDir()
+ lScratchDir.delete()
logPath = null
logTailingProcess = null
@@ -956,6 +983,9 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
operationLogPath.delete()
operationLogPath = null
+ lScratchDir.delete()
+ lScratchDir = null
+
Option(logPath).foreach(_.delete())
logPath = null
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org