Posted to commits@hive.apache.org by da...@apache.org on 2016/11/02 00:40:13 UTC

hive git commit: HIVE-15068: Run ClearDanglingScratchDir periodically inside HS2 (Daniel Dai, reviewed by Thejas Nair)

Repository: hive
Updated Branches:
  refs/heads/master 95b17e530 -> 7c2639166


HIVE-15068: Run ClearDanglingScratchDir periodically inside HS2 (Daniel Dai, reviewed by Thejas Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7c263916
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7c263916
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7c263916

Branch: refs/heads/master
Commit: 7c263916612bdbc50bc3b3b8a1bf7b96a0e12190
Parents: 95b17e5
Author: Daniel Dai <da...@hortonworks.com>
Authored: Tue Nov 1 17:39:55 2016 -0700
Committer: Daniel Dai <da...@hortonworks.com>
Committed: Tue Nov 1 17:39:55 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   5 +
 .../server/TestHS2ClearDanglingScratchDir.java  |  82 +++++++++
 .../ql/session/ClearDanglingScratchDir.java     | 181 +++++++++++--------
 .../apache/hive/service/server/HiveServer2.java |  27 ++-
 4 files changed, 219 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/7c263916/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 2f3fba7..80cd5ad 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2184,6 +2184,11 @@ public class HiveConf extends Configuration {
         "SSL Versions to disable for all Hive Servers"),
 
      // HiveServer2 specific configs
+    HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR("hive.server2.clear.dangling.scratchdir", false,
+        "Clear dangling scratch dir periodically in HS2"),
+    HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR_INTERVAL("hive.server2.clear.dangling.scratchdir.interval",
+        "1800s", new TimeValidator(TimeUnit.SECONDS),
+        "Interval to clear dangling scratch dir periodically in HS2"),
     HIVE_SERVER2_SLEEP_INTERVAL_BETWEEN_START_ATTEMPTS("hive.server2.sleep.interval.between.start.attempts",
         "60s", new TimeValidator(TimeUnit.MILLISECONDS, 0l, true, Long.MAX_VALUE, true),
         "Amount of time to sleep between HiveServer2 start attempts. Primarily meant for tests"),

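For reference, enabling the new cleanup from hive-site.xml would look like the fragment below. The property names and the false/1800s defaults come straight from the hunk above; the 600s interval here is just an example override, not a recommendation:

    <property>
      <name>hive.server2.clear.dangling.scratchdir</name>
      <value>true</value>
    </property>
    <property>
      <name>hive.server2.clear.dangling.scratchdir.interval</name>
      <value>600s</value>
    </property>
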
http://git-wip-us.apache.org/repos/asf/hive/blob/7c263916/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestHS2ClearDanglingScratchDir.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestHS2ClearDanglingScratchDir.java b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestHS2ClearDanglingScratchDir.java
new file mode 100644
index 0000000..081ac96
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestHS2ClearDanglingScratchDir.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.server;
+
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.WindowsPathUtil;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.util.Shell;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestHS2ClearDanglingScratchDir {
+  @Test
+  public void testScratchDirCleared() throws Exception {
+    MiniDFSCluster m_dfs = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1).format(true).build();
+    HiveConf conf = new HiveConf();
+    conf.addResource(m_dfs.getConfiguration(0));
+    if (Shell.WINDOWS) {
+      WindowsPathUtil.convertPathsFromWindowsToHdfs(conf);
+    }
+    conf.set(HiveConf.ConfVars.HIVE_SCRATCH_DIR_LOCK.toString(), "true");
+    conf.set(HiveConf.ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR.toString(), "true");
+
+    Path scratchDir = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR));
+    m_dfs.getFileSystem().mkdirs(scratchDir);
+    m_dfs.getFileSystem().setPermission(scratchDir, new FsPermission("777"));
+
+    // Fake two live sessions
+    SessionState.start(conf);
+    conf.setVar(HiveConf.ConfVars.HIVESESSIONID, UUID.randomUUID().toString());
+    SessionState.start(conf);
+
+    // Fake dead session
+    Path fakeSessionPath = new Path(new Path(scratchDir, Utils.getUGI().getShortUserName()),
+        UUID.randomUUID().toString());
+    m_dfs.getFileSystem().mkdirs(fakeSessionPath);
+    m_dfs.getFileSystem().create(new Path(fakeSessionPath, "inuse.lck")).close();
+
+    FileStatus[] scratchDirs = m_dfs.getFileSystem()
+        .listStatus(new Path(scratchDir, Utils.getUGI().getShortUserName()));
+
+    Assert.assertEquals(3, scratchDirs.length);
+
+    HiveServer2.scheduleClearDanglingScratchDir(conf, 0);
+
+    // Check the dead session gets cleared
+    long start = System.currentTimeMillis();
+    long end;
+    do {
+      Thread.sleep(200);
+      end = System.currentTimeMillis();
+      if (end - start > 5000) {
+        Assert.fail("timeout, scratch dir has not been cleared");
+      }
+      scratchDirs = m_dfs.getFileSystem()
+          .listStatus(new Path(scratchDir, Utils.getUGI().getShortUserName()));
+    } while (scratchDirs.length != 2);
+  }
+}

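The three directories counted above are the two live session dirs plus the fake dead one, and the probe that tells them apart rests on HDFS lease semantics: with HIVE_SCRATCH_DIR_LOCK enabled, a live SessionState keeps the output stream of its inuse.lck open, so an append by any other client fails, whereas the fake dead session's lock file was created and immediately closed, leaving it appendable. A minimal standalone sketch of that check (class name and paths are made up for illustration; it assumes HDFS append is available, as it is on MiniDFSCluster):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.ipc.RemoteException;

    public class LockProbeDemo {
      public static void main(String[] args) throws Exception {
        MiniDFSCluster dfs = new MiniDFSCluster.Builder(new Configuration())
            .numDataNodes(1).format(true).build();
        FileSystem fs = dfs.getFileSystem();
        Path lock = new Path("/scratch/user/session1/inuse.lck");

        // "Live" session: the writer keeps the stream open, so the HDFS lease is held
        FSDataOutputStream writer = fs.create(lock);
        try {
          fs.append(lock); // expected to fail while the lease is held
        } catch (RemoteException e) {
          // AlreadyBeingCreatedException -> some process still owns the lock
          System.out.println("in use: " + e.getClassName());
        }

        // "Dead" session: the writer closed (or its process died), lease released
        writer.close();
        IOUtils.closeStream(fs.append(lock)); // now succeeds -> safe to remove the dir
        dfs.shutdown();
      }
    }
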
http://git-wip-us.apache.org/repos/asf/hive/blob/7c263916/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java b/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java
index 725f954..2fff92e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.session;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -35,6 +36,8 @@ import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A tool to remove dangling scratch directories. A scratch directory could be left behind
@@ -51,7 +54,13 @@ import org.apache.hadoop.ipc.RemoteException;
  *    again after 10 min. Once it becomes writable, cleardanglingscratchDir will be able to
  *    remove it
  */
-public class ClearDanglingScratchDir {
+public class ClearDanglingScratchDir implements Runnable {
+  private static final Logger LOG = LoggerFactory.getLogger(ClearDanglingScratchDir.class);
+  boolean dryRun = false;
+  boolean verbose = false;
+  boolean useConsole = false;
+  String rootHDFSDir;
+  HiveConf conf;
 
   public static void main(String[] args) throws Exception {
     try {
@@ -82,97 +91,119 @@ public class ClearDanglingScratchDir {
 
     HiveConf conf = new HiveConf();
 
-    Path rootHDFSDirPath;
+    String rootHDFSDir;
     if (cli.hasOption("s")) {
-      rootHDFSDirPath = new Path(cli.getOptionValue("s"));
+      rootHDFSDir = cli.getOptionValue("s");
     } else {
-      rootHDFSDirPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR));
+      rootHDFSDir = HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR);
     }
+    ClearDanglingScratchDir clearDanglingScratchDirMain = new ClearDanglingScratchDir(dryRun,
+        verbose, true, rootHDFSDir, conf);
+    clearDanglingScratchDirMain.run();
+  }
 
-    FileSystem fs = FileSystem.get(rootHDFSDirPath.toUri(), conf);
-    FileStatus[] userHDFSDirList = fs.listStatus(rootHDFSDirPath);
-
-    List<Path> scratchDirToRemove = new ArrayList<Path>();
-    for (FileStatus userHDFSDir : userHDFSDirList) {
-      FileStatus[] scratchDirList = fs.listStatus(userHDFSDir.getPath());
-      for (FileStatus scratchDir : scratchDirList) {
-        Path lockFilePath = new Path(scratchDir.getPath(), SessionState.LOCK_FILE_NAME);
-        if (!fs.exists(lockFilePath)) {
-          String message = "Skipping " + scratchDir.getPath() + " since it does not contain " +
-              SessionState.LOCK_FILE_NAME;
-          if (verbose) {
-            SessionState.getConsole().printInfo(message);
-          } else {
-            SessionState.getConsole().logInfo(message);
+  public ClearDanglingScratchDir(boolean dryRun, boolean verbose, boolean useConsole,
+      String rootHDFSDir, HiveConf conf) {
+    this.dryRun = dryRun;
+    this.verbose = verbose;
+    this.useConsole = useConsole;
+    this.rootHDFSDir = rootHDFSDir;
+    this.conf = conf;
+  }
+
+  @Override
+  public void run() {
+    try {
+      Path rootHDFSDirPath = new Path(rootHDFSDir);
+      FileSystem fs = FileSystem.get(rootHDFSDirPath.toUri(), conf);
+      FileStatus[] userHDFSDirList = fs.listStatus(rootHDFSDirPath);
+
+      List<Path> scratchDirToRemove = new ArrayList<Path>();
+      for (FileStatus userHDFSDir : userHDFSDirList) {
+        FileStatus[] scratchDirList = fs.listStatus(userHDFSDir.getPath());
+        for (FileStatus scratchDir : scratchDirList) {
+          Path lockFilePath = new Path(scratchDir.getPath(), SessionState.LOCK_FILE_NAME);
+          if (!fs.exists(lockFilePath)) {
+            String message = "Skipping " + scratchDir.getPath() + " since it does not contain " +
+                SessionState.LOCK_FILE_NAME;
+            if (verbose) {
+              consoleMessage(message);
+            }
+            continue;
           }
-          continue;
-        }
-        boolean removable = false;
-        boolean inuse = false;
-        try {
-          IOUtils.closeStream(fs.append(lockFilePath));
-          removable = true;
-        } catch (RemoteException eAppend) {
-          // RemoteException with AlreadyBeingCreatedException will be thrown
-          // if the file is currently held by a writer
-          if(AlreadyBeingCreatedException.class.getName().equals(eAppend.getClassName())){
-            inuse = true;
-          } else if (UnsupportedOperationException.class.getName().equals(eAppend.getClassName())) {
-            // Append is not supported in the cluster, try to use create
-            try {
-              IOUtils.closeStream(fs.create(lockFilePath, false));
-            } catch (RemoteException eCreate) {
-              if (AlreadyBeingCreatedException.class.getName().equals(eCreate.getClassName())){
-                // If the file is held by a writer, will throw AlreadyBeingCreatedException
-                inuse = true;
-              }  else {
-                SessionState.getConsole().printInfo("Unexpected error:" + eCreate.getMessage());
+          boolean removable = false;
+          boolean inuse = false;
+          try {
+            IOUtils.closeStream(fs.append(lockFilePath));
+            removable = true;
+          } catch (RemoteException eAppend) {
+            // RemoteException with AlreadyBeingCreatedException will be thrown
+            // if the file is currently held by a writer
+            if(AlreadyBeingCreatedException.class.getName().equals(eAppend.getClassName())){
+              inuse = true;
+            } else if (UnsupportedOperationException.class.getName().equals(eAppend.getClassName())) {
+              // Append is not supported in the cluster, try to use create
+              try {
+                IOUtils.closeStream(fs.create(lockFilePath, false));
+              } catch (RemoteException eCreate) {
+                if (AlreadyBeingCreatedException.class.getName().equals(eCreate.getClassName())){
+                  // If the file is held by a writer, will throw AlreadyBeingCreatedException
+                  inuse = true;
+                }  else {
+                  consoleMessage("Unexpected error:" + eCreate.getMessage());
+                }
+              } catch (FileAlreadyExistsException eCreateNormal) {
+                  // Otherwise, throw FileAlreadyExistsException, which means the file owner is
+                  // dead
+                  removable = true;
               }
-            } catch (FileAlreadyExistsException eCreateNormal) {
-                // Otherwise, throw FileAlreadyExistsException, which means the file owner is
-                // dead
-                removable = true;
+            } else {
+              consoleMessage("Unexpected error:" + eAppend.getMessage());
             }
-          } else {
-            SessionState.getConsole().printInfo("Unexpected error:" + eAppend.getMessage());
           }
-        }
-        if (inuse) {
-          // Cannot open the lock file for writing, must be held by a live process
-          String message = scratchDir.getPath() + " is being used by live process";
-          if (verbose) {
-            SessionState.getConsole().printInfo(message);
-          } else {
-            SessionState.getConsole().logInfo(message);
+          if (inuse) {
+            // Cannot open the lock file for writing, must be held by a live process
+            String message = scratchDir.getPath() + " is being used by live process";
+            if (verbose) {
+              consoleMessage(message);
+            }
+          }
+          if (removable) {
+            scratchDirToRemove.add(scratchDir.getPath());
           }
-        }
-        if (removable) {
-          scratchDirToRemove.add(scratchDir.getPath());
         }
       }
-    }
 
-    if (scratchDirToRemove.size()==0) {
-      SessionState.getConsole().printInfo("Cannot find any scratch directory to clear");
-      return;
-    }
-    SessionState.getConsole().printInfo("Removing " + scratchDirToRemove.size() + " scratch directories");
-    for (Path scratchDir : scratchDirToRemove) {
-      if (dryRun) {
-        System.out.println(scratchDir);
-      } else {
-        boolean succ = fs.delete(scratchDir, true);
-        if (!succ) {
-          SessionState.getConsole().printInfo("Cannot remove " + scratchDir);
+      if (scratchDirToRemove.size()==0) {
+        consoleMessage("Cannot find any scratch directory to clear");
+        return;
+      }
+      consoleMessage("Removing " + scratchDirToRemove.size() + " scratch directories");
+      for (Path scratchDir : scratchDirToRemove) {
+        if (dryRun) {
+          System.out.println(scratchDir);
         } else {
-          String message = scratchDir + " removed";
-          if (verbose) {
-            SessionState.getConsole().printInfo(message);
+          boolean succ = fs.delete(scratchDir, true);
+          if (!succ) {
+            consoleMessage("Cannot remove " + scratchDir);
           } else {
-            SessionState.getConsole().logInfo(message);
+            String message = scratchDir + " removed";
+            if (verbose) {
+              consoleMessage(message);
+            }
           }
         }
       }
+    } catch (IOException e) {
+      consoleMessage("Unexpected exception " + e.getMessage());
+    }
+  }
+
+  private void consoleMessage(String message) {
+    if (useConsole) {
+      SessionState.getConsole().printInfo(message);
+    } else {
+      LOG.info(message);
     }
   }
 

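With this refactor the class doubles as the long-running HS2 task and the existing command line tool: main() still parses the options and now simply builds the Runnable with useConsole=true and calls run() directly. Assuming the usual Hive service wrapper and the tool's pre-existing flags (-s is visible in the hunk above; -r for dry run and -v for verbose come from the tool's earlier revision), a manual dry run against an example scratch path would look like:

    hive --service cleardanglingscratchdir -r -v -s hdfs://nn:8020/tmp/hive
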
http://git-wip-us.apache.org/repos/asf/hive/blob/7c263916/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 590b1f3..9c94611 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -25,7 +25,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Random;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.cli.GnuParser;
@@ -34,6 +37,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.ACLProvider;
@@ -51,9 +55,9 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator;
-import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
+import org.apache.hadoop.hive.ql.session.ClearDanglingScratchDir;
 import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
@@ -81,6 +85,7 @@ import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 
 /**
@@ -538,6 +543,21 @@ public class HiveServer2 extends CompositeService {
     }
   }
 
+  @VisibleForTesting
+  public static void scheduleClearDanglingScratchDir(HiveConf hiveConf, int initialWaitInSec) {
+    if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR)) {
+      ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
+          new BasicThreadFactory.Builder()
+          .namingPattern("cleardanglingscratchdir-%d")
+          .daemon(true)
+          .build());
+      executor.scheduleAtFixedRate(new ClearDanglingScratchDir(false, false, false,
+          HiveConf.getVar(hiveConf, HiveConf.ConfVars.SCRATCHDIR), hiveConf), initialWaitInSec,
+          HiveConf.getTimeVar(hiveConf, ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR_INTERVAL,
+          TimeUnit.SECONDS), TimeUnit.SECONDS);
+    }
+  }
+
   private static void startHiveServer2() throws Throwable {
     long attempts = 0, maxAttempts = 1;
     while (true) {
@@ -558,6 +578,11 @@ public class HiveServer2 extends CompositeService {
 
         // Cleanup the scratch dir before starting
         ServerUtils.cleanUpScratchDir(hiveConf);
+        // Schedule a task to clean up dangling scratch dirs periodically.
+        // The initial wait is a random time between 0-10 min to avoid an
+        // initial spike when multiple HS2 instances start together
+        scheduleClearDanglingScratchDir(hiveConf, new Random().nextInt(600));
+
         server = new HiveServer2();
         server.init(hiveConf);
         server.start();
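
The random 0-600 second initial delay is deliberate: several HS2 instances sharing one scratch root would otherwise all scan HDFS at the same moment after a rolling restart. The scheduling pattern in isolation, as a runnable sketch (class name, task body, and the short demo intervals are made up; the thread-factory and executor calls mirror the patch):

    import java.util.Random;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import org.apache.commons.lang3.concurrent.BasicThreadFactory;

    public class JitteredScheduleDemo {
      public static void main(String[] args) throws Exception {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(
            new BasicThreadFactory.Builder()
                .namingPattern("cleanup-demo-%d")
                .daemon(true) // daemon thread: never blocks JVM shutdown
                .build());
        // Jitter the first run so a fleet of servers spreads out its scans
        int initialDelaySec = new Random().nextInt(10); // demo-sized; HS2 uses 0-600s
        exec.scheduleAtFixedRate(
            () -> System.out.println("tick on " + Thread.currentThread().getName()),
            initialDelaySec, 5, TimeUnit.SECONDS);
        Thread.sleep(30000); // watch a few ticks, then exit along with the daemon
      }
    }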