You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sa...@apache.org on 2018/07/04 07:38:27 UTC

hive git commit: HIVE-20025: Clean-up of event files created by HiveProtoLoggingHook (Sankar Hariappan, reviewed by Harish Jaiprakash, Anishek Agarwal)

Repository: hive
Updated Branches:
  refs/heads/master ee8c72ae1 -> 6311e0b03


HIVE-20025: Clean-up of event files created by HiveProtoLoggingHook (Sankar Hariappan, reviewed by Harish Jaiprakash, Anishek Agarwal)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6311e0b0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6311e0b0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6311e0b0

Branch: refs/heads/master
Commit: 6311e0b031b5937d39bde8aad9916c8d0911f0b3
Parents: ee8c72a
Author: Sankar Hariappan <sa...@apache.org>
Authored: Wed Jul 4 13:08:00 2018 +0530
Committer: Sankar Hariappan <sa...@apache.org>
Committed: Wed Jul 4 13:08:00 2018 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  11 ++
 .../TestHiveProtoEventsCleanerTask.java         | 141 ++++++++++++++++
 .../metastore/HiveProtoEventsCleanerTask.java   | 168 +++++++++++++++++++
 .../hive/ql/hooks/HiveProtoLoggingHook.java     |  15 +-
 .../hive/ql/hooks/TestHiveProtoLoggingHook.java |   2 +-
 .../hive/metastore/conf/MetastoreConf.java      |   3 +-
 6 files changed, 331 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/6311e0b0/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7ef22d6..2da9086 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -625,6 +625,17 @@ public class HiveConf extends Configuration {
         "Table alias will be added to column names for queries of type \"select *\" or \n" +
         "if query explicitly uses table alias \"select r1.x..\"."),
 
+    HIVE_PROTO_EVENTS_BASE_PATH("hive.hook.proto.base-directory", "",
+            "Base directory into which the proto event messages are written by HiveProtoLoggingHook."),
+    HIVE_PROTO_EVENTS_QUEUE_CAPACITY("hive.hook.proto.queue.capacity", 64,
+            "Queue capacity for the proto events logging threads."),
+    HIVE_PROTO_EVENTS_CLEAN_FREQ("hive.hook.proto.events.clean.freq", "1d",
+            new TimeValidator(TimeUnit.DAYS),
+            "Frequency at which timer task runs to purge expired proto event files."),
+    HIVE_PROTO_EVENTS_TTL("hive.hook.proto.events.ttl", "7d",
+            new TimeValidator(TimeUnit.DAYS),
+            "Time-To-Live (TTL) of proto event files before cleanup."),
+
     // Hadoop Configuration Properties
     // Properties with null values are ignored and exist only for the purpose of giving us
     // a symbolic name to reference in the Hive source code. Properties with non-null

http://git-wip-us.apache.org/repos/asf/hive/blob/6311e0b0/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveProtoEventsCleanerTask.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveProtoEventsCleanerTask.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveProtoEventsCleanerTask.java
new file mode 100644
index 0000000..e187fad
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveProtoEventsCleanerTask.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.HiveProtoEventsCleanerTask;
+import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestHiveProtoEventsCleanerTask {
+  protected static final Logger LOG = LoggerFactory.getLogger(TestHiveProtoEventsCleanerTask.class);
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+  private Path baseDir;
+  private HiveConf hiveConf;
+  private SystemClock clock = SystemClock.getInstance();
+  private HiveProtoEventsCleanerTask cleanerTask;
+  private FileSystem fs;
+
+  private final String[] eventsSubDirs = new String[] { "query_data", "dag_meta", "dag_data", "app_data" };
+
+  @Before
+  public void setup() throws Exception {
+    hiveConf = new HiveConf(TestHiveProtoEventsCleanerTask.class);
+    String tmpFolder = folder.newFolder().getAbsolutePath();
+    // The cleaner derives every events dir from the parent of this query_data path.
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_PROTO_EVENTS_BASE_PATH, tmpFolder + "/" + eventsSubDirs[0]);
+    HiveConf.setTimeVar(hiveConf, ConfVars.HIVE_PROTO_EVENTS_TTL, 2, TimeUnit.DAYS);
+
+    baseDir = new Path(tmpFolder);
+    fs = baseDir.getFileSystem(hiveConf);
+    cleanerTask = JavaUtils.newInstance(HiveProtoEventsCleanerTask.class);
+    cleanerTask.setConf(hiveConf);
+  }
+
+  /**
+   * Returns the current date, using the underlying clock in UTC time.
+   */
+  private LocalDate getNow() {
+    // Use UTC date to ensure reader date is same on all timezones.
+    return LocalDateTime.ofEpochSecond(clock.getTime() / 1000, 0, ZoneOffset.UTC).toLocalDate();
+  }
+
+  /**
+   * Returns the directory name for a given date.
+   */
+  public String getDirForDate(LocalDate date) {
+    return "date=" + DateTimeFormatter.ISO_LOCAL_DATE.format(date);
+  }
+
+  /**
+   * Creates the "date=..." partition dir for {@code date} under {@code basePath} with one data file.
+   */
+  private void addDatePartition(Path basePath, LocalDate date) throws Exception {
+    if (!fs.exists(basePath)) {
+      fs.mkdirs(basePath);
+      fs.setPermission(basePath, FsPermission.createImmutable((short)01777));
+    }
+
+    Path datePtn = new Path(basePath, getDirForDate(date));
+    fs.mkdirs(datePtn);
+    fs.setPermission(datePtn, FsPermission.createImmutable((short) 01777));
+    FsPermission.setUMask(hiveConf, FsPermission.createImmutable((short) 0066));
+    Path partFile = new Path(datePtn, "data");
+    // try-with-resources so the stream is closed even if the write fails.
+    try (FSDataOutputStream out = fs.create(partFile)) {
+      out.writeInt(1000);
+    }
+  }
+
+  @Test
+  public void testCleanup() throws Exception {
+    int[] inRange = { 3, 5, 2, 1 }; // Must have one entry per eventsSubDirs
+    int[] outRange = { 2, 2, 2, 1 }; // Must have one entry per eventsSubDirs
+    LocalDate today = getNow();
+
+    // Add partitions for the given range of dates from today to past.
+    for (int i = 0; i < inRange.length; i++) {
+      Path basePath = new Path(baseDir, eventsSubDirs[i]);
+      for (int j = 0; j < inRange[i]; j++) {
+        addDatePartition(basePath, today.minusDays(j));
+      }
+    }
+
+    // Run the task to cleanup
+    cleanerTask.run();
+
+    // With a TTL of 2 days the partition for (today - 2) is itself expired and must be deleted,
+    // so every remaining partition has to sort strictly after it.
+    String expiredPtn = getDirForDate(today.minusDays(2));
+    for (int i = 0; i < inRange.length; i++) {
+      Path basePath = new Path(baseDir, eventsSubDirs[i]);
+      FileStatus[] statuses = fs.listStatus(basePath);
+
+      // If the test setup created today and if test runs tomorrow, then extra dir will be deleted.
+      // So, checking for both cases.
+      assertTrue("Unexpected partition count in " + basePath + ": " + statuses.length,
+          (statuses.length == outRange[i]) || (statuses.length == (outRange[i] - 1)));
+      for (FileStatus status : statuses) {
+        assertTrue("Expired partition was not deleted: " + status.getPath(),
+            status.getPath().getName().compareTo(expiredPtn) > 0);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/6311e0b0/metastore/src/java/org/apache/hadoop/hive/metastore/HiveProtoEventsCleanerTask.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveProtoEventsCleanerTask.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveProtoEventsCleanerTask.java
new file mode 100644
index 0000000..2a772e2
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveProtoEventsCleanerTask.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class HiveProtoEventsCleanerTask implements MetastoreTaskThread {
+  public static final Logger LOG = LoggerFactory.getLogger(HiveProtoEventsCleanerTask.class);
+
+  // Event sub-directories to clean. They are siblings under the parent of the configured
+  // HIVE_PROTO_EVENTS_BASE_PATH, which is expected to be the first ("query_data") entry.
+  private final String[] eventsSubDirs = new String[] { "query_data", "dag_meta", "dag_data", "app_data" };
+  private final List<Path> eventsBasePaths = new ArrayList<>();
+  private Configuration conf;
+  private long ttl;
+  // Newest expired partition name ("date=yyyy-MM-dd"), recomputed on every run. Per-instance rather
+  // than static so that independently configured instances cannot overwrite each other's value.
+  private String expiredDatePtn = null;
+  private static final SystemClock clock = SystemClock.getInstance();
+
+  /**
+   * Path filter accepting only "date=" partitions on or before {@link #expiredDatePtn}. ISO local
+   * dates with four-digit years sort lexicographically in chronological order, so a string
+   * comparison is sufficient.
+   */
+  private final PathFilter expiredDatePartitionsFilter = path -> {
+    String dirName = path.getName();
+    return dirName.startsWith("date=") && dirName.compareTo(expiredDatePtn) <= 0;
+  };
+
+  /**
+   * Reads the proto events base path and TTL from {@code conf}. Cleanup stays disabled (no paths
+   * registered) when the base path is blank or has no parent directory.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    // Reset, so that calling setConf more than once does not register duplicate paths.
+    eventsBasePaths.clear();
+
+    String hiveEventsDir = HiveConf.getVar(conf, ConfVars.HIVE_PROTO_EVENTS_BASE_PATH);
+    if (StringUtils.isBlank(hiveEventsDir)) {
+      return;
+    }
+    Path hiveEventsBasePath = new Path(hiveEventsDir);
+    Path baseDir = hiveEventsBasePath.getParent();
+    if (baseDir == null) {
+      // e.g. the filesystem root: there is no parent under which the sibling dirs could live.
+      LOG.warn("{} is set to {}, which has no parent directory; proto events cleanup is disabled.",
+              ConfVars.HIVE_PROTO_EVENTS_BASE_PATH.varname, hiveEventsDir);
+      return;
+    }
+    for (String subDir : eventsSubDirs) {
+      eventsBasePaths.add(new Path(baseDir, subDir));
+    }
+    if (!eventsBasePaths.get(0).equals(hiveEventsBasePath)) {
+      // Assertions are normally disabled in production, so surface the mismatch in the log too.
+      LOG.warn("{} is expected to end with '{}' but is {}; cleaning up {}",
+              ConfVars.HIVE_PROTO_EVENTS_BASE_PATH.varname, eventsSubDirs[0], hiveEventsDir,
+              eventsBasePaths);
+    }
+    assert(eventsBasePaths.get(0).equals(hiveEventsBasePath));
+    ttl = HiveConf.getTimeVar(conf, ConfVars.HIVE_PROTO_EVENTS_TTL, TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public long runFrequency(TimeUnit unit) {
+    return HiveConf.getTimeVar(conf, ConfVars.HIVE_PROTO_EVENTS_CLEAN_FREQ, unit);
+  }
+
+  /**
+   * Deletes the expired date partitions from every registered events directory.
+   */
+  @Override
+  public void run() {
+    // If Hive proto logging is not enabled, then nothing to be cleaned-up.
+    if (eventsBasePaths.isEmpty()) {
+      return;
+    }
+
+    // Expired date should be computed each time we run cleaner thread.
+    computeExpiredDatePtn(ttl);
+    for (Path basePath : eventsBasePaths) {
+      cleanupDir(basePath);
+    }
+  }
+
+  /**
+   * Compute the expired date partition, using the underlying clock in UTC time.
+   *
+   * @param ttl time-to-live of the event partitions, in milliseconds
+   */
+  private void computeExpiredDatePtn(long ttl) {
+    // Use UTC date to ensure reader date is same on all timezones.
+    LocalDate expiredDate
+            = LocalDateTime.ofEpochSecond((clock.getTime() - ttl) / 1000, 0, ZoneOffset.UTC).toLocalDate();
+    expiredDatePtn = "date=" + DateTimeFormatter.ISO_LOCAL_DATE.format(expiredDate);
+  }
+
+  /**
+   * Finds the expired date partitioned events directories under {@code eventsBasePath} and
+   * deletes them. Errors are logged and never propagated.
+   */
+  private void cleanupDir(Path eventsBasePath) {
+    LOG.debug("Trying to delete expired proto events from {}", eventsBasePath);
+    try {
+      FileSystem fs = FileSystem.get(eventsBasePath.toUri(), conf);
+      if (!fs.exists(eventsBasePath)) {
+        return;
+      }
+      FileStatus[] statuses = fs.listStatus(eventsBasePath, expiredDatePartitionsFilter);
+      for (FileStatus dir : statuses) {
+        try {
+          if (deleteDirByOwner(fs, dir)) {
+            LOG.info("Deleted expired proto events dir: " + dir.getPath());
+          } else {
+            LOG.warn("Could not delete expired proto events dir: " + dir.getPath());
+          }
+        } catch (IOException | RuntimeException e) {
+          // Log error and continue to delete other expired dirs.
+          LOG.error("Error deleting expired proto events dir " + dir.getPath(), e);
+        }
+      }
+    } catch (IOException | RuntimeException e) {
+      // Also catch unchecked exceptions: if this task is run by a periodic scheduler (presumably a
+      // ScheduledExecutorService), an exception escaping run() would cancel all later runs.
+      LOG.error("Error while trying to delete expired proto events from " + eventsBasePath, e);
+    }
+  }
+
+  /**
+   * Recursively deletes the events dir, as its owner (via a proxy user) when the owner is not the
+   * user running this process.
+   *
+   * @return true if the filesystem reported the directory as deleted, false otherwise
+   * @throws IOException if the deletion or the impersonation fails
+   */
+  private boolean deleteDirByOwner(FileSystem fs, FileStatus eventsDir) throws IOException {
+    String owner = eventsDir.getOwner();
+    // NOTE(review): compares against the JVM's OS user, not the UGI login user; assumed identical.
+    if (owner.equals(System.getProperty("user.name"))) {
+      return fs.delete(eventsDir.getPath(), true);
+    }
+    LOG.info("Deleting " + eventsDir.getPath() + " as user " + owner);
+    UserGroupInformation ugi = UserGroupInformation.createProxyUser(owner,
+            UserGroupInformation.getLoginUser());
+    try {
+      return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+        @Override
+        public Boolean run() throws Exception {
+          // New FileSystem object to be obtained in user context for doAs flow.
+          try (FileSystem doAsFs = FileSystem.newInstance(eventsDir.getPath().toUri(), conf)) {
+            return doAsFs.delete(eventsDir.getPath(), true);
+          }
+        }
+      });
+    } catch (InterruptedException ie) {
+      // Restore the interrupt flag so the executing thread can still observe the interruption.
+      Thread.currentThread().interrupt();
+      LOG.error("Could not delete " + eventsDir.getPath() + " for UGI: " + ugi, ie);
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/6311e0b0/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
index 0820bea..f463437 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
@@ -102,8 +102,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import org.apache.commons.compress.utils.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
@@ -158,9 +160,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
             .collect(Collectors.toSet());
   }
 
-  public static final String HIVE_EVENTS_BASE_PATH = "hive.hook.proto.base-directory";
-  public static final String HIVE_HOOK_PROTO_QUEUE_CAPACITY = "hive.hook.proto.queue.capacity";
-  public static final int HIVE_HOOK_PROTO_QUEUE_CAPACITY_DEFAULT = 64;
+  private static final int HIVE_HOOK_PROTO_QUEUE_CAPACITY_DEFAULT = 64;
   private static final int WAIT_TIME = 5;
 
   public enum EventType {
@@ -190,9 +190,10 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
       this.clock = clock;
       // randomUUID is slow, since its cryptographically secure, only first query will take time.
       this.logFileName = "hive_" + UUID.randomUUID().toString();
-      String baseDir = conf.get(HIVE_EVENTS_BASE_PATH);
-      if (baseDir == null) {
-        LOG.error(HIVE_EVENTS_BASE_PATH + " is not set, logging disabled.");
+      String baseDir = conf.getVar(ConfVars.HIVE_PROTO_EVENTS_BASE_PATH);
+      if (StringUtils.isBlank(baseDir)) {
+        baseDir = null;
+        LOG.error(ConfVars.HIVE_PROTO_EVENTS_BASE_PATH.varname + " is not set, logging disabled.");
       }
 
       DatePartitionedLogger<HiveHookEventProto> tmpLogger = null;
@@ -211,7 +212,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
         return;
       }
 
-      int queueCapacity = conf.getInt(HIVE_HOOK_PROTO_QUEUE_CAPACITY,
+      int queueCapacity = conf.getInt(ConfVars.HIVE_PROTO_EVENTS_QUEUE_CAPACITY.varname,
           HIVE_HOOK_PROTO_QUEUE_CAPACITY_DEFAULT);
 
       ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)

http://git-wip-us.apache.org/repos/asf/hive/blob/6311e0b0/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
index 96fb73c..8124528 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
@@ -64,7 +64,7 @@ public class TestHiveProtoLoggingHook {
   public void setup() throws Exception {
     conf = new HiveConf();
     tmpFolder = folder.newFolder().getAbsolutePath();
-    conf.set(HiveProtoLoggingHook.HIVE_EVENTS_BASE_PATH, tmpFolder);
+    conf.setVar(HiveConf.ConfVars.HIVE_PROTO_EVENTS_BASE_PATH, tmpFolder);
     QueryState state = new QueryState.Builder().withHiveConf(conf).build();
     @SuppressWarnings("serial")
     QueryPlan queryPlan = new QueryPlan(HiveOperation.QUERY) {};

http://git-wip-us.apache.org/repos/asf/hive/blob/6311e0b0/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 19da432..74a301f 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -763,7 +763,8 @@ public class MetastoreConf {
         EventCleanerTask.class.getName() + "," + RuntimeStatsCleanerTask.class.getName() + "," +
         "org.apache.hadoop.hive.metastore.repl.DumpDirCleanerTask" + "," +
         MaterializationsCacheCleanerTask.class.getName() + "," +
-            MaterializationsRebuildLockCleanerTask.class.getName() + "," + RuntimeStatsCleanerTask.class.getName(),
+            MaterializationsRebuildLockCleanerTask.class.getName() + "," + RuntimeStatsCleanerTask.class.getName() + "," +
+            "org.apache.hadoop.hive.metastore.HiveProtoEventsCleanerTask",
         "Comma separated list of tasks that will be started in separate threads.  These will " +
             "always be started, regardless of whether the metastore is running in embedded mode " +
             "or in server mode.  They must implement " + MetastoreTaskThread.class.getName()),