You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sa...@apache.org on 2018/07/04 07:38:27 UTC
hive git commit: HIVE-20025: Clean-up of event files created by HiveProtoLoggingHook (Sankar Hariappan, reviewed by Harish Jaiprakash, Anishek Agarwal)
Repository: hive
Updated Branches:
refs/heads/master ee8c72ae1 -> 6311e0b03
HIVE-20025: Clean-up of event files created by HiveProtoLoggingHook (Sankar Hariappan, reviewed by Harish Jaiprakash, Anishek Agarwal)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6311e0b0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6311e0b0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6311e0b0
Branch: refs/heads/master
Commit: 6311e0b031b5937d39bde8aad9916c8d0911f0b3
Parents: ee8c72a
Author: Sankar Hariappan <sa...@apache.org>
Authored: Wed Jul 4 13:08:00 2018 +0530
Committer: Sankar Hariappan <sa...@apache.org>
Committed: Wed Jul 4 13:08:00 2018 +0530
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 11 ++
.../TestHiveProtoEventsCleanerTask.java | 141 ++++++++++++++++
.../metastore/HiveProtoEventsCleanerTask.java | 168 +++++++++++++++++++
.../hive/ql/hooks/HiveProtoLoggingHook.java | 15 +-
.../hive/ql/hooks/TestHiveProtoLoggingHook.java | 2 +-
.../hive/metastore/conf/MetastoreConf.java | 3 +-
6 files changed, 331 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/6311e0b0/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7ef22d6..2da9086 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -625,6 +625,17 @@ public class HiveConf extends Configuration {
"Table alias will be added to column names for queries of type \"select *\" or \n" +
"if query explicitly uses table alias \"select r1.x..\"."),
+ HIVE_PROTO_EVENTS_BASE_PATH("hive.hook.proto.base-directory", "",
+ "Base directory into which the proto event messages are written by HiveProtoLoggingHook."),
+ HIVE_PROTO_EVENTS_QUEUE_CAPACITY("hive.hook.proto.queue.capacity", 64,
+ "Queue capacity for the proto events logging threads."),
+ HIVE_PROTO_EVENTS_CLEAN_FREQ("hive.hook.proto.events.clean.freq", "1d",
+ new TimeValidator(TimeUnit.DAYS),
+ "Frequency at which timer task runs to purge expired proto event files."),
+ HIVE_PROTO_EVENTS_TTL("hive.hook.proto.events.ttl", "7d",
+ new TimeValidator(TimeUnit.DAYS),
+ "Time-To-Live (TTL) of proto event files before cleanup."),
+
// Hadoop Configuration Properties
// Properties with null values are ignored and exist only for the purpose of giving us
// a symbolic name to reference in the Hive source code. Properties with non-null
http://git-wip-us.apache.org/repos/asf/hive/blob/6311e0b0/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveProtoEventsCleanerTask.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveProtoEventsCleanerTask.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveProtoEventsCleanerTask.java
new file mode 100644
index 0000000..e187fad
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveProtoEventsCleanerTask.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.HiveProtoEventsCleanerTask;
+import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestHiveProtoEventsCleanerTask {
+  protected static final Logger LOG = LoggerFactory.getLogger(TestHiveProtoEventsCleanerTask.class);
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+  private Path baseDir;
+  private HiveConf hiveConf;
+  private final SystemClock clock = SystemClock.getInstance();
+  private HiveProtoEventsCleanerTask cleanerTask;
+  private FileSystem fs;
+
+  // Event sub-directories created under the temp folder; index 0 is used as the Hive hook base dir.
+  private final String[] eventsSubDirs = new String[] { "query_data", "dag_meta", "dag_data", "app_data" };
+
+  /**
+   * Points the Hive proto events base directory at {@code <tmp>/query_data}, sets a 2-day TTL
+   * and creates the cleaner task under test.
+   */
+  @Before
+  public void setup() throws Exception {
+    hiveConf = new HiveConf(TestHiveProtoEventsCleanerTask.class);
+    String tmpFolder = folder.newFolder().getAbsolutePath();
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_PROTO_EVENTS_BASE_PATH, tmpFolder + "/" + eventsSubDirs[0]);
+    HiveConf.setTimeVar(hiveConf, ConfVars.HIVE_PROTO_EVENTS_TTL, 2, TimeUnit.DAYS);
+
+    baseDir = new Path(tmpFolder);
+    fs = baseDir.getFileSystem(hiveConf);
+    cleanerTask = JavaUtils.newInstance(HiveProtoEventsCleanerTask.class);
+    cleanerTask.setConf(hiveConf);
+  }
+
+  /**
+   * Returns the current date, using the underlying clock in UTC time.
+   */
+  private LocalDate getNow() {
+    // Use UTC date to ensure reader date is same on all timezones.
+    return LocalDateTime.ofEpochSecond(clock.getTime() / 1000, 0, ZoneOffset.UTC).toLocalDate();
+  }
+
+  /**
+   * Returns the directory name for a given date, e.g. {@code date=2018-07-04}.
+   */
+  public String getDirForDate(LocalDate date) {
+    return "date=" + DateTimeFormatter.ISO_LOCAL_DATE.format(date);
+  }
+
+  /**
+   * Creates {@code basePath/date=<date>/data} containing a single int, creating the base
+   * directory (mode 1777) first if needed.
+   */
+  private void addDatePartition(Path basePath, LocalDate date) throws Exception {
+    if (!fs.exists(basePath)) {
+      fs.mkdirs(basePath);
+      fs.setPermission(basePath, FsPermission.createImmutable((short) 01777));
+    }
+
+    Path datePtn = new Path(basePath, getDirForDate(date));
+    fs.mkdirs(datePtn);
+    fs.setPermission(datePtn, FsPermission.createImmutable((short) 01777));
+    FsPermission.setUMask(hiveConf, FsPermission.createImmutable((short) 0066));
+    Path partFile = new Path(datePtn, "data");
+    // try-with-resources closes the stream even if the write fails.
+    try (FSDataOutputStream out = fs.create(partFile)) {
+      out.writeInt(1000);
+    }
+  }
+
+  /**
+   * Creates date partitions going back from today in every events sub-directory, runs the
+   * cleaner once and verifies that only partitions newer than the TTL cut-off remain.
+   */
+  @Test
+  public void testCleanup() throws Exception {
+    int[] inRange = { 3, 5, 2, 1 }; // Must have one entry per eventsSubDirs
+    int[] outRange = { 2, 2, 2, 1 }; // Must have one entry per eventsSubDirs
+    LocalDate today = getNow();
+
+    // Add partitions for the given range of dates from today to past.
+    for (int i = 0; i < inRange.length; i++) {
+      Path basePath = new Path(baseDir, eventsSubDirs[i]);
+      for (int j = 0; j < inRange[i]; j++) {
+        addDatePartition(basePath, today.minusDays(j));
+      }
+    }
+
+    // Run the task to cleanup
+    cleanerTask.run();
+
+    // Verify that none of the remaining partitions has expired.
+    String expiredPtn = getDirForDate(today.minusDays(2));
+    for (int i = 0; i < inRange.length; i++) {
+      Path basePath = new Path(baseDir, eventsSubDirs[i]);
+      FileStatus[] statuses = fs.listStatus(basePath);
+
+      // If the partitions were created just before midnight UTC and the cleaner ran just after
+      // it, one more partition has expired, so either count is accepted.
+      assertTrue("Unexpected number of partitions left in " + basePath + ": " + statuses.length,
+          (statuses.length == outRange[i]) || (statuses.length == (outRange[i] - 1)));
+      for (FileStatus status : statuses) {
+        // The cleaner deletes every partition <= its cut-off, so survivors are strictly newer.
+        assertTrue("Expired partition was not cleaned: " + status.getPath(),
+            status.getPath().getName().compareTo(expiredPtn) > 0);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/6311e0b0/metastore/src/java/org/apache/hadoop/hive/metastore/HiveProtoEventsCleanerTask.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveProtoEventsCleanerTask.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveProtoEventsCleanerTask.java
new file mode 100644
index 0000000..2a772e2
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveProtoEventsCleanerTask.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class HiveProtoEventsCleanerTask implements MetastoreTaskThread {
+  public static final Logger LOG = LoggerFactory.getLogger(HiveProtoEventsCleanerTask.class);
+
+  // Prefix of the date-partition directory names that this task purges.
+  private static final String DATE_PTN_PREFIX = "date=";
+  private static final SystemClock clock = SystemClock.getInstance();
+
+  // Sibling event directories (under the parent of the configured Hive base directory) whose
+  // expired date partitions are purged. Index 0 is expected to be the configured Hive directory.
+  private final String[] eventsSubDirs = new String[] { "query_data", "dag_meta", "dag_data", "app_data" };
+  private final List<Path> eventsBasePaths = new ArrayList<>();
+  private Configuration conf;
+  // Time-to-live of event partitions, in milliseconds.
+  private long ttl;
+
+  /**
+   * Reads the proto events configuration and derives the directories to clean. When the Hive
+   * proto events base directory is blank, no directories are registered and {@link #run()}
+   * does nothing.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    // Start from a clean slate so that calling setConf() again does not add duplicate paths.
+    eventsBasePaths.clear();
+
+    String hiveEventsDir = HiveConf.getVar(conf, ConfVars.HIVE_PROTO_EVENTS_BASE_PATH);
+    if (StringUtils.isBlank(hiveEventsDir)) {
+      return;
+    }
+    Path hiveEventsBasePath = new Path(hiveEventsDir);
+    Path baseDir = hiveEventsBasePath.getParent();
+    if (baseDir == null) {
+      // A root path has no parent under which the sibling event directories could be resolved.
+      LOG.warn("{} is set to a root path ({}); proto events cleanup is disabled.",
+          ConfVars.HIVE_PROTO_EVENTS_BASE_PATH.varname, hiveEventsDir);
+      return;
+    }
+    for (String subDir : eventsSubDirs) {
+      eventsBasePaths.add(new Path(baseDir, subDir));
+    }
+    // Checked explicitly rather than with assert, because assertions are usually disabled in
+    // production and the mismatch would otherwise go unnoticed.
+    if (!eventsBasePaths.get(0).equals(hiveEventsBasePath)) {
+      LOG.warn("{} ({}) does not end with '{}'; cleaning derived directories {}",
+          ConfVars.HIVE_PROTO_EVENTS_BASE_PATH.varname, hiveEventsDir, eventsSubDirs[0],
+          eventsBasePaths);
+    }
+    ttl = HiveConf.getTimeVar(conf, ConfVars.HIVE_PROTO_EVENTS_TTL, TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Returns the configured cleaner frequency (hive.hook.proto.events.clean.freq) in the given unit.
+   */
+  @Override
+  public long runFrequency(TimeUnit unit) {
+    return HiveConf.getTimeVar(conf, ConfVars.HIVE_PROTO_EVENTS_CLEAN_FREQ, unit);
+  }
+
+  /**
+   * Deletes every date partition at or before the TTL cut-off from each events directory.
+   */
+  @Override
+  public void run() {
+    // If Hive proto logging is not enabled, then nothing to be cleaned-up.
+    if (eventsBasePaths.isEmpty()) {
+      return;
+    }
+
+    // The expired date is computed on every run and kept local, so concurrent runs or multiple
+    // instances cannot observe each other's cut-off.
+    String expiredDatePtn = computeExpiredDatePtn(ttl);
+    for (Path basePath : eventsBasePaths) {
+      cleanupDir(basePath, expiredDatePtn);
+    }
+  }
+
+  /**
+   * Computes the expired date partition name (e.g. {@code date=2018-06-27}) for the given TTL,
+   * using the underlying clock in UTC time.
+   */
+  private static String computeExpiredDatePtn(long ttl) {
+    // Use UTC date to ensure reader date is same on all timezones.
+    LocalDate expiredDate
+        = LocalDateTime.ofEpochSecond((clock.getTime() - ttl) / 1000, 0, ZoneOffset.UTC).toLocalDate();
+    return DATE_PTN_PREFIX + DateTimeFormatter.ISO_LOCAL_DATE.format(expiredDate);
+  }
+
+  /**
+   * Returns a filter accepting only date partitions whose name sorts at or before
+   * {@code expiredDatePtn}. ISO-8601 dates sort lexicographically in chronological order.
+   */
+  private static PathFilter expiredDatePartitionsFilter(String expiredDatePtn) {
+    return path -> {
+      String dirName = path.getName();
+      return dirName.startsWith(DATE_PTN_PREFIX) && (dirName.compareTo(expiredDatePtn) <= 0);
+    };
+  }
+
+  /**
+   * Finds the expired date partitioned events directories under {@code eventsBasePath} and
+   * deletes them. Errors are logged and never propagated, so one bad directory does not stop
+   * the cleanup of the others.
+   */
+  private void cleanupDir(Path eventsBasePath, String expiredDatePtn) {
+    LOG.debug("Trying to delete expired proto events from {}", eventsBasePath);
+    try {
+      FileSystem fs = FileSystem.get(eventsBasePath.toUri(), conf);
+      if (!fs.exists(eventsBasePath)) {
+        return;
+      }
+      FileStatus[] statuses = fs.listStatus(eventsBasePath, expiredDatePartitionsFilter(expiredDatePtn));
+      for (FileStatus dir : statuses) {
+        try {
+          if (deleteDirByOwner(fs, dir)) {
+            LOG.info("Deleted expired proto events dir: {}", dir.getPath());
+          } else {
+            LOG.warn("Could not delete expired proto events dir: {}", dir.getPath());
+          }
+        } catch (IOException ioe) {
+          // Log error and continue to delete other expired dirs.
+          LOG.error("Error deleting expired proto events dir {}", dir.getPath(), ioe);
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Error while trying to delete expired proto events from {}", eventsBasePath, e);
+    }
+  }
+
+  /**
+   * Deletes the events dir recursively, acting as its owner via a proxy user when the owner is
+   * not the user running this process.
+   *
+   * @return true if the file system reported the directory as deleted
+   * @throws IOException if the deletion or the proxy-user setup fails
+   */
+  private boolean deleteDirByOwner(FileSystem fs, FileStatus eventsDir) throws IOException {
+    Path dirPath = eventsDir.getPath();
+    String owner = eventsDir.getOwner();
+    if (owner.equals(System.getProperty("user.name"))) {
+      return fs.delete(dirPath, true);
+    }
+
+    LOG.info("Deleting {} as user {}", dirPath, owner);
+    UserGroupInformation ugi = UserGroupInformation.createProxyUser(owner,
+        UserGroupInformation.getLoginUser());
+    try {
+      return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+        @Override
+        public Boolean run() throws Exception {
+          // New FileSystem object to be obtained in user context for doAs flow.
+          try (FileSystem doAsFs = FileSystem.newInstance(dirPath.toUri(), conf)) {
+            return doAsFs.delete(dirPath, true);
+          }
+        }
+      });
+    } catch (InterruptedException ie) {
+      // Restore the interrupt flag so the owning thread can still observe the interruption.
+      Thread.currentThread().interrupt();
+      LOG.error("Could not delete " + dirPath + " for UGI: " + ugi, ie);
+      return false;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/6311e0b0/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
index 0820bea..f463437 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
@@ -102,8 +102,10 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.compress.utils.IOUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.ExplainTask;
@@ -158,9 +160,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
.collect(Collectors.toSet());
}
- public static final String HIVE_EVENTS_BASE_PATH = "hive.hook.proto.base-directory";
- public static final String HIVE_HOOK_PROTO_QUEUE_CAPACITY = "hive.hook.proto.queue.capacity";
- public static final int HIVE_HOOK_PROTO_QUEUE_CAPACITY_DEFAULT = 64;
+ private static final int HIVE_HOOK_PROTO_QUEUE_CAPACITY_DEFAULT = 64;
private static final int WAIT_TIME = 5;
public enum EventType {
@@ -190,9 +190,10 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
this.clock = clock;
// randomUUID is slow, since its cryptographically secure, only first query will take time.
this.logFileName = "hive_" + UUID.randomUUID().toString();
- String baseDir = conf.get(HIVE_EVENTS_BASE_PATH);
- if (baseDir == null) {
- LOG.error(HIVE_EVENTS_BASE_PATH + " is not set, logging disabled.");
+ String baseDir = conf.getVar(ConfVars.HIVE_PROTO_EVENTS_BASE_PATH);
+ if (StringUtils.isBlank(baseDir)) {
+ baseDir = null;
+ LOG.error(ConfVars.HIVE_PROTO_EVENTS_BASE_PATH.varname + " is not set, logging disabled.");
}
DatePartitionedLogger<HiveHookEventProto> tmpLogger = null;
@@ -211,7 +212,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
return;
}
- int queueCapacity = conf.getInt(HIVE_HOOK_PROTO_QUEUE_CAPACITY,
+ int queueCapacity = conf.getInt(ConfVars.HIVE_PROTO_EVENTS_QUEUE_CAPACITY.varname,
HIVE_HOOK_PROTO_QUEUE_CAPACITY_DEFAULT);
ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
http://git-wip-us.apache.org/repos/asf/hive/blob/6311e0b0/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
index 96fb73c..8124528 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
@@ -64,7 +64,7 @@ public class TestHiveProtoLoggingHook {
public void setup() throws Exception {
conf = new HiveConf();
tmpFolder = folder.newFolder().getAbsolutePath();
- conf.set(HiveProtoLoggingHook.HIVE_EVENTS_BASE_PATH, tmpFolder);
+ conf.setVar(HiveConf.ConfVars.HIVE_PROTO_EVENTS_BASE_PATH, tmpFolder);
QueryState state = new QueryState.Builder().withHiveConf(conf).build();
@SuppressWarnings("serial")
QueryPlan queryPlan = new QueryPlan(HiveOperation.QUERY) {};
http://git-wip-us.apache.org/repos/asf/hive/blob/6311e0b0/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 19da432..74a301f 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -763,7 +763,8 @@ public class MetastoreConf {
EventCleanerTask.class.getName() + "," + RuntimeStatsCleanerTask.class.getName() + "," +
"org.apache.hadoop.hive.metastore.repl.DumpDirCleanerTask" + "," +
MaterializationsCacheCleanerTask.class.getName() + "," +
- MaterializationsRebuildLockCleanerTask.class.getName() + "," + RuntimeStatsCleanerTask.class.getName(),
+ MaterializationsRebuildLockCleanerTask.class.getName() + "," + RuntimeStatsCleanerTask.class.getName() + "," +
+ "org.apache.hadoop.hive.metastore.HiveProtoEventsCleanerTask",
"Comma separated list of tasks that will be started in separate threads. These will " +
"always be started, regardless of whether the metastore is running in embedded mode " +
"or in server mode. They must implement " + MetastoreTaskThread.class.getName()),