Posted to commits@hive.apache.org by se...@apache.org on 2016/09/19 22:42:20 UTC
[34/34] hive git commit: HIVE-14700 : clean up file/txn information via a metastore thread (Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/70299dc4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/70299dc4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/70299dc4
Branch: refs/heads/hive-14535
Commit: 70299dc48f93433fb53611b05f8a719b841575c5
Parents: 8708398
Author: Sergey Shelukhin <se...@apache.org>
Authored: Mon Sep 19 15:40:19 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Mon Sep 19 15:40:19 2016 -0700
----------------------------------------------------------------------
.../hadoop/hive/common/ValidWriteIds.java | 40 +-
.../org/apache/hadoop/hive/conf/HiveConf.java | 21 +-
.../apache/hive/common/util/MockFileSystem.java | 622 +++++++++++++++++++
.../upgrade/derby/037-HIVE-14637.derby.sql | 2 +-
.../upgrade/derby/hive-schema-2.2.0.derby.sql | 2 +-
.../upgrade/mssql/022-HIVE-14637.mssql.sql | 3 +-
.../upgrade/mssql/hive-schema-2.2.0.mssql.sql | 3 +-
.../upgrade/mysql/037-HIVE-14637.mysql.sql | 3 +-
.../upgrade/mysql/hive-schema-2.2.0.mysql.sql | 3 +-
.../upgrade/oracle/037-HIVE-14637.oracle.sql | 1 +
.../upgrade/oracle/hive-schema-2.2.0.oracle.sql | 1 +
.../postgres/036-HIVE-14637.postgres.sql | 1 +
.../postgres/hive-schema-2.2.0.postgres.sql | 1 +
.../hadoop/hive/metastore/HiveMetaStore.java | 26 +-
.../hadoop/hive/metastore/MetaStoreThread.java | 1 +
.../hadoop/hive/metastore/MetaStoreUtils.java | 10 +
.../hadoop/hive/metastore/MmCleanerThread.java | 397 ++++++++++++
.../hadoop/hive/metastore/ObjectStore.java | 147 +++--
.../apache/hadoop/hive/metastore/RawStore.java | 26 +-
.../hadoop/hive/metastore/hbase/HBaseStore.java | 29 +-
.../hive/metastore/model/MTableWrite.java | 12 +-
metastore/src/model/package.jdo | 3 +
.../DummyRawStoreControlledCommit.java | 25 +-
.../DummyRawStoreForJdoConnection.java | 25 +-
.../hadoop/hive/metastore/TestObjectStore.java | 177 +++++-
.../java/org/apache/hadoop/hive/ql/Driver.java | 3 +-
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 7 +-
.../apache/hadoop/hive/ql/metadata/Hive.java | 2 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 4 +-
.../hive/ql/txn/compactor/CompactorThread.java | 1 -
.../apache/hadoop/hive/ql/io/TestAcidUtils.java | 12 +-
.../hive/ql/io/orc/TestInputOutputFormat.java | 549 +---------------
32 files changed, 1527 insertions(+), 632 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java b/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
index b25a72d..b939b43 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
public class ValidWriteIds {
public static final ValidWriteIds NO_WRITE_IDS = new ValidWriteIds(-1, -1, false, null);
- private static final String MM_PREFIX = "mm";
+ public static final String MM_PREFIX = "mm";
private final static Logger LOG = LoggerFactory.getLogger(ValidWriteIds.class);
@@ -117,22 +117,8 @@ public class ValidWriteIds {
}
public boolean isValidInput(Path file) {
- String fileName = file.getName();
- String[] parts = fileName.split("_", 3);
- if (parts.length < 2 || !MM_PREFIX.equals(parts[0])) {
- LOG.info("Ignoring unknown file for a MM table: " + file
- + " (" + Arrays.toString(parts) + ")");
- return false;
- }
- long writeId = -1;
- try {
- writeId = Long.parseLong(parts[1]);
- } catch (NumberFormatException ex) {
- LOG.info("Ignoring unknown file for a MM table: " + file
- + "; parsing " + parts[1] + " got " + ex.getMessage());
- return false;
- }
- return isValid(writeId);
+ Long writeId = extractWriteId(file);
+ return (writeId != null) && isValid(writeId);
}
public static String getMmFilePrefix(long mmWriteId) {
@@ -155,4 +141,24 @@ public class ValidWriteIds {
return isMatch == (name.startsWith(prefix) || name.startsWith(tmpPrefix));
}
}
+
+
+ public static Long extractWriteId(Path file) {
+ String fileName = file.getName();
+ String[] parts = fileName.split("_", 3);
+ if (parts.length < 2 || !MM_PREFIX.equals(parts[0])) {
+ LOG.info("Cannot extract write ID for a MM table: " + file
+ + " (" + Arrays.toString(parts) + ")");
+ return null;
+ }
+ long writeId = -1;
+ try {
+ writeId = Long.parseLong(parts[1]);
+ } catch (NumberFormatException ex) {
+ LOG.info("Cannot extract write ID for a MM table: " + file
+ + "; parsing " + parts[1] + " got " + ex.getMessage());
+ return null;
+ }
+ return writeId;
+ }
}
\ No newline at end of file
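For illustration, a minimal sketch of how a caller might use the new extractWriteId helper; the warehouse paths below are hypothetical, but the directory names follow the mm_<writeId> pattern the parser expects:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidWriteIds;

public class ExtractWriteIdExample {
  public static void main(String[] args) {
    // "mm_17" matches the <prefix>_<writeId> naming, so 17 is returned.
    Long ok = ValidWriteIds.extractWriteId(new Path("/warehouse/t/mm_17"));
    // "delta_5" does not start with the "mm" prefix, so null is returned (and logged).
    Long bad = ValidWriteIds.extractWriteId(new Path("/warehouse/t/delta_5"));
    System.out.println(ok + " " + bad); // prints: 17 null
  }
}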
http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 301159e..1a85f50 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -297,7 +297,10 @@ public class HiveConf extends Configuration {
HiveConf.ConfVars.METASTORE_HBASE_AGGR_STATS_MEMORY_TTL,
HiveConf.ConfVars.METASTORE_HBASE_AGGR_STATS_INVALIDATOR_FREQUENCY,
HiveConf.ConfVars.METASTORE_HBASE_AGGR_STATS_HBASE_TTL,
- HiveConf.ConfVars.METASTORE_HBASE_FILE_METADATA_THREADS
+ HiveConf.ConfVars.METASTORE_HBASE_FILE_METADATA_THREADS,
+ HiveConf.ConfVars.HIVE_METASTORE_MM_THREAD_SCAN_INTERVAL,
+ HiveConf.ConfVars.HIVE_METASTORE_MM_HEARTBEAT_TIMEOUT,
+ HiveConf.ConfVars.HIVE_METASTORE_MM_ABSOLUTE_TIMEOUT
};
/**
@@ -3104,6 +3107,22 @@ public class HiveConf extends Configuration {
"Log tracing id that can be used by upstream clients for tracking respective logs. " +
"Truncated to " + LOG_PREFIX_LENGTH + " characters. Defaults to use auto-generated session id."),
+ HIVE_METASTORE_MM_THREAD_SCAN_INTERVAL("hive.metastore.mm.thread.scan.interval", "900s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "MM table housekeeping thread interval in this metastore instance. 0 to disable."),
+
+ HIVE_METASTORE_MM_HEARTBEAT_TIMEOUT("hive.metastore.mm.heartbeat.timeout", "1800s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "MM write ID times out after this long if a heartbeat is not send. Currently disabled."), // TODO# heartbeating not implemented
+
+ HIVE_METASTORE_MM_ABSOLUTE_TIMEOUT("hive.metastore.mm.absolute.timeout", "7d",
+ new TimeValidator(TimeUnit.SECONDS),
+ "MM write ID cannot be outstanding for more than this long."),
+
+ HIVE_METASTORE_MM_ABORTED_GRACE_PERIOD("hive.metastore.mm.aborted.grace.period", "1d",
+ new TimeValidator(TimeUnit.SECONDS),
+ "MM write ID will not be removed up for that long after it has been aborted;\n" +
+ "this is to work around potential races e.g. with FS visibility, when deleting files."),
HIVE_CONF_RESTRICTED_LIST("hive.conf.restricted.list",
"hive.security.authenticator.manager,hive.security.authorization.manager,hive.users.in.admin.role," +
http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/common/src/test/org/apache/hive/common/util/MockFileSystem.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hive/common/util/MockFileSystem.java b/common/src/test/org/apache/hive/common/util/MockFileSystem.java
new file mode 100644
index 0000000..e65fd33
--- /dev/null
+++ b/common/src/test/org/apache/hive/common/util/MockFileSystem.java
@@ -0,0 +1,622 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.common.util;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Progressable;
+
+public class MockFileSystem extends FileSystem {
+ final List<MockFile> files = new ArrayList<MockFile>();
+ final Map<MockFile, FileStatus> fileStatusMap = new HashMap<>();
+ Path workingDir = new Path("/");
+ // statics for when the mock fs is created via FileSystem.get
+ private static String blockedUgi = null;
+ private final static List<MockFile> globalFiles = new ArrayList<MockFile>();
+ protected Statistics statistics;
+ public boolean allowDelete = false;
+
+ public MockFileSystem() {
+ // empty
+ }
+
+ @Override
+ public void initialize(URI uri, Configuration conf) {
+ setConf(conf);
+ statistics = getStatistics("mock", getClass());
+ }
+
+ public MockFileSystem(Configuration conf, MockFile... files) {
+ setConf(conf);
+ this.files.addAll(Arrays.asList(files));
+ statistics = getStatistics("mock", getClass());
+ }
+
+ public static void setBlockedUgi(String s) {
+ blockedUgi = s;
+ }
+
+ public void clear() {
+ files.clear();
+ }
+
+ @Override
+ public URI getUri() {
+ try {
+ return new URI("mock:///");
+ } catch (URISyntaxException err) {
+ throw new IllegalArgumentException("huh?", err);
+ }
+ }
+
+ // increments file modification time
+ public void touch(MockFile file) {
+ if (fileStatusMap.containsKey(file)) {
+ FileStatus fileStatus = fileStatusMap.get(file);
+ FileStatus fileStatusNew = new FileStatus(fileStatus.getLen(), fileStatus.isDirectory(),
+ fileStatus.getReplication(), fileStatus.getBlockSize(),
+ fileStatus.getModificationTime() + 1, fileStatus.getAccessTime(),
+ fileStatus.getPermission(), fileStatus.getOwner(), fileStatus.getGroup(),
+ fileStatus.getPath());
+ fileStatusMap.put(file, fileStatusNew);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ public static class MockAccessDenied extends IOException {
+ }
+
+ @Override
+ public FSDataInputStream open(Path path, int i) throws IOException {
+ statistics.incrementReadOps(1);
+ checkAccess();
+ MockFile file = findFile(path);
+ if (file != null) return new FSDataInputStream(new MockInputStream(file));
+ throw new IOException("File not found: " + path);
+ }
+
+ public MockFile findFile(Path path) {
+ for (MockFile file: files) {
+ if (file.path.equals(path)) {
+ return file;
+ }
+ }
+ for (MockFile file: globalFiles) {
+ if (file.path.equals(path)) {
+ return file;
+ }
+ }
+ return null;
+ }
+
+ private void checkAccess() throws IOException {
+ if (blockedUgi == null) return;
+ if (!blockedUgi.equals(UserGroupInformation.getCurrentUser().getShortUserName())) return;
+ throw new MockAccessDenied();
+ }
+
+ @Override
+ public FSDataOutputStream create(Path path, FsPermission fsPermission,
+ boolean overwrite, int bufferSize,
+ short replication, long blockSize,
+ Progressable progressable
+ ) throws IOException {
+ statistics.incrementWriteOps(1);
+ checkAccess();
+ MockFile file = findFile(path);
+ if (file == null) {
+ file = new MockFile(path.toString(), (int) blockSize, new byte[0]);
+ files.add(file);
+ }
+ return new MockOutputStream(file);
+ }
+
+ @Override
+ public FSDataOutputStream append(Path path, int bufferSize,
+ Progressable progressable
+ ) throws IOException {
+ statistics.incrementWriteOps(1);
+ checkAccess();
+ return create(path, FsPermission.getDefault(), true, bufferSize,
+ (short) 3, 256 * 1024, progressable);
+ }
+
+ @Override
+ public boolean rename(Path path, Path path2) throws IOException {
+ statistics.incrementWriteOps(1);
+ checkAccess();
+ return false;
+ }
+
+ @Override
+ public boolean delete(Path path) throws IOException {
+ statistics.incrementWriteOps(1);
+ checkAccess();
+ return false;
+ }
+
+ @Override
+ public boolean delete(Path path, boolean isRecursive) throws IOException {
+ statistics.incrementWriteOps(1);
+ checkAccess();
+ return allowDelete && isRecursive && deleteMatchingFiles(files, path.toString());
+ }
+
+ @Override
+ public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
+ throws IOException {
+ return new RemoteIterator<LocatedFileStatus>() {
+ private Iterator<LocatedFileStatus> iterator = listLocatedFileStatuses(f).iterator();
+
+ @Override
+ public boolean hasNext() throws IOException {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public LocatedFileStatus next() throws IOException {
+ return iterator.next();
+ }
+ };
+ }
+
+ private List<LocatedFileStatus> listLocatedFileStatuses(Path path) throws IOException {
+ statistics.incrementReadOps(1);
+ checkAccess();
+ path = path.makeQualified(this);
+ List<LocatedFileStatus> result = new ArrayList<>();
+ String pathname = path.toString();
+ String pathnameAsDir = pathname + "/";
+ Set<String> dirs = new TreeSet<String>();
+ MockFile file = findFile(path);
+ if (file != null) {
+ result.add(createLocatedStatus(file));
+ return result;
+ }
+ findMatchingLocatedFiles(files, pathnameAsDir, dirs, result);
+ findMatchingLocatedFiles(globalFiles, pathnameAsDir, dirs, result);
+ // for each directory add it once
+ for(String dir: dirs) {
+ result.add(createLocatedDirectory(new MockPath(this, pathnameAsDir + dir)));
+ }
+ return result;
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path path) throws IOException {
+ statistics.incrementReadOps(1);
+ checkAccess();
+ path = path.makeQualified(this);
+ List<FileStatus> result = new ArrayList<FileStatus>();
+ String pathname = path.toString();
+ String pathnameAsDir = pathname + "/";
+ Set<String> dirs = new TreeSet<String>();
+ MockFile file = findFile(path);
+ if (file != null) {
+ return new FileStatus[]{createStatus(file)};
+ }
+ findMatchingFiles(files, pathnameAsDir, dirs, result);
+ findMatchingFiles(globalFiles, pathnameAsDir, dirs, result);
+ // for each directory add it once
+ for(String dir: dirs) {
+ result.add(createDirectory(new MockPath(this, pathnameAsDir + dir)));
+ }
+ return result.toArray(new FileStatus[result.size()]);
+ }
+
+ private void findMatchingFiles(
+ List<MockFile> files, String pathnameAsDir, Set<String> dirs, List<FileStatus> result) {
+ for (MockFile file: files) {
+ String filename = file.path.toString();
+ if (filename.startsWith(pathnameAsDir)) {
+ String tail = filename.substring(pathnameAsDir.length());
+ int nextSlash = tail.indexOf('/');
+ if (nextSlash > 0) {
+ dirs.add(tail.substring(0, nextSlash));
+ } else {
+ result.add(createStatus(file));
+ }
+ }
+ }
+ }
+
+ private boolean deleteMatchingFiles(List<MockFile> files, String path) {
+ Iterator<MockFile> fileIter = files.iterator();
+ boolean result = true;
+ while (fileIter.hasNext()) {
+ MockFile file = fileIter.next();
+ String filename = file.path.toString();
+ if (!filename.startsWith(path)) continue;
+ if (filename.length() <= path.length() || filename.charAt(path.length()) != '/') continue;
+ if (file.cannotDelete) {
+ result = false;
+ continue;
+ }
+ assert !file.isDeleted;
+ file.isDeleted = true;
+ fileIter.remove();
+ }
+ return result;
+ }
+
+ private void findMatchingLocatedFiles(
+ List<MockFile> files, String pathnameAsDir, Set<String> dirs, List<LocatedFileStatus> result)
+ throws IOException {
+ for (MockFile file: files) {
+ String filename = file.path.toString();
+ if (filename.startsWith(pathnameAsDir)) {
+ String tail = filename.substring(pathnameAsDir.length());
+ int nextSlash = tail.indexOf('/');
+ if (nextSlash > 0) {
+ dirs.add(tail.substring(0, nextSlash));
+ } else {
+ result.add(createLocatedStatus(file));
+ }
+ }
+ }
+ }
+
+ @Override
+ public void setWorkingDirectory(Path path) {
+ workingDir = path;
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return workingDir;
+ }
+
+ @Override
+ public boolean mkdirs(Path path, FsPermission fsPermission) {
+ statistics.incrementWriteOps(1);
+ return false;
+ }
+
+ private FileStatus createStatus(MockFile file) {
+ if (fileStatusMap.containsKey(file)) {
+ return fileStatusMap.get(file);
+ }
+ FileStatus fileStatus = new FileStatus(file.length, false, 1, file.blockSize, 0, 0,
+ FsPermission.createImmutable((short) 644), "owen", "group",
+ file.path);
+ fileStatusMap.put(file, fileStatus);
+ return fileStatus;
+ }
+
+ private FileStatus createDirectory(Path dir) {
+ return new FileStatus(0, true, 0, 0, 0, 0,
+ FsPermission.createImmutable((short) 755), "owen", "group", dir);
+ }
+
+ private LocatedFileStatus createLocatedStatus(MockFile file) throws IOException {
+ FileStatus fileStatus = createStatus(file);
+ return new LocatedFileStatus(fileStatus,
+ getFileBlockLocationsImpl(fileStatus, 0, fileStatus.getLen(), false));
+ }
+
+ private LocatedFileStatus createLocatedDirectory(Path dir) throws IOException {
+ FileStatus fileStatus = createDirectory(dir);
+ return new LocatedFileStatus(fileStatus,
+ getFileBlockLocationsImpl(fileStatus, 0, fileStatus.getLen(), false));
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path path) throws IOException {
+ statistics.incrementReadOps(1);
+ checkAccess();
+ path = path.makeQualified(this);
+ String pathnameAsDir = path.toString() + "/";
+ MockFile file = findFile(path);
+ if (file != null) return createStatus(file);
+ for (MockFile dir : files) {
+ if (dir.path.toString().startsWith(pathnameAsDir)) {
+ return createDirectory(path);
+ }
+ }
+ for (MockFile dir : globalFiles) {
+ if (dir.path.toString().startsWith(pathnameAsDir)) {
+ return createDirectory(path);
+ }
+ }
+ throw new FileNotFoundException("File " + path + " does not exist");
+ }
+
+ @Override
+ public BlockLocation[] getFileBlockLocations(FileStatus stat,
+ long start, long len) throws IOException {
+ return getFileBlockLocationsImpl(stat, start, len, true);
+ }
+
+ private BlockLocation[] getFileBlockLocationsImpl(final FileStatus stat, final long start,
+ final long len,
+ final boolean updateStats) throws IOException {
+ if (updateStats) {
+ statistics.incrementReadOps(1);
+ }
+ checkAccess();
+ List<BlockLocation> result = new ArrayList<BlockLocation>();
+ MockFile file = findFile(stat.getPath());
+ if (file != null) {
+ for(MockBlock block: file.blocks) {
+ if (getOverlap(block.offset, block.length, start, len) > 0) {
+ String[] topology = new String[block.hosts.length];
+ for(int i=0; i < topology.length; ++i) {
+ topology[i] = "/rack/ " + block.hosts[i];
+ }
+ result.add(new BlockLocation(block.hosts, block.hosts,
+ topology, block.offset, block.length));
+ }
+ }
+ return result.toArray(new BlockLocation[result.size()]);
+ }
+ return new BlockLocation[0];
+ }
+
+
+ /**
+ * Compute the number of bytes that overlap between the two ranges.
+ * @param offset1 start of range1
+ * @param length1 length of range1
+ * @param offset2 start of range2
+ * @param length2 length of range2
+ * @return the number of bytes in the overlap range
+ */
+ private static long getOverlap(long offset1, long length1, long offset2, long length2) {
+ // c/p from OrcInputFormat
+ long end1 = offset1 + length1;
+ long end2 = offset2 + length2;
+ if (end2 <= offset1 || end1 <= offset2) {
+ return 0;
+ } else {
+ return Math.min(end1, end2) - Math.max(offset1, offset2);
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buffer = new StringBuilder();
+ buffer.append("mockFs{files:[");
+ for(int i=0; i < files.size(); ++i) {
+ if (i != 0) {
+ buffer.append(", ");
+ }
+ buffer.append(files.get(i));
+ }
+ buffer.append("]}");
+ return buffer.toString();
+ }
+
+ public static void addGlobalFile(MockFile mockFile) {
+ globalFiles.add(mockFile);
+ }
+
+ public static void clearGlobalFiles() {
+ globalFiles.clear();
+ }
+
+
+ public static class MockBlock {
+ int offset;
+ int length;
+ final String[] hosts;
+
+ public MockBlock(String... hosts) {
+ this.hosts = hosts;
+ }
+
+ public void setOffset(int offset) {
+ this.offset = offset;
+ }
+
+ public void setLength(int length) {
+ this.length = length;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buffer = new StringBuilder();
+ buffer.append("block{offset: ");
+ buffer.append(offset);
+ buffer.append(", length: ");
+ buffer.append(length);
+ buffer.append(", hosts: [");
+ for(int i=0; i < hosts.length; i++) {
+ if (i != 0) {
+ buffer.append(", ");
+ }
+ buffer.append(hosts[i]);
+ }
+ buffer.append("]}");
+ return buffer.toString();
+ }
+ }
+
+ public static class MockFile {
+ public final Path path;
+ public int blockSize;
+ public int length;
+ public MockBlock[] blocks;
+ public byte[] content;
+ public boolean cannotDelete = false;
+ // This is purely for testing convenience; has no bearing on FS operations such as list.
+ public boolean isDeleted = false;
+
+ public MockFile(String path, int blockSize, byte[] content,
+ MockBlock... blocks) {
+ this.path = new Path(path);
+ this.blockSize = blockSize;
+ this.blocks = blocks;
+ this.content = content;
+ this.length = content.length;
+ int offset = 0;
+ for(MockBlock block: blocks) {
+ block.offset = offset;
+ block.length = Math.min(length - offset, blockSize);
+ offset += block.length;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return path.hashCode() + 31 * length;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (!(obj instanceof MockFile)) { return false; }
+ return ((MockFile) obj).path.equals(this.path) && ((MockFile) obj).length == this.length;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buffer = new StringBuilder();
+ buffer.append("mockFile{path: ");
+ buffer.append(path.toString());
+ buffer.append(", blkSize: ");
+ buffer.append(blockSize);
+ buffer.append(", len: ");
+ buffer.append(length);
+ buffer.append(", blocks: [");
+ for(int i=0; i < blocks.length; i++) {
+ if (i != 0) {
+ buffer.append(", ");
+ }
+ buffer.append(blocks[i]);
+ }
+ buffer.append("]}");
+ return buffer.toString();
+ }
+ }
+
+ static class MockInputStream extends FSInputStream {
+ final MockFile file;
+ int offset = 0;
+
+ public MockInputStream(MockFile file) throws IOException {
+ this.file = file;
+ }
+
+ @Override
+ public void seek(long offset) throws IOException {
+ this.offset = (int) offset;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return offset;
+ }
+
+ @Override
+ public boolean seekToNewSource(long l) throws IOException {
+ return false;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (offset < file.length) {
+ return file.content[offset++] & 0xff;
+ }
+ return -1;
+ }
+ }
+
+ public static class MockPath extends Path {
+ private final FileSystem fs;
+ public MockPath(FileSystem fs, String path) {
+ super(path);
+ this.fs = fs;
+ }
+ @Override
+ public FileSystem getFileSystem(Configuration conf) {
+ return fs;
+ }
+ }
+
+ public static class MockOutputStream extends FSDataOutputStream {
+ public final MockFile file;
+
+ public MockOutputStream(MockFile file) throws IOException {
+ super(new DataOutputBuffer(), null);
+ this.file = file;
+ }
+
+ /**
+ * Set the blocks and their location for the file.
+ * Must be called after the stream is closed or the block length will be
+ * wrong.
+ * @param blocks the list of blocks
+ */
+ public void setBlocks(MockBlock... blocks) {
+ file.blocks = blocks;
+ int offset = 0;
+ int i = 0;
+ while (offset < file.length && i < blocks.length) {
+ blocks[i].offset = offset;
+ blocks[i].length = Math.min(file.length - offset, file.blockSize);
+ offset += blocks[i].length;
+ i += 1;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ DataOutputBuffer buf = (DataOutputBuffer) getWrappedStream();
+ file.length = buf.getLength();
+ file.content = new byte[file.length];
+ MockBlock block = new MockBlock("host1");
+ block.setLength(file.length);
+ setBlocks(block);
+ System.arraycopy(buf.getData(), 0, file.content, 0, file.length);
+ }
+
+ @Override
+ public String toString() {
+ return "Out stream to " + file.toString();
+ }
+ }
+
+ public void addFile(MockFile file) {
+ files.add(file);
+ }
+}
\ No newline at end of file
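A short sketch of how a test might drive the new mock file system; the paths and sizes are made up for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hive.common.util.MockFileSystem;
import org.apache.hive.common.util.MockFileSystem.MockFile;

public class MockFsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    MockFileSystem fs = new MockFileSystem(conf,
        new MockFile("mock:/tbl/mm_1/part-00", 1024, new byte[0]),
        new MockFile("mock:/tbl/mm_2/part-00", 1024, new byte[0]));
    // Listing the table directory surfaces the two mm_* subdirectories.
    for (FileStatus stat : fs.listStatus(new Path("mock:/tbl"))) {
      System.out.println(stat.getPath() + " dir=" + stat.isDirectory());
    }
    // delete() only succeeds when allowDelete is set and the call is recursive.
    fs.allowDelete = true;
    System.out.println(fs.delete(new Path("mock:/tbl/mm_1"), true)); // true
  }
}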
http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/derby/037-HIVE-14637.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/037-HIVE-14637.derby.sql b/metastore/scripts/upgrade/derby/037-HIVE-14637.derby.sql
index 88a48f0..cb6e5f6 100644
--- a/metastore/scripts/upgrade/derby/037-HIVE-14637.derby.sql
+++ b/metastore/scripts/upgrade/derby/037-HIVE-14637.derby.sql
@@ -1,6 +1,6 @@
ALTER TABLE "TBLS" ADD "MM_WATERMARK_WRITE_ID" BIGINT DEFAULT -1;
ALTER TABLE "TBLS" ADD "MM_NEXT_WRITE_ID" BIGINT DEFAULT 0;
-CREATE TABLE "APP"."TBL_WRITES" ("TW_ID" BIGINT NOT NULL, "TBL_ID" BIGINT NOT NULL, "WRITE_ID" BIGINT NOT NULL, "STATE" CHAR(1) NOT NULL, "LAST_HEARTBEAT" BIGINT);
+CREATE TABLE "APP"."TBL_WRITES" ("TW_ID" BIGINT NOT NULL, "TBL_ID" BIGINT NOT NULL, "WRITE_ID" BIGINT NOT NULL, "STATE" CHAR(1) NOT NULL, "CREATED" BIGINT NOT NULL, "LAST_HEARTBEAT" BIGINT NOT NULL);
ALTER TABLE "APP"."TBL_WRITES" ADD CONSTRAINT "TBL_WRITES_PK" PRIMARY KEY ("TW_ID");
ALTER TABLE "APP"."TBL_WRITES" ADD CONSTRAINT "TBL_WRITES_FK1" FOREIGN KEY ("TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
CREATE UNIQUE INDEX "APP"."UNIQUEWRITE" ON "APP"."TBL_WRITES" ("TBL_ID", "WRITE_ID");
http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/derby/hive-schema-2.2.0.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/hive-schema-2.2.0.derby.sql b/metastore/scripts/upgrade/derby/hive-schema-2.2.0.derby.sql
index f86ee4a..9da1703 100644
--- a/metastore/scripts/upgrade/derby/hive-schema-2.2.0.derby.sql
+++ b/metastore/scripts/upgrade/derby/hive-schema-2.2.0.derby.sql
@@ -112,7 +112,7 @@ ALTER TABLE "APP"."KEY_CONSTRAINTS" ADD CONSTRAINT "CONSTRAINTS_PK" PRIMARY KEY
CREATE INDEX "APP"."CONSTRAINTS_PARENT_TBL_ID_INDEX" ON "APP"."KEY_CONSTRAINTS"("PARENT_TBL_ID");
-CREATE TABLE "APP"."TBL_WRITES" ("TW_ID" BIGINT NOT NULL, "TBL_ID" BIGINT NOT NULL, "WRITE_ID" BIGINT NOT NULL, "STATE" CHAR(1) NOT NULL, "LAST_HEARTBEAT" BIGINT);
+CREATE TABLE "APP"."TBL_WRITES" ("TW_ID" BIGINT NOT NULL, "TBL_ID" BIGINT NOT NULL, "WRITE_ID" BIGINT NOT NULL, "STATE" CHAR(1) NOT NULL, "CREATED" BIGINT NOT NULL, "LAST_HEARTBEAT" BIGINT NOT NULL);
ALTER TABLE "APP"."TBL_WRITES" ADD CONSTRAINT "TBL_WRITES_PK" PRIMARY KEY ("TW_ID");
http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/mssql/022-HIVE-14637.mssql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mssql/022-HIVE-14637.mssql.sql b/metastore/scripts/upgrade/mssql/022-HIVE-14637.mssql.sql
index 5d6f99f..9666d2b 100644
--- a/metastore/scripts/upgrade/mssql/022-HIVE-14637.mssql.sql
+++ b/metastore/scripts/upgrade/mssql/022-HIVE-14637.mssql.sql
@@ -7,7 +7,8 @@ CREATE TABLE TBL_WRITES
TBL_ID BIGINT NOT NULL,
WRITE_ID BIGINT NOT NULL,
STATE CHAR(1) NOT NULL,
- LAST_HEARTBEAT BIGINT
+ CREATED BIGINT NOT NULL,
+ LAST_HEARTBEAT BIGINT NOT NULL
);
ALTER TABLE TBL_WRITES ADD CONSTRAINT TBL_WRITES_PK PRIMARY KEY (TW_ID);
ALTER TABLE TBL_WRITES ADD CONSTRAINT TBL_WRITES_FK1 FOREIGN KEY (TBL_ID) REFERENCES TBLS (TBL_ID) ;
http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/mssql/hive-schema-2.2.0.mssql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mssql/hive-schema-2.2.0.mssql.sql b/metastore/scripts/upgrade/mssql/hive-schema-2.2.0.mssql.sql
index 26b2ab3..31016e2 100644
--- a/metastore/scripts/upgrade/mssql/hive-schema-2.2.0.mssql.sql
+++ b/metastore/scripts/upgrade/mssql/hive-schema-2.2.0.mssql.sql
@@ -600,7 +600,8 @@ CREATE TABLE TBL_WRITES
TBL_ID BIGINT NOT NULL,
WRITE_ID BIGINT NOT NULL,
STATE CHAR(1) NOT NULL,
- LAST_HEARTBEAT BIGINT
+ CREATED BIGINT NOT NULL,
+ LAST_HEARTBEAT BIGINT NOT NULL
);
ALTER TABLE TBL_WRITES ADD CONSTRAINT TBL_WRITES_PK PRIMARY KEY (TW_ID);
http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/mysql/037-HIVE-14637.mysql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mysql/037-HIVE-14637.mysql.sql b/metastore/scripts/upgrade/mysql/037-HIVE-14637.mysql.sql
index c024584..9e34db2 100644
--- a/metastore/scripts/upgrade/mysql/037-HIVE-14637.mysql.sql
+++ b/metastore/scripts/upgrade/mysql/037-HIVE-14637.mysql.sql
@@ -7,7 +7,8 @@ CREATE TABLE IF NOT EXISTS `TBL_WRITES`
`TBL_ID` BIGINT NOT NULL,
`WRITE_ID` BIGINT NOT NULL,
`STATE` CHAR(1) NOT NULL,
- `LAST_HEARTBEAT` BIGINT,
+ `CREATED` BIGINT NOT NULL,
+ `LAST_HEARTBEAT` BIGINT NOT NULL,
PRIMARY KEY (`TW_ID`),
UNIQUE KEY `UNIQUEWRITE` (`TBL_ID`,`WRITE_ID`),
CONSTRAINT `TBL_WRITES_FK1` FOREIGN KEY (`TBL_ID`) REFERENCES `TBLS` (`TBL_ID`)
http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/mysql/hive-schema-2.2.0.mysql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mysql/hive-schema-2.2.0.mysql.sql b/metastore/scripts/upgrade/mysql/hive-schema-2.2.0.mysql.sql
index b295950..3e73008 100644
--- a/metastore/scripts/upgrade/mysql/hive-schema-2.2.0.mysql.sql
+++ b/metastore/scripts/upgrade/mysql/hive-schema-2.2.0.mysql.sql
@@ -835,7 +835,8 @@ CREATE TABLE IF NOT EXISTS `TBL_WRITES`
`TBL_ID` BIGINT NOT NULL,
`WRITE_ID` BIGINT NOT NULL,
`STATE` CHAR(1) NOT NULL,
- `LAST_HEARTBEAT` BIGINT,
+ `CREATED` BIGINT NOT NULL,
+ `LAST_HEARTBEAT` BIGINT NOT NULL,
PRIMARY KEY (`TW_ID`),
UNIQUE KEY `UNIQUEWRITE` (`TBL_ID`,`WRITE_ID`),
CONSTRAINT `TBL_WRITES_FK1` FOREIGN KEY (`TBL_ID`) REFERENCES `TBLS` (`TBL_ID`)
http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/oracle/037-HIVE-14637.oracle.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/oracle/037-HIVE-14637.oracle.sql b/metastore/scripts/upgrade/oracle/037-HIVE-14637.oracle.sql
index 9f6dbb2..218eefe 100644
--- a/metastore/scripts/upgrade/oracle/037-HIVE-14637.oracle.sql
+++ b/metastore/scripts/upgrade/oracle/037-HIVE-14637.oracle.sql
@@ -7,6 +7,7 @@ CREATE TABLE TBL_WRITES
TBL_ID NUMBER NOT NULL,
WRITE_ID NUMBER NOT NULL,
STATE CHAR(1) NOT NULL,
+ CREATED NUMBER NOT NULL,
LAST_HEARTBEAT NUMBER NOT NULL
);
ALTER TABLE TBL_WRITES ADD CONSTRAINT TBL_WRITES_PK PRIMARY KEY (TW_ID);
http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/oracle/hive-schema-2.2.0.oracle.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/oracle/hive-schema-2.2.0.oracle.sql b/metastore/scripts/upgrade/oracle/hive-schema-2.2.0.oracle.sql
index 6972c20..5479712 100644
--- a/metastore/scripts/upgrade/oracle/hive-schema-2.2.0.oracle.sql
+++ b/metastore/scripts/upgrade/oracle/hive-schema-2.2.0.oracle.sql
@@ -805,6 +805,7 @@ CREATE TABLE TBL_WRITES
TBL_ID NUMBER NOT NULL,
WRITE_ID NUMBER NOT NULL,
STATE CHAR(1) NOT NULL,
+ CREATED NUMBER NOT NULL,
LAST_HEARTBEAT NUMBER NOT NULL
);
http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/postgres/036-HIVE-14637.postgres.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/postgres/036-HIVE-14637.postgres.sql b/metastore/scripts/upgrade/postgres/036-HIVE-14637.postgres.sql
index f153837..310f51e 100644
--- a/metastore/scripts/upgrade/postgres/036-HIVE-14637.postgres.sql
+++ b/metastore/scripts/upgrade/postgres/036-HIVE-14637.postgres.sql
@@ -8,6 +8,7 @@ CREATE TABLE "TBL_WRITES"
"TBL_ID" BIGINT NOT NULL,
"WRITE_ID" BIGINT NOT NULL,
"STATE" CHAR(1) NOT NULL,
+ "CREATED" BIGINT NOT NULL,
"LAST_HEARTBEAT" BIGINT NOT NULL
);
ALTER TABLE ONLY "TBL_WRITES" ADD CONSTRAINT "TBL_WRITES_PK" PRIMARY KEY ("TW_ID");
http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/postgres/hive-schema-2.2.0.postgres.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/postgres/hive-schema-2.2.0.postgres.sql b/metastore/scripts/upgrade/postgres/hive-schema-2.2.0.postgres.sql
index de997d3..bc865ed 100644
--- a/metastore/scripts/upgrade/postgres/hive-schema-2.2.0.postgres.sql
+++ b/metastore/scripts/upgrade/postgres/hive-schema-2.2.0.postgres.sql
@@ -614,6 +614,7 @@ CREATE TABLE "TBL_WRITES"
"TBL_ID" BIGINT NOT NULL,
"WRITE_ID" BIGINT NOT NULL,
"STATE" CHAR(1) NOT NULL,
+ "CREATED" BIGINT NOT NULL,
"LAST_HEARTBEAT" BIGINT NOT NULL
);
http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index aa6d1eb..128e06a 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -6460,7 +6460,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
Thread.sleep(random.nextInt(deadlockRetryBackoffMs));
}
- // Do a separate txn after we have reserved the number. TODO: If we fail, ignore on read.
+ // Do a separate txn after we have reserved the number.
boolean ok = false;
ms.openTransaction();
try {
@@ -6525,11 +6525,18 @@ public class HiveMetaStore extends ThriftHiveMetastore {
startFunction("heartbeat_write_id", " : db="
+ dbName + " tbl=" + tblName + " writeId=" + writeId);
Exception ex = null;
+ boolean wasAborted = false;
try {
boolean ok = false;
ms.openTransaction();
try {
MTableWrite tw = getActiveTableWrite(ms, dbName, tblName, writeId);
+ long absTimeout = HiveConf.getTimeVar(getConf(),
+ ConfVars.HIVE_METASTORE_MM_ABSOLUTE_TIMEOUT, TimeUnit.MILLISECONDS);
+ if (tw.getCreated() + absTimeout < System.currentTimeMillis()) {
+ tw.setState(String.valueOf(MM_WRITE_ABORTED));
+ wasAborted = true;
+ }
tw.setLastHeartbeat(System.currentTimeMillis());
ms.updateTableWrite(tw);
ok = true;
@@ -6542,6 +6549,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
} finally {
endFunction("heartbeat_write_id", ex == null, ex, tblName);
}
+ if (wasAborted) throw new MetaException("The write was aborted due to absolute timeout");
return new HeartbeatWriteIdResult();
}
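The new abort check above is a plain wall-clock comparison: a write created more than hive.metastore.mm.absolute.timeout ago is flipped to the aborted state even while still heartbeating, and the heartbeat call then fails. A standalone sketch of the same arithmetic, with illustrative values:

import java.util.concurrent.TimeUnit;

public class AbsTimeoutExample {
  public static void main(String[] args) {
    long absTimeoutMs = TimeUnit.DAYS.toMillis(7); // the 7d default
    long created = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(8); // created 8 days ago
    // Same condition as in heartbeat_write_id: creation time plus timeout is in the past.
    boolean aborted = created + absTimeoutMs < System.currentTimeMillis();
    System.out.println(aborted); // true: the heartbeat would throw MetaException
  }
}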
@@ -6576,10 +6584,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
long watermarkId = tbl.isSetMmWatermarkWriteId() ? tbl.getMmWatermarkWriteId() : -1;
if (nextId > (watermarkId + 1)) {
// There may be some intermediate failed or active writes; get the valid ones.
- List<Long> ids = ms.getWriteIds(
+ List<Long> ids = ms.getTableWriteIds(
dbName, tblName, watermarkId, nextId, MM_WRITE_COMMITTED);
// TODO: we could optimize here and send the smaller of the lists, and also use ranges
- if (ids != null) {
+ if (!ids.isEmpty()) {
Iterator<Long> iter = ids.iterator();
long oldWatermarkId = watermarkId;
while (iter.hasNext()) {
@@ -7057,6 +7065,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
startCompactorInitiator(conf);
startCompactorWorkers(conf);
startCompactorCleaner(conf);
+ startMmHousekeepingThread(conf);
startHouseKeeperService(conf);
} catch (Throwable e) {
LOG.error("Failure when starting the compactor, compactions may not happen, " +
@@ -7096,6 +7105,16 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
}
+ private static void startMmHousekeepingThread(HiveConf conf) throws Exception {
+ long intervalMs = HiveConf.getTimeVar(conf,
+ ConfVars.HIVE_METASTORE_MM_THREAD_SCAN_INTERVAL, TimeUnit.MILLISECONDS);
+ if (intervalMs > 0) {
+ MetaStoreThread thread = new MmCleanerThread(intervalMs);
+ initializeAndStartThread(thread, conf);
+ }
+ }
+
+
private static MetaStoreThread instantiateThread(String classname) throws Exception {
Class<?> c = Class.forName(classname);
Object o = c.newInstance();
@@ -7118,6 +7137,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
thread.init(new AtomicBoolean(), new AtomicBoolean());
thread.start();
}
+
private static void startHouseKeeperService(HiveConf conf) throws Exception {
if(!HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_INITIATOR_ON)) {
return;
http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
index a0c8d3b..d4d94ff 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
@@ -51,6 +51,7 @@ public interface MetaStoreThread {
* thread should then assure that the loop has been gone completely through at
* least once.
*/
+ // TODO: move these test parameters to more specific places... there's no need to have them here
void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException;
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
index 41385f7..c2ce259 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
@@ -1884,4 +1884,14 @@ public class MetaStoreUtils {
}
csNew.setStatsObj(list);
}
+
+ public static boolean isMmTable(Table table) {
+ return isMmTable(table.getParameters());
+ }
+
+ public static boolean isMmTable(Map<String, String> params) {
+ // TODO: perhaps it should be a 3rd value for 'transactional'?
+ String value = params.get(hive_metastoreConstants.TABLE_IS_MM);
+ return value != null && value.equalsIgnoreCase("true");
+ }
}
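The check keys off the hive_metastoreConstants.TABLE_IS_MM table parameter. A self-contained sketch of the same logic; the literal parameter key is not shown in this diff, so "hive_mm" below is a stand-in:

import java.util.HashMap;
import java.util.Map;

public class IsMmTableExample {
  // Mirrors MetaStoreUtils.isMmTable(Map) above; "hive_mm" is a hypothetical
  // stand-in for hive_metastoreConstants.TABLE_IS_MM.
  static boolean isMmTable(Map<String, String> params) {
    String value = params.get("hive_mm");
    return value != null && value.equalsIgnoreCase("true");
  }

  public static void main(String[] args) {
    Map<String, String> params = new HashMap<>();
    params.put("hive_mm", "TRUE");
    System.out.println(isMmTable(params)); // true: the comparison is case-insensitive
  }
}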
http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/java/org/apache/hadoop/hive/metastore/MmCleanerThread.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MmCleanerThread.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MmCleanerThread.java
new file mode 100644
index 0000000..6a7f588
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MmCleanerThread.java
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidWriteIds;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.RawStore.FullTableName;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.model.MTableWrite;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Supplier;
+
+public class MmCleanerThread extends Thread implements MetaStoreThread {
+ private final static Logger LOG = LoggerFactory.getLogger(MmCleanerThread.class);
+ private HiveConf conf;
+ private int threadId;
+ private AtomicBoolean stop;
+ private long intervalMs;
+ private long heartbeatTimeoutMs, absTimeoutMs, abortedGraceMs;
+ /** Time override for tests. Only used for MM timestamp logic, not for the thread timing. */
+ private Supplier<Long> timeOverride = null;
+
+ public MmCleanerThread(long intervalMs) {
+ this.intervalMs = intervalMs;
+ }
+
+ @VisibleForTesting
+ void overrideTime(Supplier<Long> timeOverride) {
+ this.timeOverride = timeOverride;
+ }
+
+ private long getTimeMs() {
+ return timeOverride == null ? System.currentTimeMillis() : timeOverride.get();
+ }
+
+ @Override
+ public void setHiveConf(HiveConf conf) {
+ this.conf = conf;
+ heartbeatTimeoutMs = HiveConf.getTimeVar(
+ conf, ConfVars.HIVE_METASTORE_MM_HEARTBEAT_TIMEOUT, TimeUnit.MILLISECONDS);
+ absTimeoutMs = HiveConf.getTimeVar(
+ conf, ConfVars.HIVE_METASTORE_MM_ABSOLUTE_TIMEOUT, TimeUnit.MILLISECONDS);
+ abortedGraceMs = HiveConf.getTimeVar(
+ conf, ConfVars.HIVE_METASTORE_MM_ABORTED_GRACE_PERIOD, TimeUnit.MILLISECONDS);
+ if (heartbeatTimeoutMs > absTimeoutMs) {
+ throw new RuntimeException("Heartbeat timeout " + heartbeatTimeoutMs
+ + " cannot be larger than the absolute timeout " + absTimeoutMs);
+ }
+ }
+
+ @Override
+ public void setThreadId(int threadId) {
+ this.threadId = threadId;
+ }
+
+ @Override
+ public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException {
+ this.stop = stop;
+ setPriority(MIN_PRIORITY);
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ // Only get RS here, when we are already on the thread.
+ RawStore rs = getRs();
+ while (true) {
+ if (checkStop()) return;
+ long endTimeNs = System.nanoTime() + intervalMs * 1000000L;
+
+ runOneIteration(rs);
+
+ if (checkStop()) return;
+ long waitTimeMs = (endTimeNs - System.nanoTime()) / 1000000L;
+ if (waitTimeMs <= 0) continue;
+ try {
+ Thread.sleep(waitTimeMs);
+ } catch (InterruptedException e) {
+ LOG.error("Thread was interrupted and will now exit");
+ return;
+ }
+ }
+ }
+
+ private RawStore getRs() {
+ try {
+ return RawStoreProxy.getProxy(conf, conf,
+ conf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL), threadId);
+ } catch (MetaException e) {
+ LOG.error("Failed to get RawStore; the thread will now die", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private boolean checkStop() {
+ if (!stop.get()) return false;
+ LOG.info("Stopping due to an external request");
+ return true;
+ }
+
+ @VisibleForTesting
+ void runOneIteration(RawStore rs) {
+ // We only get the names here; we want to get and process each table in a separate DB txn.
+ List<FullTableName> mmTables = null;
+ try {
+ mmTables = rs.getAllMmTablesForCleanup();
+ } catch (MetaException e) {
+ LOG.error("Failed to get tables", e);
+ return;
+ }
+ for (FullTableName tableName : mmTables) {
+ try {
+ processOneTable(tableName, rs);
+ } catch (MetaException e) {
+ LOG.error("Failed to process " + tableName, e);
+ }
+ }
+ }
+
+ private void processOneTable(FullTableName table, RawStore rs) throws MetaException {
+ // 1. Time out writes that have been running for a while.
+ // a) Heartbeat timeouts (not enabled right now as heartbeat is not implemented).
+ // b) Absolute timeouts.
+ // c) Gaps that have the next ID and the derived absolute timeout. This is a small special
+ // case that can happen if we increment next ID but fail to insert the write ID record,
+ // which we do in separate txns to avoid making the conflict-prone increment txn longer.
+ LOG.info("Processing table " + table);
+ Table t = rs.getTable(table.dbName, table.tblName);
+ HashSet<Long> removeWriteIds = new HashSet<>(), cleanupOnlyWriteIds = new HashSet<>();
+ getWritesThatReadyForCleanUp(t, table, rs, removeWriteIds, cleanupOnlyWriteIds);
+
+ // 2. Delete the aborted writes' files from the FS.
+ deleteAbortedWriteIdFiles(table, rs, t, removeWriteIds);
+ deleteAbortedWriteIdFiles(table, rs, t, cleanupOnlyWriteIds);
+ // The *WriteIds sets now only contain the writes whose files were fully cleaned up.
+
+ // 3. Advance the watermark.
+ advanceWatermark(table, rs, removeWriteIds);
+ }
+
+ private void getWritesThatReadyForCleanUp(Table t, FullTableName table, RawStore rs,
+ HashSet<Long> removeWriteIds, HashSet<Long> cleanupOnlyWriteIds) throws MetaException {
+ // We will generally ignore errors here. First, we expect some conflicts; second, we will get
+ // the final view of things after we do (or try, at any rate) all the updates.
+ long watermarkId = t.isSetMmWatermarkWriteId() ? t.getMmWatermarkWriteId() : -1,
+ nextWriteId = t.isSetMmNextWriteId() ? t.getMmNextWriteId() : 0;
+ long now = getTimeMs(), earliestOkHeartbeatMs = now - heartbeatTimeoutMs,
+ earliestOkCreateMs = now - absTimeoutMs, latestAbortedMs = now - abortedGraceMs;
+
+ List<MTableWrite> writes = rs.getTableWrites(
+ table.dbName, table.tblName, watermarkId, nextWriteId);
+ ListIterator<MTableWrite> iter = writes.listIterator(writes.size());
+ long expectedId = -1, nextCreated = -1;
+ // We will go in reverse order and add aborted writes for the gaps that have a following
+ // write ID that would imply that the previous one (created earlier) would have already
+ // expired, had it been open and not updated.
+ while (iter.hasPrevious()) {
+ MTableWrite write = iter.previous();
+ addTimedOutMissingWriteIds(rs, table.dbName, table.tblName, write.getWriteId(),
+ nextCreated, expectedId, earliestOkHeartbeatMs, cleanupOnlyWriteIds, now);
+ expectedId = write.getWriteId() - 1;
+ nextCreated = write.getCreated();
+ char state = write.getState().charAt(0);
+ if (state == HiveMetaStore.MM_WRITE_ABORTED) {
+ if (write.getLastHeartbeat() < latestAbortedMs) {
+ removeWriteIds.add(write.getWriteId());
+ } else {
+ cleanupOnlyWriteIds.add(write.getWriteId());
+ }
+ } else if (state == HiveMetaStore.MM_WRITE_OPEN && write.getCreated() < earliestOkCreateMs) {
+ // TODO: also check for heartbeat here.
+ if (expireTimedOutWriteId(rs, table.dbName, table.tblName, write.getWriteId(),
+ now, earliestOkCreateMs, earliestOkHeartbeatMs, cleanupOnlyWriteIds)) {
+ cleanupOnlyWriteIds.add(write.getWriteId());
+ }
+ }
+ }
+ addTimedOutMissingWriteIds(rs, table.dbName, table.tblName, watermarkId,
+ nextCreated, expectedId, earliestOkHeartbeatMs, cleanupOnlyWriteIds, now);
+ }
+
+ private void advanceWatermark(
+ FullTableName table, RawStore rs, HashSet<Long> cleanedUpWriteIds) {
+ if (!rs.openTransaction()) {
+ LOG.error("Cannot open transaction");
+ return;
+ }
+ boolean success = false;
+ try {
+ Table t = rs.getTable(table.dbName, table.tblName);
+ if (t == null) {
+ return;
+ }
+ long watermarkId = t.getMmWatermarkWriteId();
+ List<Long> writeIds = rs.getTableWriteIds(table.dbName, table.tblName, watermarkId,
+ t.getMmNextWriteId(), HiveMetaStore.MM_WRITE_COMMITTED);
+ long expectedId = watermarkId + 1;
+ boolean hasGap = false;
+ Iterator<Long> idIter = writeIds.iterator();
+ while (idIter.hasNext()) {
+ long next = idIter.next();
+ if (next < expectedId) continue;
+ while (next > expectedId) {
+ if (!cleanedUpWriteIds.contains(expectedId)) {
+ hasGap = true;
+ break;
+ }
+ ++expectedId;
+ }
+ if (hasGap) break;
+ ++expectedId;
+ }
+ // Make sure we also advance over the trailing aborted ones.
+ if (!hasGap) {
+ while (cleanedUpWriteIds.contains(expectedId)) {
+ ++expectedId;
+ }
+ }
+ long newWatermarkId = expectedId - 1;
+ if (newWatermarkId > watermarkId) {
+ t.setMmWatermarkWriteId(newWatermarkId);
+ rs.alterTable(table.dbName, table.tblName, t);
+ rs.deleteTableWrites(table.dbName, table.tblName, -1, expectedId);
+ }
+ success = true;
+ } catch (Exception ex) {
+ // TODO: should we try a couple times on conflicts? Aborted writes cannot be unaborted.
+ LOG.error("Failed to advance watermark", ex);
+ rs.rollbackTransaction();
+ }
+ if (success) {
+ tryCommit(rs);
+ }
+ }
+
+ private void deleteAbortedWriteIdFiles(
+ FullTableName table, RawStore rs, Table t, HashSet<Long> cleanUpWriteIds) {
+ if (cleanUpWriteIds.isEmpty()) return;
+ if (t.getPartitionKeysSize() > 0) {
+ for (String location : rs.getAllPartitionLocations(table.dbName, table.tblName)) {
+ deleteAbortedWriteIdFiles(location, cleanUpWriteIds);
+ }
+ } else {
+ deleteAbortedWriteIdFiles(t.getSd().getLocation(), cleanUpWriteIds);
+ }
+ }
+
+ private void deleteAbortedWriteIdFiles(String location, HashSet<Long> abortedWriteIds) {
+ LOG.info("Looking for " + abortedWriteIds.size() + " aborted write output in " + location);
+ Path path = new Path(location);
+ FileSystem fs;
+ FileStatus[] files;
+ try {
+ fs = path.getFileSystem(conf);
+ if (!fs.exists(path)) {
+ LOG.warn(path + " does not exist; assuming that the cleanup is not needed.");
+ return;
+ }
+ // TODO# do we need to account for any subdirectories here? decide after special-case jiras
+ files = fs.listStatus(path);
+ } catch (Exception ex) {
+ LOG.error("Failed to get files for " + path + "; cannot ensure cleanup for any writes");
+ abortedWriteIds.clear();
+ return;
+ }
+ for (FileStatus file : files) {
+ Path childPath = file.getPath();
+ if (!file.isDirectory()) {
+ LOG.warn("Skipping a non-directory file " + childPath);
+ continue;
+ }
+ Long writeId = ValidWriteIds.extractWriteId(childPath);
+ if (writeId == null) {
+ LOG.warn("Skipping an unknown directory " + childPath);
+ continue;
+ }
+ if (!abortedWriteIds.contains(writeId.longValue())) continue;
+ try {
+ if (!fs.delete(childPath, true)) throw new IOException("delete returned false");
+ } catch (Exception ex) {
+ LOG.error("Couldn't delete " + childPath + "; not cleaning up " + writeId, ex);
+ abortedWriteIds.remove(writeId.longValue());
+ }
+ }
+ }
+
+ private boolean expireTimedOutWriteId(RawStore rs, String dbName,
+ String tblName, long writeId, long now, long earliestOkCreatedMs,
+ long earliestOkHeartbeatMs, HashSet<Long> cleanupOnlyWriteIds) {
+ if (!rs.openTransaction()) {
+ return false;
+ }
+ try {
+ MTableWrite tw = rs.getTableWrite(dbName, tblName, writeId);
+ if (tw == null) {
+ // The write has been updated since the time when we thought it had expired.
+ tryCommit(rs);
+ return true;
+ }
+ char state = tw.getState().charAt(0);
+ if (state != HiveMetaStore.MM_WRITE_OPEN
+ || (tw.getCreated() > earliestOkCreatedMs
+ && tw.getLastHeartbeat() > earliestOkHeartbeatMs)) {
+ tryCommit(rs);
+ return true; // The write has been updated since the time when we thought it had expired.
+ }
+ tw.setState(String.valueOf(HiveMetaStore.MM_WRITE_ABORTED));
+ tw.setLastHeartbeat(now);
+ rs.updateTableWrite(tw);
+ } catch (Exception ex) {
+ LOG.error("Failed to update an expired table write", ex);
+ rs.rollbackTransaction();
+ return false;
+ }
+ boolean result = tryCommit(rs);
+ if (result) {
+ cleanupOnlyWriteIds.add(writeId);
+ }
+ return result;
+ }
+
+ private boolean tryCommit(RawStore rs) {
+ try {
+ return rs.commitTransaction();
+ } catch (Exception ex) {
+ LOG.error("Failed to commit transaction", ex);
+ return false;
+ }
+ }
+
+ private boolean addTimedOutMissingWriteIds(RawStore rs, String dbName, String tblName,
+ long foundPrevId, long nextCreated, long expectedId, long earliestOkHeartbeatMs,
+ HashSet<Long> cleanupOnlyWriteIds, long now) throws MetaException {
+ // Assume all missing ones are created at the same time as the next present write ID.
+ // We also assume missing writes never had any heartbeats.
+ if (nextCreated >= earliestOkHeartbeatMs || expectedId < 0) return true;
+ Table t = null;
+ List<Long> localCleanupOnlyWriteIds = new ArrayList<>();
+ while (foundPrevId < expectedId) {
+ if (t == null && !rs.openTransaction()) {
+ LOG.error("Cannot open transaction; skipping");
+ return false;
+ }
+ try {
+ if (t == null) {
+ t = rs.getTable(dbName, tblName);
+ }
+ // We don't need to double check if the write exists; the unique index will cause an error.
+ rs.createTableWrite(t, expectedId, HiveMetaStore.MM_WRITE_ABORTED, now);
+ } catch (Exception ex) {
+ // TODO: don't log conflict exceptions?.. although we barely ever expect them.
+ LOG.error("Failed to create a missing table write", ex);
+ rs.rollbackTransaction();
+ return false;
+ }
+ localCleanupOnlyWriteIds.add(expectedId);
+ --expectedId;
+ }
+ boolean result = (t == null || tryCommit(rs));
+ if (result) {
+ cleanupOnlyWriteIds.addAll(localCleanupOnlyWriteIds);
+ }
+ return result;
+ }
+}
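The subtle part of the new thread is the watermark advance in advanceWatermark: the watermark may only move past a write ID that is committed or fully cleaned up, and it also rolls over trailing cleaned-up IDs. A self-contained sketch of that loop, run against made-up inputs:

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class WatermarkExample {
  // Mirrors the loop in MmCleanerThread.advanceWatermark above.
  static long advance(long watermarkId, List<Long> committedIds, Set<Long> cleanedUpIds) {
    long expectedId = watermarkId + 1;
    boolean hasGap = false;
    for (long next : committedIds) {
      if (next < expectedId) continue;
      while (next > expectedId) {
        if (!cleanedUpIds.contains(expectedId)) { hasGap = true; break; }
        ++expectedId;
      }
      if (hasGap) break;
      ++expectedId;
    }
    if (!hasGap) { // advance over trailing cleaned-up (aborted) IDs
      while (cleanedUpIds.contains(expectedId)) ++expectedId;
    }
    return expectedId - 1; // the new watermark
  }

  public static void main(String[] args) {
    // Writes 0, 1, 3 committed; 2 aborted and cleaned up; 4 still open:
    // the watermark advances to 3 and stops before the open write.
    System.out.println(advance(-1, Arrays.asList(0L, 1L, 3L),
        new HashSet<>(Arrays.asList(2L)))); // prints: 3
  }
}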
http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index fb3b1ad..32e4daf 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -711,7 +711,7 @@ public class ObjectStore implements RawStore, Configurable {
}
public Database getDatabaseInternal(String name) throws MetaException, NoSuchObjectException {
- return new GetDbHelper(name, null, true, true) {
+ return new GetDbHelper(name, true, true) {
@Override
protected Database getSqlResult(GetHelper<Database> ctx) throws MetaException {
return directSql.getDatabase(dbName);
@@ -1183,14 +1183,7 @@ public class ObjectStore implements RawStore, Configurable {
pm.retrieveAll(result);
success = true;
} finally {
- if (success) {
- commitTransaction();
- } else {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ closeTransaction(success, query);
}
return result;
}
@@ -2951,15 +2944,13 @@ public class ObjectStore implements RawStore, Configurable {
public abstract class GetDbHelper extends GetHelper<Database> {
/**
* GetHelper for returning db info using directSql/JDO.
- * Since this is a db-level call, tblName is ignored, and null is passed irrespective of what is passed in.
* @param dbName The Database Name
- * @param tblName Placeholder param to match signature, always ignored.
* @param allowSql Whether or not we allow DirectSQL to perform this query.
* @param allowJdo Whether or not we allow ORM to perform this query.
* @throws MetaException
*/
public GetDbHelper(
- String dbName, String tblName, boolean allowSql, boolean allowJdo) throws MetaException {
+ String dbName, boolean allowSql, boolean allowJdo) throws MetaException {
super(dbName,null,allowSql,allowJdo);
}
@@ -8713,7 +8704,7 @@ public class ObjectStore implements RawStore, Configurable {
openTransaction();
try {
MTable mtbl = getMTable(tbl.getDbName(), tbl.getTableName());
- MTableWrite tw = new MTableWrite(mtbl, writeId, String.valueOf(state), heartbeat);
+ MTableWrite tw = new MTableWrite(mtbl, writeId, String.valueOf(state), heartbeat, heartbeat);
pm.makePersistent(tw);
success = true;
} finally {
@@ -8746,8 +8737,8 @@ public class ObjectStore implements RawStore, Configurable {
String dbName, String tblName, long writeId) throws MetaException {
boolean success = false;
Query query = null;
+ openTransaction();
try {
- openTransaction();
query = pm.newQuery(MTableWrite.class,
"table.tableName == t1 && table.database.name == t2 && writeId == t3");
query.declareParameters("java.lang.String t1, java.lang.String t2, java.lang.Long t3");
@@ -8762,45 +8753,129 @@ public class ObjectStore implements RawStore, Configurable {
}
return writes.get(0);
} finally {
- if (success) {
- commitTransaction();
- } else {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ closeTransaction(success, query);
}
}
@Override
- public List<Long> getWriteIds(String dbName, String tblName,
+ public List<Long> getTableWriteIds(String dbName, String tblName,
long watermarkId, long nextWriteId, char state) throws MetaException {
boolean success = false;
Query query = null;
+ openTransaction();
try {
- openTransaction();
+ boolean hasState = (state != '\0');
query = pm.newQuery("select writeId from org.apache.hadoop.hive.metastore.model.MTableWrite"
- + " where table.tableName == t1 && table.database.name == t2 && writeId >= t3"
- + " && writeId < t4 && state == t5");
+ + " where table.tableName == t1 && table.database.name == t2 && writeId > t3"
+ + " && writeId < t4" + (hasState ? " && state == t5" : ""));
query.declareParameters("java.lang.String t1, java.lang.String t2, java.lang.Long t3, "
- + "java.lang.Long t4, java.lang.String t5");
+ + "java.lang.Long t4" + (hasState ? ", java.lang.String t5" : ""));
query.setResult("writeId");
query.setOrdering("writeId asc");
@SuppressWarnings("unchecked")
- List<Long> writes = (List<Long>) query.executeWithArray(
- tblName, dbName, watermarkId, nextWriteId, String.valueOf(state));
+ List<Long> writes = (List<Long>) (hasState
+ ? query.executeWithArray(tblName, dbName, watermarkId, nextWriteId, String.valueOf(state))
+ : query.executeWithArray(tblName, dbName, watermarkId, nextWriteId));
+ success = true;
+ return (writes == null) ? new ArrayList<Long>() : new ArrayList<>(writes);
+ } finally {
+ closeTransaction(success, query);
+ }
+ }
+
+ @Override
+ public List<MTableWrite> getTableWrites(
+ String dbName, String tblName, long from, long to) throws MetaException {
+ boolean success = false;
+ Query query = null;
+ openTransaction();
+ try {
+ query = pm.newQuery(MTableWrite.class,
+ "table.tableName == t1 && table.database.name == t2 && writeId > t3 && writeId < t4");
+ query.declareParameters(
+ "java.lang.String t1, java.lang.String t2, java.lang.Long t3, java.lang.Long t4");
+ query.setOrdering("writeId asc");
+ @SuppressWarnings("unchecked")
+ List<MTableWrite> writes =
+ (List<MTableWrite>) query.executeWithArray(tblName, dbName, from, to);
success = true;
return (writes == null || writes.isEmpty()) ? null : new ArrayList<>(writes);
} finally {
- if (success) {
- commitTransaction();
- } else {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
+ closeTransaction(success, query);
+ }
+ }
+
+ @Override
+ public void deleteTableWrites(
+ String dbName, String tblName, long from, long to) throws MetaException {
+ boolean success = false;
+ Query query = null;
+ openTransaction();
+ try {
+ query = pm.newQuery(MTableWrite.class,
+ "table.tableName == t1 && table.database.name == t2 && writeId > t3 && writeId < t4");
+ query.declareParameters(
+ "java.lang.String t1, java.lang.String t2, java.lang.Long t3, java.lang.Long t4");
+ query.deletePersistentAll(tblName, dbName, from, to);
+ success = true;
+ } finally {
+ closeTransaction(success, query);
+ }
+ }
+
+ @Override
+ public List<FullTableName> getAllMmTablesForCleanup() throws MetaException {
+ boolean success = false;
+ Query query = null;
+ openTransaction();
+ try {
+ // If the table had no MM writes, there's nothing to clean up
+ query = pm.newQuery(MTable.class, "mmNextWriteId > 0");
+ @SuppressWarnings("unchecked")
+ List<MTable> tables = (List<MTable>) query.execute();
+ pm.retrieveAll(tables);
+ ArrayList<FullTableName> result = new ArrayList<>(tables.size());
+ for (MTable table : tables) {
+ if (MetaStoreUtils.isMmTable(table.getParameters())) {
+ result.add(new FullTableName(table.getDatabase().getName(), table.getTableName()));
+ }
}
+ success = true;
+ return result;
+ } finally {
+ closeTransaction(success, query);
+ }
+ }
+
+ @Override
+ public Collection<String> getAllPartitionLocations(String dbName, String tblName) {
+ boolean success = false;
+ Query query = null;
+ openTransaction();
+ try {
+ String q = "select sd.location from org.apache.hadoop.hive.metastore.model.MPartition"
+ + " where table.tableName == t1 && table.database.name == t2";
+ query = pm.newQuery(q);
+ query.declareParameters("java.lang.String t1, java.lang.String t2");
+ @SuppressWarnings("unchecked")
+ List<String> locations = (List<String>) query.execute(tblName, dbName);
+ success = true;
+ return new ArrayList<>(locations);
+ } finally {
+ closeTransaction(success, query);
+ }
+ }
+
+ private void closeTransaction(boolean success, Query query) {
+ if (success) {
+ commitTransaction();
+ } else {
+ rollbackTransaction();
+ }
+ if (query != null) {
+ query.closeAll();
}
}
}
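
The closeTransaction(success, query) helper above consolidates the
commit-or-rollback-plus-closeAll epilogue that previously appeared in every
finally block. A sketch of the idiom it standardizes, written over
hypothetical transaction hooks rather than the real JDO types:

    import java.util.concurrent.Callable;

    // Hypothetical hooks standing in for ObjectStore's openTransaction/
    // commitTransaction/rollbackTransaction and for Query.closeAll().
    final class TxnIdiomSketch {
      interface Tx { void open(); void commit(); void rollback(); }

      static <T> T runInTx(Tx tx, Callable<T> body, Runnable closeQuery) throws Exception {
        boolean success = false;
        tx.open();
        try {
          T result = body.call();
          success = true;            // set only after the body fully succeeds
          return result;
        } finally {
          if (success) tx.commit(); else tx.rollback(); // closeTransaction's branch
          closeQuery.run();                             // always close the query
        }
      }
    }
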
http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
index 170c07d..76ead25 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
@@ -23,6 +23,7 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -697,5 +698,28 @@ public interface RawStore extends Configurable {
void createTableWrite(Table tbl, long writeId, char state, long heartbeat);
- List<Long> getWriteIds(String dbName, String tblName, long watermarkId, long nextWriteId, char state) throws MetaException;
+ List<Long> getTableWriteIds(String dbName, String tblName, long watermarkId, long nextWriteId, char state) throws MetaException;
+
+ public static final class FullTableName {
+ public final String dbName, tblName;
+
+ public FullTableName(String dbName, String tblName) {
+ this.dbName = dbName;
+ this.tblName = tblName;
+ }
+
+ @Override
+ public String toString() {
+ return dbName + "." + tblName;
+ }
+ }
+
+ List<FullTableName> getAllMmTablesForCleanup() throws MetaException;
+
+ List<MTableWrite> getTableWrites(String dbName, String tblName, long from, long to) throws MetaException;
+
+ Collection<String> getAllPartitionLocations(String dbName, String tblName);
+
+ void deleteTableWrites(String dbName, String tblName, long from, long to) throws MetaException;
}
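
Taken together, the new RawStore methods give the cleaner thread a per-table
scan-and-prune cycle. A rough sketch of how a caller might chain them (only
the method names and types come from the interface above; the loop body and
bounds are illustrative):

    import java.util.List;

    import org.apache.hadoop.hive.metastore.RawStore;
    import org.apache.hadoop.hive.metastore.RawStore.FullTableName;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.model.MTableWrite;

    // Illustrative driver for the cleanup API; the real logic lives in
    // MmCleanerThread and also consults file listings and heartbeats.
    final class CleanupDriverSketch {
      static void pass(RawStore rs, long watermarkId, long nextWriteId) throws MetaException {
        for (FullTableName t : rs.getAllMmTablesForCleanup()) {
          List<MTableWrite> writes =
              rs.getTableWrites(t.dbName, t.tblName, watermarkId, nextWriteId);
          if (writes == null) continue; // nothing between the watermark and the next ID
          // ... inspect states, delete files of aborted writes, advance the watermark, then:
          rs.deleteTableWrites(t.dbName, t.tblName, watermarkId, nextWriteId);
        }
      }
    }
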
http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
index 829f0ae..ddc5a62 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
@@ -2759,9 +2759,36 @@ public class HBaseStore implements RawStore {
@Override
- public List<Long> getWriteIds(
+ public List<Long> getTableWriteIds(
String dbName, String tblName, long watermarkId, long nextWriteId, char state) {
// TODO: Auto-generated method stub
throw new UnsupportedOperationException();
}
+
+ @Override
+ public List<FullTableName> getAllMmTablesForCleanup() throws MetaException {
+ // TODO: Auto-generated method stub
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<MTableWrite> getTableWrites(String dbName, String tblName,
+ long from, long to) throws MetaException {
+ // TODO: Auto-generated method stub
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Collection<String> getAllPartitionLocations(String dbName,
+ String tblName) {
+ // TODO: Auto-generated method stub
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void deleteTableWrites(String dbName, String tblName, long from,
+ long to) throws MetaException {
+ // TODO: Auto-generated method stub
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTableWrite.java
----------------------------------------------------------------------
diff --git a/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTableWrite.java b/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTableWrite.java
index a7e5f3e..b7f398a 100644
--- a/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTableWrite.java
+++ b/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTableWrite.java
@@ -23,14 +23,16 @@ public class MTableWrite {
private long writeId;
private String state;
private long lastHeartbeat;
+ private long created;
public MTableWrite() {}
- public MTableWrite(MTable table, long writeId, String state, long lastHeartbeat) {
+ public MTableWrite(MTable table, long writeId, String state, long lastHeartbeat, long created) {
this.table = table;
this.writeId = writeId;
this.state = state;
this.lastHeartbeat = lastHeartbeat;
+ this.created = created;
}
public MTable getTable() {
@@ -49,6 +51,10 @@ public class MTableWrite {
return lastHeartbeat;
}
+ public long getCreated() {
+ return created;
+ }
+
public void setTable(MTable table) {
this.table = table;
}
@@ -64,4 +70,8 @@ public class MTableWrite {
public void setLastHeartbeat(long lastHeartbeat) {
this.lastHeartbeat = lastHeartbeat;
}
+
+ public void setCreated(long created) {
+ this.created = created;
+ }
}
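
With the extra field, a freshly created write record carries both timestamps;
the createTableWrite change in ObjectStore above passes the same heartbeat
value for both. A small sketch of the new constructor in use:

    import org.apache.hadoop.hive.metastore.model.MTable;
    import org.apache.hadoop.hive.metastore.model.MTableWrite;

    // Illustrative: a new write starts with created == lastHeartbeat (epoch ms),
    // mirroring how ObjectStore.createTableWrite invokes the new constructor.
    final class NewWriteSketch {
      static MTableWrite newWrite(MTable mtbl, long writeId, char state, long nowMs) {
        return new MTableWrite(mtbl, writeId, String.valueOf(state), nowMs, nowMs);
      }
    }
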
http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/model/package.jdo
----------------------------------------------------------------------
diff --git a/metastore/src/model/package.jdo b/metastore/src/model/package.jdo
index bd71056..ce101dd 100644
--- a/metastore/src/model/package.jdo
+++ b/metastore/src/model/package.jdo
@@ -1082,6 +1082,9 @@
<field name="state">
<column name="STATE" length="1" jdbc-type="CHAR" allows-null="false"/>
</field>
+ <field name="created">
+ <column name="CREATED" jdbc-type="BIGINT" allows-null="false"/>
+ </field>
<field name="lastHeartbeat">
<column name="LAST_HEARTBEAT" jdbc-type="BIGINT" allows-null="false"/>
</field>
http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
index 98c543f..acbbf4e 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -886,8 +887,30 @@ public class DummyRawStoreControlledCommit implements RawStore, Configurable {
}
@Override
- public List<Long> getWriteIds(
+ public List<Long> getTableWriteIds(
String dbName, String tblName, long watermarkId, long nextWriteId, char state) {
return null;
}
+
+ @Override
+ public List<FullTableName> getAllMmTablesForCleanup() throws MetaException {
+ return null;
+ }
+
+ @Override
+ public List<MTableWrite> getTableWrites(String dbName, String tblName,
+ long from, long to) throws MetaException {
+ return null;
+ }
+
+ @Override
+ public Collection<String> getAllPartitionLocations(String dbName,
+ String tblName) {
+ return null;
+ }
+
+ @Override
+ public void deleteTableWrites(String dbName, String tblName, long from,
+ long to) throws MetaException {
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
index 8e54b16..787c1f0 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.metastore;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -898,10 +899,32 @@ public class DummyRawStoreForJdoConnection implements RawStore {
}
@Override
- public List<Long> getWriteIds(
+ public List<Long> getTableWriteIds(
String dbName, String tblName, long watermarkId, long nextWriteId, char state) {
return null;
}
+
+ @Override
+ public List<FullTableName> getAllMmTablesForCleanup() throws MetaException {
+ return null;
+ }
+
+ @Override
+ public List<MTableWrite> getTableWrites(String dbName, String tblName,
+ long from, long to) throws MetaException {
+ return null;
+ }
+
+ @Override
+ public Collection<String> getAllPartitionLocations(String dbName,
+ String tblName) {
+ return null;
+ }
+
+ @Override
+ public void deleteTableWrites(String dbName, String tblName, long from,
+ long to) throws MetaException {
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java b/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java
index 0497159..a8d3495 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java
@@ -17,16 +17,20 @@
*/
package org.apache.hadoop.hive.metastore;
+import static org.junit.Assert.*;
+
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics;
import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting;
import org.apache.hadoop.hive.common.metrics.MetricsTestUtils;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
@@ -42,9 +46,13 @@ import org.apache.hadoop.hive.metastore.api.Role;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.model.MTableWrite;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hive.common.util.MockFileSystem;
+import org.apache.hive.common.util.MockFileSystem.MockFile;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -53,6 +61,8 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Supplier;
+
public class TestObjectStore {
private ObjectStore objectStore = null;
@@ -67,6 +77,15 @@ public class TestObjectStore {
private static final String ROLE2 = "testobjectstorerole2";
private static final Logger LOG = LoggerFactory.getLogger(TestObjectStore.class.getName());
+ private static final class LongSupplier implements Supplier<Long> {
+ public long value = 0;
+
+ @Override
+ public Long get() {
+ return value;
+ }
+ }
+
public static class MockPartitionExpressionProxy implements PartitionExpressionProxy {
@Override
public String convertExprToFilter(byte[] expr) throws MetaException {
@@ -142,7 +161,7 @@ public class TestObjectStore {
public void testTableOps() throws MetaException, InvalidObjectException, NoSuchObjectException, InvalidInputException {
Database db1 = new Database(DB1, "description", "locationurl", null);
objectStore.createDatabase(db1);
- StorageDescriptor sd = new StorageDescriptor(null, "location", null, null, false, 0, new SerDeInfo("SerDeName", "serializationLib", null), null, null, null);
+ StorageDescriptor sd = createFakeSd("location");
HashMap<String,String> params = new HashMap<String,String>();
params.put("EXTERNAL", "false");
Table tbl1 = new Table(TABLE1, DB1, "owner", 1, 2, 3, sd, null, params, "viewOriginalText", "viewExpandedText", "MANAGED_TABLE");
@@ -164,6 +183,156 @@ public class TestObjectStore {
objectStore.dropDatabase(DB1);
}
+
+ /**
+ * Tests the MM cleaner thread: aborting missing and timed-out writes,
+ * advancing the watermark, and deleting the corresponding files.
+ */
+ @Test
+ public void testMmCleaner() throws Exception {
+ HiveConf conf = new HiveConf();
+ conf.set(ConfVars.HIVE_METASTORE_MM_HEARTBEAT_TIMEOUT.varname, "3ms");
+ conf.set(ConfVars.HIVE_METASTORE_MM_ABSOLUTE_TIMEOUT.varname, "20ms");
+ conf.set(ConfVars.HIVE_METASTORE_MM_ABORTED_GRACE_PERIOD.varname, "5ms");
+ conf.set("fs.mock.impl", MockFileSystem.class.getName());
+
+ MockFileSystem mfs = (MockFileSystem)(new Path("mock:///").getFileSystem(conf));
+ mfs.clear();
+ mfs.allowDelete = true;
+ // Don't add the files just yet...
+ MockFile[] files = new MockFile[9];
+ for (int i = 0; i < files.length; ++i) {
+ files[i] = new MockFile("mock:/foo/mm_" + i + "/1", 0, new byte[0]);
+ }
+
+ LongSupplier time = new LongSupplier();
+
+ MmCleanerThread mct = new MmCleanerThread(0);
+ mct.setHiveConf(conf);
+ mct.overrideTime(time);
+
+ Database db1 = new Database(DB1, "description", "locationurl", null);
+ objectStore.createDatabase(db1);
+ StorageDescriptor sd = createFakeSd("mock:/foo");
+ HashMap<String,String> params = new HashMap<String,String>();
+ params.put("EXTERNAL", "false");
+ params.put(hive_metastoreConstants.TABLE_IS_MM, "true");
+ Table tbl = new Table(TABLE1, DB1, "owner", 1, 2, 3, sd,
+ null, params, null, null, "MANAGED_TABLE");
+ objectStore.createTable(tbl);
+
+ // Add write #0 so that the watermark cannot advance; skip write #1, add #2 at time 0, skip #3.
+ createCompleteTableWrite(mfs, files, 0, time, tbl, HiveMetaStore.MM_WRITE_OPEN);
+ mfs.addFile(files[1]);
+ createCompleteTableWrite(mfs, files, 2, time, tbl, HiveMetaStore.MM_WRITE_OPEN);
+ mfs.addFile(files[3]);
+ tbl.setMmNextWriteId(4);
+ objectStore.alterTable(DB1, TABLE1, tbl);
+
+ mct.runOneIteration(objectStore);
+ List<Long> writes = getAbortedWrites();
+ assertEquals(0, writes.size()); // Missing write is not aborted before timeout.
+ time.value = 4; // Advance time.
+ mct.runOneIteration(objectStore);
+ writes = getAbortedWrites();
+ assertEquals(1, writes.size()); // Missing write is aborted after timeout.
+ assertEquals(1L, writes.get(0).longValue());
+ checkDeletedSet(files, 1);
+ // However, write #3 was not aborted as we cannot determine when it will time out.
+ createCompleteTableWrite(mfs, files, 4, time, tbl, HiveMetaStore.MM_WRITE_OPEN);
+ time.value = 8;
+ // It will now be aborted, since we have a following write.
+ mct.runOneIteration(objectStore);
+ writes = getAbortedWrites();
+ assertEquals(2, writes.size());
+ assertTrue(writes.contains(Long.valueOf(3)));
+ checkDeletedSet(files, 1, 3);
+
+ // Commit #0 and #2 and confirm that the watermark advances.
+ // It will only advance over #1, since #3 was aborted at time 8 and the grace period has not passed.
+ time.value = 10;
+ MTableWrite tw = objectStore.getTableWrite(DB1, TABLE1, 0);
+ tw.setState(String.valueOf(HiveMetaStore.MM_WRITE_COMMITTED));
+ objectStore.updateTableWrite(tw);
+ tw = objectStore.getTableWrite(DB1, TABLE1, 2);
+ tw.setState(String.valueOf(HiveMetaStore.MM_WRITE_COMMITTED));
+ objectStore.updateTableWrite(tw);
+ mct.runOneIteration(objectStore);
+ writes = getAbortedWrites();
+ assertEquals(1, writes.size());
+ assertEquals(3L, writes.get(0).longValue());
+ tbl = objectStore.getTable(DB1, TABLE1);
+ assertEquals(2L, tbl.getMmWatermarkWriteId());
+
+ // Now advance the time and see that watermark also advances over #3.
+ time.value = 16;
+ mct.runOneIteration(objectStore);
+ writes = getAbortedWrites();
+ assertEquals(0, writes.size());
+ tbl = objectStore.getTable(DB1, TABLE1);
+ assertEquals(3L, tbl.getMmWatermarkWriteId());
+
+ // Check that the open write gets aborted after some time; then the watermark advances.
+ time.value = 25;
+ mct.runOneIteration(objectStore);
+ writes = getAbortedWrites();
+ assertEquals(1, writes.size());
+ assertEquals(4L, writes.get(0).longValue());
+ time.value = 31;
+ mct.runOneIteration(objectStore);
+ tbl = objectStore.getTable(DB1, TABLE1);
+ assertEquals(4L, tbl.getMmWatermarkWriteId());
+ checkDeletedSet(files, 1, 3, 4); // Files 1 and 3 from before should still be deleted.
+
+ // Finally check that we cannot advance watermark if cleanup fails for some file.
+ createCompleteTableWrite(mfs, files, 5, time, tbl, HiveMetaStore.MM_WRITE_ABORTED);
+ createCompleteTableWrite(mfs, files, 6, time, tbl, HiveMetaStore.MM_WRITE_ABORTED);
+ createCompleteTableWrite(mfs, files, 7, time, tbl, HiveMetaStore.MM_WRITE_COMMITTED);
+ createCompleteTableWrite(mfs, files, 8, time, tbl, HiveMetaStore.MM_WRITE_ABORTED);
+ time.value = 37; // Skip the grace period.
+ files[6].cannotDelete = true;
+ mct.runOneIteration(objectStore);
+ checkDeletedSet(files, 1, 3, 4, 5, 8); // 6 cannot be deleted; 7 is committed, so its file stays.
+ tbl = objectStore.getTable(DB1, TABLE1);
+ assertEquals(5L, tbl.getMmWatermarkWriteId()); // Watermark only goes up to 5.
+ files[6].cannotDelete = false;
+ mct.runOneIteration(objectStore);
+ checkDeletedSet(files, 1, 3, 4, 5, 6, 8);
+ tbl = objectStore.getTable(DB1, TABLE1);
+ assertEquals(8L, tbl.getMmWatermarkWriteId()); // Now it advances all the way.
+
+ objectStore.dropTable(DB1, TABLE1);
+ objectStore.dropDatabase(DB1);
+ }
+
+ private void createCompleteTableWrite(MockFileSystem mfs, MockFile[] files,
+ int id, LongSupplier time, Table tbl, char state) throws MetaException, InvalidObjectException {
+ objectStore.createTableWrite(tbl, id, state, time.value);
+ mfs.addFile(files[id]);
+ tbl.setMmNextWriteId(id + 1);
+ objectStore.alterTable(DB1, TABLE1, tbl);
+ }
+
+ private void checkDeletedSet(MockFile[] files, int... deleted) {
+ for (int id : deleted) {
+ assertTrue("File " + id + " not deleted", files[id].isDeleted);
+ }
+ int count = 0;
+ for (MockFile file : files) {
+ if (file.isDeleted) ++count;
+ }
+ assertEquals(deleted.length, count); // Make sure nothing else is deleted.
+ }
+
+ private List<Long> getAbortedWrites() throws MetaException {
+ return objectStore.getTableWriteIds(DB1, TABLE1, -1, 10, HiveMetaStore.MM_WRITE_ABORTED);
+ }
+
+ private StorageDescriptor createFakeSd(String location) {
+ return new StorageDescriptor(null, location, null, null, false, 0,
+ new SerDeInfo("SerDeName", "serializationLib", null), null, null, null);
+ }
+
/**
* Tests partition operations
@@ -172,7 +341,7 @@ public class TestObjectStore {
public void testPartitionOps() throws MetaException, InvalidObjectException, NoSuchObjectException, InvalidInputException {
Database db1 = new Database(DB1, "description", "locationurl", null);
objectStore.createDatabase(db1);
- StorageDescriptor sd = new StorageDescriptor(null, "location", null, null, false, 0, new SerDeInfo("SerDeName", "serializationLib", null), null, null, null);
+ StorageDescriptor sd = createFakeSd("location");
HashMap<String,String> tableParams = new HashMap<String,String>();
tableParams.put("EXTERNAL", "false");
FieldSchema partitionKey1 = new FieldSchema("Country", serdeConstants.STRING_TYPE_NAME, "");
@@ -265,7 +434,7 @@ public class TestObjectStore {
MetricsFactory.init(conf);
CodahaleMetrics metrics = (CodahaleMetrics) MetricsFactory.getInstance();
- objectStore.new GetDbHelper("foo", null, true, true) {
+ objectStore.new GetDbHelper("foo", true, true) {
@Override
protected Database getSqlResult(ObjectStore.GetHelper<Database> ctx) throws MetaException {
return null;
@@ -282,7 +451,7 @@ public class TestObjectStore {
MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER,
MetricsConstant.DIRECTSQL_ERRORS, "");
- objectStore.new GetDbHelper("foo", null, true, true) {
+ objectStore.new GetDbHelper("foo", true, true) {
@Override
protected Database getSqlResult(ObjectStore.GetHelper<Database> ctx) throws MetaException {
throw new RuntimeException();