You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this rendering.
Posted to commits@hive.apache.org by da...@apache.org on 2016/11/02 00:40:13 UTC
hive git commit: HIVE-15068: Run ClearDanglingScratchDir periodically
inside HS2 (Daniel Dai, reviewed by Thejas Nair)
Repository: hive
Updated Branches:
refs/heads/master 95b17e530 -> 7c2639166
HIVE-15068: Run ClearDanglingScratchDir periodically inside HS2 (Daniel Dai, reviewed by Thejas Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7c263916
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7c263916
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7c263916
Branch: refs/heads/master
Commit: 7c263916612bdbc50bc3b3b8a1bf7b96a0e12190
Parents: 95b17e5
Author: Daniel Dai <da...@hortonworks.com>
Authored: Tue Nov 1 17:39:55 2016 -0700
Committer: Daniel Dai <da...@hortonworks.com>
Committed: Tue Nov 1 17:39:55 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 5 +
.../server/TestHS2ClearDanglingScratchDir.java | 82 +++++++++
.../ql/session/ClearDanglingScratchDir.java | 181 +++++++++++--------
.../apache/hive/service/server/HiveServer2.java | 27 ++-
4 files changed, 219 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/7c263916/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 2f3fba7..80cd5ad 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2184,6 +2184,11 @@ public class HiveConf extends Configuration {
"SSL Versions to disable for all Hive Servers"),
// HiveServer2 specific configs
+ HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR("hive.server2.clear.dangling.scratchdir", false,
+ "Clear dangling scratch dir periodically in HS2"),
+ HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR_INTERVAL("hive.server2.clear.dangling.scratchdir.interval",
+ "1800s", new TimeValidator(TimeUnit.SECONDS),
+ "Interval to clear dangling scratch dir periodically in HS2"),
HIVE_SERVER2_SLEEP_INTERVAL_BETWEEN_START_ATTEMPTS("hive.server2.sleep.interval.between.start.attempts",
"60s", new TimeValidator(TimeUnit.MILLISECONDS, 0l, true, Long.MAX_VALUE, true),
"Amount of time to sleep between HiveServer2 start attempts. Primarily meant for tests"),
http://git-wip-us.apache.org/repos/asf/hive/blob/7c263916/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestHS2ClearDanglingScratchDir.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestHS2ClearDanglingScratchDir.java b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestHS2ClearDanglingScratchDir.java
new file mode 100644
index 0000000..081ac96
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestHS2ClearDanglingScratchDir.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.server;
+
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.WindowsPathUtil;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.util.Shell;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestHS2ClearDanglingScratchDir {
+  @Test
+  public void testScratchDirCleared() throws Exception {
+    MiniDFSCluster dfsCluster =
+        new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1).format(true).build();
+    try {
+      HiveConf conf = new HiveConf();
+      conf.addResource(dfsCluster.getConfiguration(0));
+      if (Shell.WINDOWS) {
+        WindowsPathUtil.convertPathsFromWindowsToHdfs(conf);
+      }
+      conf.set(HiveConf.ConfVars.HIVE_SCRATCH_DIR_LOCK.toString(), "true");
+      conf.set(HiveConf.ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR.toString(), "true");
+
+      Path scratchDir = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR));
+      dfsCluster.getFileSystem().mkdirs(scratchDir);
+      dfsCluster.getFileSystem().setPermission(scratchDir, new FsPermission("777"));
+      Path userScratchDir = new Path(scratchDir, Utils.getUGI().getShortUserName());
+
+      // Start two live sessions; their scratch dirs must survive the cleanup
+      SessionState.start(conf);
+      conf.setVar(HiveConf.ConfVars.HIVESESSIONID, UUID.randomUUID().toString());
+      SessionState.start(conf);
+
+      // Fake a dead session: a scratch dir whose lock file exists but is not held open
+      Path fakeSessionPath = new Path(userScratchDir, UUID.randomUUID().toString());
+      dfsCluster.getFileSystem().mkdirs(fakeSessionPath);
+      dfsCluster.getFileSystem().create(new Path(fakeSessionPath, "inuse.lck")).close();
+
+      FileStatus[] scratchDirs = dfsCluster.getFileSystem().listStatus(userScratchDir);
+      Assert.assertEquals(3, scratchDirs.length);
+
+      HiveServer2.scheduleClearDanglingScratchDir(conf, 0);
+      // Poll until only the two live sessions' dirs remain, failing after 5 seconds
+      long deadline = System.currentTimeMillis() + 5000;
+      do {
+        Thread.sleep(200);
+        if (System.currentTimeMillis() > deadline) {
+          Assert.fail("timeout, scratch dir has not been cleared");
+        }
+        scratchDirs = dfsCluster.getFileSystem().listStatus(userScratchDir);
+      } while (scratchDirs.length != 2);
+    } finally {
+      // Stop the mini cluster even when an assertion fails so its resources are released
+      dfsCluster.shutdown();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/7c263916/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java b/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java
index 725f954..2fff92e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.session;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -35,6 +36,8 @@ import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A tool to remove dangling scratch directory. A scratch directory could be left behind
@@ -51,7 +54,13 @@ import org.apache.hadoop.ipc.RemoteException;
* again after 10 min. Once it become writable, cleardanglingscratchDir will be able to
* remove it
*/
-public class ClearDanglingScratchDir {
+public class ClearDanglingScratchDir implements Runnable {
+ private static final Logger LOG = LoggerFactory.getLogger(ClearDanglingScratchDir.class);
+ boolean dryRun = false;
+ boolean verbose = false;
+ boolean useConsole = false;
+ String rootHDFSDir;
+ HiveConf conf;
public static void main(String[] args) throws Exception {
try {
@@ -82,97 +91,119 @@ public class ClearDanglingScratchDir {
HiveConf conf = new HiveConf();
- Path rootHDFSDirPath;
+ String rootHDFSDir;
if (cli.hasOption("s")) {
- rootHDFSDirPath = new Path(cli.getOptionValue("s"));
+ rootHDFSDir = cli.getOptionValue("s");
} else {
- rootHDFSDirPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR));
+ rootHDFSDir = HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR);
}
+ ClearDanglingScratchDir clearDanglingScratchDirMain = new ClearDanglingScratchDir(dryRun,
+ verbose, true, rootHDFSDir, conf);
+ clearDanglingScratchDirMain.run();
+ }
- FileSystem fs = FileSystem.get(rootHDFSDirPath.toUri(), conf);
- FileStatus[] userHDFSDirList = fs.listStatus(rootHDFSDirPath);
-
- List<Path> scratchDirToRemove = new ArrayList<Path>();
- for (FileStatus userHDFSDir : userHDFSDirList) {
- FileStatus[] scratchDirList = fs.listStatus(userHDFSDir.getPath());
- for (FileStatus scratchDir : scratchDirList) {
- Path lockFilePath = new Path(scratchDir.getPath(), SessionState.LOCK_FILE_NAME);
- if (!fs.exists(lockFilePath)) {
- String message = "Skipping " + scratchDir.getPath() + " since it does not contain " +
- SessionState.LOCK_FILE_NAME;
- if (verbose) {
- SessionState.getConsole().printInfo(message);
- } else {
- SessionState.getConsole().logInfo(message);
+  /**
+   * Creates a cleaner for the session scratch directories under {@code rootHDFSDir}.
+   *
+   * @param dryRun when true, list the directories that would be removed instead of deleting them
+   * @param verbose when true, also report skipped, in-use and removed directories
+   * @param useConsole when true, report through the {@link SessionState} console; otherwise log
+   * @param rootHDFSDir root scratch directory to scan
+   * @param conf configuration used to resolve the {@code FileSystem} of {@code rootHDFSDir}
+   */
+  public ClearDanglingScratchDir(boolean dryRun, boolean verbose, boolean useConsole,
+      String rootHDFSDir, HiveConf conf) {
+    this.conf = conf;
+    this.rootHDFSDir = rootHDFSDir;
+    this.useConsole = useConsole;
+    this.verbose = verbose;
+    this.dryRun = dryRun;
+  }
+
+  /**
+   * Removes dangling session scratch directories found two levels below {@code rootHDFSDir}
+   * (root / user / session). A session directory is dangling when it contains
+   * {@link SessionState#LOCK_FILE_NAME} and that lock file is not held open by a writer;
+   * directories without a lock file, or whose lock file is in use, are left alone. In
+   * {@code dryRun} mode the candidates are printed to stdout instead of being deleted.
+   *
+   * <p>Neither {@code IOException}s nor runtime exceptions escape this method: when it runs on a
+   * {@code ScheduledExecutorService} (as HiveServer2 does), an escaping exception would silently
+   * cancel every later execution of the periodic cleanup.
+   */
+  @Override
+  public void run() {
+    try {
+      Path rootHDFSDirPath = new Path(rootHDFSDir);
+      FileSystem fs = FileSystem.get(rootHDFSDirPath.toUri(), conf);
+      FileStatus[] userHDFSDirList = fs.listStatus(rootHDFSDirPath);
+
+      List<Path> scratchDirToRemove = new ArrayList<>();
+      for (FileStatus userHDFSDir : userHDFSDirList) {
+        FileStatus[] scratchDirList = fs.listStatus(userHDFSDir.getPath());
+        for (FileStatus scratchDir : scratchDirList) {
+          Path lockFilePath = new Path(scratchDir.getPath(), SessionState.LOCK_FILE_NAME);
+          if (!fs.exists(lockFilePath)) {
+            String message = "Skipping " + scratchDir.getPath() + " since it does not contain " +
+                SessionState.LOCK_FILE_NAME;
+            if (verbose) {
+              consoleMessage(message);
+            } else {
+              LOG.debug(message);
+            }
+            continue;
+          }
- continue;
- }
- boolean removable = false;
- boolean inuse = false;
- try {
- IOUtils.closeStream(fs.append(lockFilePath));
- removable = true;
- } catch (RemoteException eAppend) {
- // RemoteException with AlreadyBeingCreatedException will be thrown
- // if the file is currently held by a writer
- if(AlreadyBeingCreatedException.class.getName().equals(eAppend.getClassName())){
- inuse = true;
- } else if (UnsupportedOperationException.class.getName().equals(eAppend.getClassName())) {
- // Append is not supported in the cluster, try to use create
- try {
- IOUtils.closeStream(fs.create(lockFilePath, false));
- } catch (RemoteException eCreate) {
- if (AlreadyBeingCreatedException.class.getName().equals(eCreate.getClassName())){
- // If the file is held by a writer, will throw AlreadyBeingCreatedException
- inuse = true;
- } else {
- SessionState.getConsole().printInfo("Unexpected error:" + eCreate.getMessage());
+          boolean removable = false;
+          boolean inuse = false;
+          try {
+            // Appending succeeds only if no live process holds the lock file open
+            IOUtils.closeStream(fs.append(lockFilePath));
+            removable = true;
+          } catch (RemoteException eAppend) {
+            // RemoteException with AlreadyBeingCreatedException will be thrown
+            // if the file is currently held by a writer
+            if (AlreadyBeingCreatedException.class.getName().equals(eAppend.getClassName())) {
+              inuse = true;
+            } else if (UnsupportedOperationException.class.getName().equals(eAppend.getClassName())) {
+              // Append is not supported in the cluster, try to use create
+              try {
+                IOUtils.closeStream(fs.create(lockFilePath, false));
+              } catch (RemoteException eCreate) {
+                if (AlreadyBeingCreatedException.class.getName().equals(eCreate.getClassName())) {
+                  // If the file is held by a writer, will throw AlreadyBeingCreatedException
+                  inuse = true;
+                } else {
+                  consoleMessage("Unexpected error:" + eCreate.getMessage());
+                }
+              } catch (FileAlreadyExistsException eCreateNormal) {
+                // Otherwise, throw FileAlreadyExistsException, which means the file owner is
+                // dead
+                removable = true;
}
- } catch (FileAlreadyExistsException eCreateNormal) {
- // Otherwise, throw FileAlreadyExistsException, which means the file owner is
- // dead
- removable = true;
+            } else {
+              consoleMessage("Unexpected error:" + eAppend.getMessage());
}
- } else {
- SessionState.getConsole().printInfo("Unexpected error:" + eAppend.getMessage());
}
- }
- if (inuse) {
- // Cannot open the lock file for writing, must be held by a live process
- String message = scratchDir.getPath() + " is being used by live process";
- if (verbose) {
- SessionState.getConsole().printInfo(message);
- } else {
- SessionState.getConsole().logInfo(message);
+          if (inuse) {
+            // Cannot open the lock file for writing, must be held by a live process
+            String message = scratchDir.getPath() + " is being used by live process";
+            if (verbose) {
+              consoleMessage(message);
+            } else {
+              LOG.debug(message);
+            }
+          }
+          if (removable) {
+            scratchDirToRemove.add(scratchDir.getPath());
}
- }
- if (removable) {
- scratchDirToRemove.add(scratchDir.getPath());
}
}
- }
- if (scratchDirToRemove.size()==0) {
- SessionState.getConsole().printInfo("Cannot find any scratch directory to clear");
- return;
- }
- SessionState.getConsole().printInfo("Removing " + scratchDirToRemove.size() + " scratch directories");
- for (Path scratchDir : scratchDirToRemove) {
- if (dryRun) {
- System.out.println(scratchDir);
- } else {
- boolean succ = fs.delete(scratchDir, true);
- if (!succ) {
- SessionState.getConsole().printInfo("Cannot remove " + scratchDir);
+      if (scratchDirToRemove.isEmpty()) {
+        consoleMessage("Cannot find any scratch directory to clear");
+        return;
+      }
+      consoleMessage("Removing " + scratchDirToRemove.size() + " scratch directories");
+      for (Path scratchDir : scratchDirToRemove) {
+        if (dryRun) {
+          System.out.println(scratchDir);
} else {
- String message = scratchDir + " removed";
- if (verbose) {
- SessionState.getConsole().printInfo(message);
+          boolean succ = fs.delete(scratchDir, true);
+          if (!succ) {
+            consoleMessage("Cannot remove " + scratchDir);
} else {
- SessionState.getConsole().logInfo(message);
+            String message = scratchDir + " removed";
+            if (verbose) {
+              consoleMessage(message);
+            } else {
+              // Always leave a log record of deleted directories, as the pre-refactor code did
+              LOG.info(message);
+            }
}
}
}
+    } catch (IOException | RuntimeException e) {
+      // Swallow (after reporting) so a periodic scheduler keeps running future cleanups
+      if (useConsole) {
+        consoleMessage("Unexpected exception " + e.getMessage());
+      }
+      LOG.warn("Unexpected exception while clearing dangling scratch directories", e);
+    }
+  }
+
+  /**
+   * Reports a message: logged at INFO level by default, or printed through the
+   * {@link SessionState} console when {@code useConsole} is set (command-line usage).
+   */
+  private void consoleMessage(String message) {
+    if (!useConsole) {
+      LOG.info(message);
+    } else {
+      SessionState.getConsole().printInfo(message);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/7c263916/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 590b1f3..9c94611 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -25,7 +25,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Random;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.GnuParser;
@@ -34,6 +37,7 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
@@ -51,9 +55,9 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator;
-import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
+import org.apache.hadoop.hive.ql.session.ClearDanglingScratchDir;
import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.Utils;
@@ -81,6 +85,7 @@ import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
/**
@@ -538,6 +543,21 @@ public class HiveServer2 extends CompositeService {
}
}
+  /** Whether the periodic dangling-scratch-dir cleaner has already been scheduled in this JVM. */
+  private static boolean clearDanglingScratchDirScheduled = false;
+
+  /**
+   * Schedules {@link ClearDanglingScratchDir} to run periodically on a daemon thread when
+   * {@code hive.server2.clear.dangling.scratchdir} is enabled; does nothing otherwise.
+   *
+   * <p>The cleaner is scheduled at most once per JVM. {@code startHiveServer2} calls this on
+   * every start attempt, and the executor created here is never shut down, so without the guard
+   * each retry would leave another thread running a duplicate cleaner.
+   *
+   * @param hiveConf configuration supplying the enable flag, the interval and the scratch dir
+   * @param initialWaitInSec delay in seconds before the first run
+   * @throws IllegalArgumentException if the configured interval is not positive
+   */
+  @VisibleForTesting
+  public static synchronized void scheduleClearDanglingScratchDir(HiveConf hiveConf,
+      int initialWaitInSec) {
+    if (clearDanglingScratchDirScheduled
+        || !hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR)) {
+      return;
+    }
+    long intervalInSec = HiveConf.getTimeVar(hiveConf,
+        ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR_INTERVAL, TimeUnit.SECONDS);
+    if (intervalInSec <= 0) {
+      // scheduleAtFixedRate would reject this with a message-less IllegalArgumentException;
+      // name the offending setting instead.
+      throw new IllegalArgumentException(
+          ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR_INTERVAL.varname
+          + " must be positive, but is " + intervalInSec + "s");
+    }
+    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
+        new BasicThreadFactory.Builder()
+            .namingPattern("cleardanglingscratchdir-%d")
+            .daemon(true)
+            .build());
+    executor.scheduleAtFixedRate(
+        new ClearDanglingScratchDir(false, false, false,
+            HiveConf.getVar(hiveConf, HiveConf.ConfVars.SCRATCHDIR), hiveConf),
+        initialWaitInSec, intervalInSec, TimeUnit.SECONDS);
+    clearDanglingScratchDirScheduled = true;
+  }
+
private static void startHiveServer2() throws Throwable {
long attempts = 0, maxAttempts = 1;
while (true) {
@@ -558,6 +578,11 @@ public class HiveServer2 extends CompositeService {
// Cleanup the scratch dir before starting
ServerUtils.cleanUpScratchDir(hiveConf);
+ // Schedule task to cleanup dangling scratch dir periodically,
+ // initial wait for a random time between 0-10 min to
+ // avoid intial spike when using multiple HS2
+ scheduleClearDanglingScratchDir(hiveConf, new Random().nextInt(600));
+
server = new HiveServer2();
server.init(hiveConf);
server.start();