Posted to commits@hive.apache.org by we...@apache.org on 2017/05/05 17:31:51 UTC
[04/51] [partial] hive git commit: HIVE-14671 : merge master into hive-14535 (Wei Zheng)
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index 0b1ac4b..b018adb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -37,6 +37,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.google.common.annotations.VisibleForTesting;
+
import org.apache.hadoop.hive.common.StringInternUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +46,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.Driver.DriverState;
+import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -363,8 +366,12 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
Map<CombinePathInputFormat, CombineFilter> poolMap =
new HashMap<CombinePathInputFormat, CombineFilter>();
Set<Path> poolSet = new HashSet<Path>();
+ LockedDriverState lDrvStat = LockedDriverState.getLockedDriverState();
for (Path path : paths) {
+ if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
+ throw new IOException("Operation is Canceled. ");
+
PartitionDesc part = HiveFileFormatUtils.getFromPathRecursively(
pathToPartitionInfo, path, IOPrepareCache.get().allocatePartitionDescMap());
TableDesc tableDesc = part.getTableDesc();
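For reference, a minimal standalone sketch of the cooperative-cancellation pattern this hunk introduces: split generation polls the driver state on each loop iteration and aborts with an IOException once an interrupt is flagged. The names below are simplified stand-ins for Hive's Driver.DriverState/LockedDriverState, not the actual classes.

import java.io.IOException;
import java.util.List;

class CancellationSketch {
  enum DriverState { NORMAL, INTERRUPT }

  // In Hive this is per-query state published by the Driver; here a simple volatile.
  static volatile DriverState driverState = DriverState.NORMAL;

  static void generateSplits(List<String> paths) throws IOException {
    for (String path : paths) {
      // Check before each unit of work so a long loop can be cancelled promptly.
      if (driverState == DriverState.INTERRUPT) {
        throw new IOException("Operation is Canceled. ");
      }
      // ... per-path split computation ...
    }
  }
}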
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
index cc77e4c..f73a8e3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
@@ -87,7 +87,7 @@ public final class HiveFileFormatUtils {
public static class FileChecker {
// we don't have many file formats that implement InputFormatChecker. We won't be holding
// multiple instances of such classes
- private static int MAX_CACHE_SIZE = 16;
+ private static final int MAX_CACHE_SIZE = 16;
// immutable maps
Map<Class<? extends InputFormat>, Class<? extends InputFormatChecker>> inputFormatCheckerMap;
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index c697407..9b83cb4 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -89,14 +89,13 @@ import org.apache.hive.common.util.ReflectionUtil;
*/
public class HiveInputFormat<K extends WritableComparable, V extends Writable>
implements InputFormat<K, V>, JobConfigurable {
-
private static final String CLASS_NAME = HiveInputFormat.class.getName();
private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
/**
* A cache of InputFormat instances.
*/
- private static Map<Class, InputFormat<WritableComparable, Writable>> inputFormats
+ private static final Map<Class, InputFormat<WritableComparable, Writable>> inputFormats
= new ConcurrentHashMap<Class, InputFormat<WritableComparable, Writable>>();
private JobConf job;
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
index d391164..f41edc4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
@@ -839,7 +839,7 @@ public class RCFile {
// the max size of memory for buffering records before writes them out
private int columnsBufferSize = 4 * 1024 * 1024; // 4M
// the conf string for COLUMNS_BUFFER_SIZE
- public static String COLUMNS_BUFFER_SIZE_CONF_STR = "hive.io.rcfile.record.buffer.size";
+ public static final String COLUMNS_BUFFER_SIZE_CONF_STR = "hive.io.rcfile.record.buffer.size";
// how many records already buffered
private int bufferedRecords = 0;
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
index 96ca736..cbd38ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.orc.FileMetadata;
import org.apache.orc.PhysicalWriter;
-import org.apache.orc.impl.MemoryManager;
+import org.apache.orc.MemoryManager;
import org.apache.orc.TypeDescription;
import org.apache.orc.impl.OrcTail;
@@ -258,7 +258,7 @@ public final class OrcFile extends org.apache.orc.OrcFile {
/**
* A package local option to set the memory manager.
*/
- protected WriterOptions memory(MemoryManager value) {
+ public WriterOptions memory(MemoryManager value) {
super.memory(value);
return this;
}
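With memory(MemoryManager) made public, callers outside the package can plug in their own memory manager when building writer options. A hedged usage sketch, assuming org.apache.orc.impl.MemoryManagerImpl as the stock implementation:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.orc.MemoryManager;
import org.apache.orc.impl.MemoryManagerImpl;

class MemoryOptionSketch {
  static OrcFile.WriterOptions optionsWithCustomMemory(Configuration conf) {
    MemoryManager mgr = new MemoryManagerImpl(conf); // assumed stock implementation
    // memory(...) was package-local before this change; now callable from here.
    return OrcFile.writerOptions(conf).memory(mgr);
  }
}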
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 59682db..8fb7211 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -158,7 +158,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
private static final Logger LOG = LoggerFactory.getLogger(OrcInputFormat.class);
- private static boolean isDebugEnabled = LOG.isDebugEnabled();
+ private static final boolean isDebugEnabled = LOG.isDebugEnabled();
static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
private static final long DEFAULT_MIN_SPLIT_SIZE = 16 * 1024 * 1024;
@@ -1531,7 +1531,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
Reader.Options readerOptions = new Reader.Options(context.conf);
if (readerTypes == null) {
readerIncluded = genIncludedColumns(fileSchema, context.conf);
- evolution = new SchemaEvolution(fileSchema, readerOptions.include(readerIncluded));
+ evolution = new SchemaEvolution(fileSchema, null, readerOptions.include(readerIncluded));
} else {
// The reader schema always comes in without ACID columns.
TypeDescription readerSchema = OrcUtils.convertTypeFromProtobuf(readerTypes, 0);
@@ -1913,10 +1913,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
}
- // The schema type description does not include the ACID fields (i.e. it is the
- // non-ACID original schema).
- private static boolean SCHEMA_TYPES_IS_ORIGINAL = true;
-
@Override
public RowReader<OrcStruct> getReader(InputSplit inputSplit,
Options options)
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 0ac3ec5..5b2e9b5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -18,9 +18,12 @@
package org.apache.hadoop.hive.ql.io.orc.encoded;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.IdentityHashMap;
import java.util.List;
import org.slf4j.Logger;
@@ -43,11 +46,13 @@ import org.apache.orc.impl.RecordReaderUtils;
import org.apache.orc.impl.StreamName;
import org.apache.orc.StripeInformation;
import org.apache.orc.impl.BufferChunk;
-import org.apache.hadoop.hive.llap.DebugUtils;
import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory;
import org.apache.orc.OrcProto;
+import sun.misc.Cleaner;
+
+
/**
* Encoded reader implementation.
*
@@ -80,6 +85,17 @@ import org.apache.orc.OrcProto;
*/
class EncodedReaderImpl implements EncodedReader {
public static final Logger LOG = LoggerFactory.getLogger(EncodedReaderImpl.class);
+ private static Field cleanerField;
+ static {
+ try {
+ // TODO: To make it work for JDK9 use CleanerUtil from https://issues.apache.org/jira/browse/HADOOP-12760
+ final Class<?> dbClazz = Class.forName("java.nio.DirectByteBuffer");
+ cleanerField = dbClazz.getDeclaredField("cleaner");
+ cleanerField.setAccessible(true);
+ } catch (Throwable t) {
+ cleanerField = null;
+ }
+ }
private static final Object POOLS_CREATION_LOCK = new Object();
private static Pools POOLS;
private static class Pools {
@@ -204,8 +220,8 @@ class EncodedReaderImpl implements EncodedReader {
@Override
public void readEncodedColumns(int stripeIx, StripeInformation stripe,
- OrcProto.RowIndex[] indexes, List<OrcProto.ColumnEncoding> encodings, List<OrcProto.Stream> streamList,
- boolean[] included, boolean[][] colRgs,
+ OrcProto.RowIndex[] indexes, List<OrcProto.ColumnEncoding> encodings,
+ List<OrcProto.Stream> streamList, boolean[] included, boolean[][] colRgs,
Consumer<OrcEncodedColumnBatch> consumer) throws IOException {
// Note: for now we don't have to setError here, caller will setError if we throw.
// We are also not supposed to call setDone, since we are only part of the operation.
@@ -303,15 +319,35 @@ class EncodedReaderImpl implements EncodedReader {
}
}
+ // TODO: the memory release could be optimized - we could release original buffers after we
+ // are fully done with each original buffer from disk. For now release all at the end;
+ // it doesn't increase the total amount of memory we hold, just the duration a bit.
+ // This is much simpler - we can just remember original ranges after reading them, and
+ // release them at the end. In a few cases where it's easy to determine that a buffer
+ // can be freed in advance, we remove it from the map.
+ IdentityHashMap<ByteBuffer, Boolean> toRelease = null;
if (!isAllInCache.value) {
if (!isDataReaderOpen) {
this.dataReader.open();
isDataReaderOpen = true;
}
dataReader.readFileData(toRead.next, stripeOffset, cacheWrapper.getAllocator().isDirectAlloc());
+ toRelease = new IdentityHashMap<>();
+ DiskRangeList drl = toRead.next;
+ while (drl != null) {
+ if (drl instanceof BufferChunk) {
+ toRelease.put(drl.getData(), true);
+ }
+ drl = drl.next;
+ }
}
// 3. For uncompressed case, we need some special processing before read.
+ // Basically, we are trying to create artificial, consistent ranges to cache, as there are
+ // no CBs in an uncompressed file. At the end of this processing, the list would contain
+ // either cache buffers, or buffers allocated by us and not cached (if we are only reading
+ // parts of the data for some ranges and don't want to cache it). Both are represented by
+ // CacheChunks, so the list is just CacheChunk-s from that point on.
DiskRangeList iter = toRead.next; // Keep "toRead" list for future use, don't extract().
if (codec == null) {
for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
@@ -326,6 +362,12 @@ class EncodedReaderImpl implements EncodedReader {
}
}
}
+ // Release buffers as we are done with all the streams... also see toRelease comment.
+ // With uncompressed streams, we know we are done earlier.
+ if (toRelease != null) {
+ releaseBuffers(toRelease.keySet(), true);
+ toRelease = null;
+ }
if (isTracingEnabled) {
LOG.trace("Disk ranges after pre-read (file " + fileKey + ", base offset "
+ stripeOffset + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
@@ -374,8 +416,8 @@ class EncodedReaderImpl implements EncodedReader {
if (sctx.stripeLevelStream == null) {
sctx.stripeLevelStream = POOLS.csdPool.take();
// We will be using this for each RG while also sending RGs to processing.
- // To avoid buffers being unlocked, run refcount one ahead; we will not increase
- // it when building the last RG, so each RG processing will decref once, and the
+ // To avoid buffers being unlocked, run refcount one ahead; so each RG
+ // processing will decref once, and the
// last one will unlock the buffers.
sctx.stripeLevelStream.incRef();
// For stripe-level streams we don't need the extra refcount on the block.
@@ -383,14 +425,12 @@ class EncodedReaderImpl implements EncodedReader {
long unlockUntilCOffset = sctx.offset + sctx.length;
DiskRangeList lastCached = readEncodedStream(stripeOffset, iter,
sctx.offset, sctx.offset + sctx.length, sctx.stripeLevelStream,
- unlockUntilCOffset, sctx.offset);
+ unlockUntilCOffset, sctx.offset, toRelease);
if (lastCached != null) {
iter = lastCached;
}
}
- if (!isLastRg) {
- sctx.stripeLevelStream.incRef();
- }
+ sctx.stripeLevelStream.incRef();
cb = sctx.stripeLevelStream;
} else {
// This stream can be separated by RG using index. Let's do that.
@@ -411,7 +451,7 @@ class EncodedReaderImpl implements EncodedReader {
boolean isStartOfStream = sctx.bufferIter == null;
DiskRangeList lastCached = readEncodedStream(stripeOffset,
(isStartOfStream ? iter : sctx.bufferIter), cOffset, endCOffset, cb,
- unlockUntilCOffset, sctx.offset);
+ unlockUntilCOffset, sctx.offset, toRelease);
if (lastCached != null) {
sctx.bufferIter = iter = lastCached;
}
@@ -437,7 +477,27 @@ class EncodedReaderImpl implements EncodedReader {
}
// Release the unreleased buffers. See class comment about refcounts.
+ for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
+ ColumnReadContext ctx = colCtxs[colIx];
+ if (ctx == null) continue; // This column is not included.
+ for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
+ StreamContext sctx = ctx.streams[streamIx];
+ if (sctx == null || sctx.stripeLevelStream == null) continue;
+ if (0 != sctx.stripeLevelStream.decRef()) continue;
+ for (MemoryBuffer buf : sctx.stripeLevelStream.getCacheBuffers()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Unlocking {} at the end of processing", buf);
+ }
+ cacheWrapper.releaseBuffer(buf);
+ }
+ }
+ }
+
releaseInitialRefcounts(toRead.next);
+ // Release buffers as we are done with all the streams... also see toRelease comment.
+ if (toRelease != null) {
+ releaseBuffers(toRelease.keySet(), true);
+ }
releaseCacheChunksIntoObjectPool(toRead.next);
}
@@ -605,8 +665,8 @@ class EncodedReaderImpl implements EncodedReader {
* the master list, so they are safe to keep as iterators for various streams.
*/
public DiskRangeList readEncodedStream(long baseOffset, DiskRangeList start, long cOffset,
- long endCOffset, ColumnStreamData csd, long unlockUntilCOffset, long streamOffset)
- throws IOException {
+ long endCOffset, ColumnStreamData csd, long unlockUntilCOffset, long streamOffset,
+ IdentityHashMap<ByteBuffer, Boolean> toRelease) throws IOException {
if (csd.getCacheBuffers() == null) {
csd.setCacheBuffers(new ArrayList<MemoryBuffer>());
} else {
@@ -615,10 +675,10 @@ class EncodedReaderImpl implements EncodedReader {
if (cOffset == endCOffset) return null;
boolean isCompressed = codec != null;
List<ProcCacheChunk> toDecompress = null;
- List<ByteBuffer> toRelease = null;
List<IncompleteCb> badEstimates = null;
+ List<ByteBuffer> toReleaseCopies = null;
if (isCompressed) {
- toRelease = !dataReader.isTrackingDiskRanges() ? null : new ArrayList<ByteBuffer>();
+ toReleaseCopies = new ArrayList<>();
toDecompress = new ArrayList<>();
badEstimates = new ArrayList<>();
}
@@ -636,8 +696,8 @@ class EncodedReaderImpl implements EncodedReader {
// 2. Go thru the blocks; add stuff to results and prepare the decompression work (see below).
try {
lastUncompressed = isCompressed ?
- prepareRangesForCompressedRead(cOffset, endCOffset, streamOffset,
- unlockUntilCOffset, current, csd, toRelease, toDecompress, badEstimates)
+ prepareRangesForCompressedRead(cOffset, endCOffset, streamOffset, unlockUntilCOffset,
+ current, csd, toRelease, toReleaseCopies, toDecompress, badEstimates)
: prepareRangesForUncompressedRead(
cOffset, endCOffset, streamOffset, unlockUntilCOffset, current, csd);
} catch (Exception ex) {
@@ -657,7 +717,10 @@ class EncodedReaderImpl implements EncodedReader {
assert result == null; // We don't expect conflicts from bad estimates.
}
- if (toDecompress == null || toDecompress.isEmpty()) return lastUncompressed; // Nothing to do.
+ if (toDecompress == null || toDecompress.isEmpty()) {
+ releaseBuffers(toReleaseCopies, false);
+ return lastUncompressed; // Nothing to do.
+ }
// 3. Allocate the buffers, prepare cache keys.
// At this point, we have read all the CBs we need to read. cacheBuffers contains some cache
@@ -690,21 +753,18 @@ class EncodedReaderImpl implements EncodedReader {
cacheWrapper.reuseBuffer(chunk.getBuffer());
}
- // 5. Release original compressed buffers to zero-copy reader if needed.
- if (toRelease != null) {
- assert dataReader.isTrackingDiskRanges();
- for (ByteBuffer buffer : toRelease) {
- dataReader.releaseBuffer(buffer);
- }
- }
+ // 5. Release the copies we made directly to the cleaner.
+ releaseBuffers(toReleaseCopies, false);
// 6. Finally, put uncompressed data to cache.
if (fileKey != null) {
- long[] collisionMask = cacheWrapper.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset);
+ long[] collisionMask = cacheWrapper.putFileData(
+ fileKey, cacheKeys, targetBuffers, baseOffset);
processCacheCollisions(collisionMask, toDecompress, targetBuffers, csd.getCacheBuffers());
}
- // 7. It may happen that we know we won't use some compression buffers anymore.
+ // 7. It may happen that we know we won't use some cache buffers anymore (the alternative
+ // is that we will use the same buffers for other streams in separate calls).
// Release initial refcounts.
for (ProcCacheChunk chunk : toDecompress) {
ponderReleaseInitialRefcount(unlockUntilCOffset, streamOffset, chunk);
@@ -713,9 +773,11 @@ class EncodedReaderImpl implements EncodedReader {
return lastUncompressed;
}
+ /** Subset of readEncodedStream specific to compressed streams, separate to avoid long methods. */
private CacheChunk prepareRangesForCompressedRead(long cOffset, long endCOffset,
- long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData columnStreamData,
- List<ByteBuffer> toRelease, List<ProcCacheChunk> toDecompress,
+ long streamOffset, long unlockUntilCOffset, DiskRangeList current,
+ ColumnStreamData columnStreamData, IdentityHashMap<ByteBuffer, Boolean> toRelease,
+ List<ByteBuffer> toReleaseCopies, List<ProcCacheChunk> toDecompress,
List<IncompleteCb> badEstimates) throws IOException {
if (cOffset > current.getOffset()) {
// Target compression block is in the middle of the range; slice the range in two.
@@ -762,8 +824,8 @@ class EncodedReaderImpl implements EncodedReader {
throw new RuntimeException(msg);
}
BufferChunk bc = (BufferChunk)current;
- ProcCacheChunk newCached = addOneCompressionBuffer(
- bc, columnStreamData.getCacheBuffers(), toDecompress, toRelease, badEstimates);
+ ProcCacheChunk newCached = addOneCompressionBuffer(bc, columnStreamData.getCacheBuffers(),
+ toDecompress, toRelease, toReleaseCopies, badEstimates);
lastUncompressed = (newCached == null) ? lastUncompressed : newCached;
next = (newCached != null) ? newCached.next : null;
currentOffset = (next != null) ? next.getOffset() : -1;
@@ -777,9 +839,12 @@ class EncodedReaderImpl implements EncodedReader {
return lastUncompressed;
}
+ /** Subset of readEncodedStream specific to uncompressed streams, separate to avoid long methods. */
private CacheChunk prepareRangesForUncompressedRead(long cOffset, long endCOffset,
- long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData columnStreamData)
- throws IOException {
+ long streamOffset, long unlockUntilCOffset, DiskRangeList current,
+ ColumnStreamData columnStreamData) throws IOException {
+ // Note: we are called after preReadUncompressedStream, so it doesn't have to do nearly as much
+ // as prepareRangesForCompressedRead does; e.g. every buffer is already a CacheChunk.
long currentOffset = cOffset;
CacheChunk lastUncompressed = null;
boolean isFirst = true;
@@ -819,11 +884,10 @@ class EncodedReaderImpl implements EncodedReader {
* We could avoid copy in non-zcr case and manage the buffer that was not allocated by our
* allocator. Uncompressed case is not mainline though so let's not complicate it.
*/
- private DiskRangeList preReadUncompressedStream(long baseOffset,
- DiskRangeList start, long streamOffset, long streamEnd) throws IOException {
+ private DiskRangeList preReadUncompressedStream(long baseOffset, DiskRangeList start,
+ long streamOffset, long streamEnd) throws IOException {
if (streamOffset == streamEnd) return null;
List<UncompressedCacheChunk> toCache = null;
- List<ByteBuffer> toRelease = null;
// 1. Find our bearings in the stream.
DiskRangeList current = findIntersectingPosition(start, streamOffset, streamEnd);
@@ -860,9 +924,6 @@ class EncodedReaderImpl implements EncodedReader {
if (current.getOffset() >= partEnd) {
continue; // We have no data at all for this part of the stream (could be unneeded), skip.
}
- if (toRelease == null && dataReader.isTrackingDiskRanges()) {
- toRelease = new ArrayList<ByteBuffer>();
- }
// We have some disk buffers... see if we have entire part, etc.
UncompressedCacheChunk candidateCached = null; // We will cache if we have the entire part.
DiskRangeList next = current;
@@ -877,21 +938,15 @@ class EncodedReaderImpl implements EncodedReader {
current = next;
if (noMoreDataForPart) break; // Done with this part.
- boolean wasSplit = false;
if (current.getEnd() > partEnd) {
// If the current buffer contains multiple parts, split it.
current = current.split(partEnd);
- wasSplit = true;
}
if (isTracingEnabled) {
LOG.trace("Processing uncompressed file data at ["
+ current.getOffset() + ", " + current.getEnd() + ")");
}
BufferChunk curBc = (BufferChunk)current;
- if (!wasSplit && toRelease != null) {
- toRelease.add(curBc.getChunk()); // TODO: is it valid to give zcr the modified 2nd part?
- }
-
// Track if we still have the entire part.
long hadEntirePartTo = hasEntirePartTo;
// We have data until the end of current block if we had it until the beginning.
@@ -952,15 +1007,7 @@ class EncodedReaderImpl implements EncodedReader {
++ix;
}
- // 5. Release original compressed buffers to zero-copy reader if needed.
- if (toRelease != null) {
- assert dataReader.isTrackingDiskRanges();
- for (ByteBuffer buf : toRelease) {
- dataReader.releaseBuffer(buf);
- }
- }
-
- // 6. Finally, put uncompressed data to cache.
+ // 5. Put uncompressed data to cache.
if (fileKey != null) {
long[] collisionMask = cacheWrapper.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset);
processCacheCollisions(collisionMask, toCache, targetBuffers, null);
@@ -969,7 +1016,6 @@ class EncodedReaderImpl implements EncodedReader {
return lastUncompressed;
}
-
private int determineUncompressedPartSize() {
// We will break the uncompressed data in the cache in the chunks that are the size
// of the prevalent ORC compression buffer (the default), or maximum allocation (since we
@@ -1178,7 +1224,8 @@ class EncodedReaderImpl implements EncodedReader {
*/
private ProcCacheChunk addOneCompressionBuffer(BufferChunk current,
List<MemoryBuffer> cacheBuffers, List<ProcCacheChunk> toDecompress,
- List<ByteBuffer> toRelease, List<IncompleteCb> badEstimates) throws IOException {
+ IdentityHashMap<ByteBuffer, Boolean> toRelease, List<ByteBuffer> toReleaseCopies,
+ List<IncompleteCb> badEstimates) throws IOException {
ByteBuffer slice = null;
ByteBuffer compressed = current.getChunk();
long cbStartOffset = current.getOffset();
@@ -1201,12 +1248,8 @@ class EncodedReaderImpl implements EncodedReader {
// Simple case - CB fits entirely in the disk range.
slice = compressed.slice();
slice.limit(chunkLength);
- ProcCacheChunk cc = addOneCompressionBlockByteBuffer(slice, isUncompressed,
+ return addOneCompressionBlockByteBuffer(slice, isUncompressed,
cbStartOffset, cbEndOffset, chunkLength, current, toDecompress, cacheBuffers);
- if (compressed.remaining() <= 0 && dataReader.isTrackingDiskRanges()) {
- toRelease.add(compressed);
- }
- return cc;
}
if (current.getEnd() < cbEndOffset && !current.hasContiguousNext()) {
badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, current, 0));
@@ -1216,6 +1259,7 @@ class EncodedReaderImpl implements EncodedReader {
// TODO: we could remove extra copy for isUncompressed case by copying directly to cache.
// We need to consolidate 2 or more buffers into one to decompress.
ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect());
+ toReleaseCopies.add(copy); // We will always release copies at the end.
int remaining = chunkLength - compressed.remaining();
int originalPos = compressed.position();
copy.put(compressed);
@@ -1224,12 +1268,8 @@ class EncodedReaderImpl implements EncodedReader {
}
DiskRangeList next = current.next;
current.removeSelf();
- if (dataReader.isTrackingDiskRanges()) {
- if (originalPos == 0) {
- dataReader.releaseBuffer(compressed); // We copied the entire buffer.
- } else {
- toRelease.add(compressed); // There might be slices depending on this buffer.
- }
+ if (originalPos == 0 && toRelease.remove(compressed)) {
+ releaseBuffer(compressed, true);
}
int extraChunkCount = 0;
@@ -1246,15 +1286,15 @@ class EncodedReaderImpl implements EncodedReader {
copy.put(slice);
ProcCacheChunk cc = addOneCompressionBlockByteBuffer(copy, isUncompressed,
cbStartOffset, cbEndOffset, remaining, (BufferChunk)next, toDecompress, cacheBuffers);
- if (compressed.remaining() <= 0 && dataReader.isTrackingDiskRanges()) {
- dataReader.releaseBuffer(compressed); // We copied the entire buffer.
- }
+ if (compressed.remaining() <= 0 && toRelease.remove(compressed)) {
+ releaseBuffer(compressed, true); // We copied the entire buffer.
+ } // else there's more data to process; will be handled in next call.
return cc;
}
remaining -= compressed.remaining();
- copy.put(compressed);
- if (dataReader.isTrackingDiskRanges()) {
- dataReader.releaseBuffer(compressed); // We copied the entire buffer.
+ copy.put(compressed); // TODO: move into the if below; account for release call
+ if (toRelease.remove(compressed)) {
+ releaseBuffer(compressed, true); // We copied the entire buffer.
}
DiskRangeList tmp = next;
next = next.hasContiguousNext() ? next.next : null;
@@ -1270,6 +1310,38 @@ class EncodedReaderImpl implements EncodedReader {
}
}
+ private void releaseBuffers(Collection<ByteBuffer> toRelease, boolean isFromDataReader) {
+ if (toRelease == null) return;
+ for (ByteBuffer buf : toRelease) {
+ releaseBuffer(buf, isFromDataReader);
+ }
+ }
+
+ private void releaseBuffer(ByteBuffer bb, boolean isFromDataReader) {
+ if (isTracingEnabled) {
+ LOG.trace("Releasing the buffer " + System.identityHashCode(bb));
+ }
+ if (isFromDataReader && dataReader.isTrackingDiskRanges()) {
+ dataReader.releaseBuffer(bb);
+ return;
+ }
+ Field localCf = cleanerField;
+ if (!bb.isDirect() || localCf == null) return;
+ try {
+ Cleaner cleaner = (Cleaner) localCf.get(bb);
+ if (cleaner != null) {
+ cleaner.clean();
+ } else {
+ LOG.debug("Unable to clean a buffer using cleaner - no cleaner");
+ }
+ } catch (Exception e) {
+ // leave it for GC to clean up
+ LOG.warn("Unable to clean direct buffers using Cleaner.");
+ cleanerField = null;
+ }
+ }
+
+
private IncompleteCb addIncompleteCompressionBuffer(
long cbStartOffset, DiskRangeList target, int extraChunkCount) {
IncompleteCb icb = new IncompleteCb(cbStartOffset, target.getEnd());
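A standalone sketch of the direct-buffer release trick adopted above: reflect on DirectByteBuffer's private "cleaner" field once, then invoke Cleaner.clean() to free the native memory eagerly instead of waiting for GC. This works on JDK 8; JDK 9+ needs the CleanerUtil approach from HADOOP-12760 referenced in the commit.

import java.lang.reflect.Field;
import java.nio.ByteBuffer;

class DirectBufferReleaseSketch {
  private static Field cleanerField;
  static {
    try {
      Class<?> dbClazz = Class.forName("java.nio.DirectByteBuffer");
      cleanerField = dbClazz.getDeclaredField("cleaner");
      cleanerField.setAccessible(true);
    } catch (Throwable t) {
      cleanerField = null; // reflection unavailable; fall back to GC
    }
  }

  static void release(ByteBuffer bb) {
    if (!bb.isDirect() || cleanerField == null) return; // nothing to do for heap buffers
    try {
      sun.misc.Cleaner cleaner = (sun.misc.Cleaner) cleanerField.get(bb);
      if (cleaner != null) {
        cleaner.clean(); // frees the native memory immediately
      }
    } catch (Exception e) {
      cleanerField = null; // stop trying; leave remaining buffers to GC
    }
  }
}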
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
index 26f1e75..a7bb5ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
@@ -21,6 +21,7 @@ import java.util.Properties;
import java.util.TimeZone;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetTableUtils;
+import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
import org.apache.parquet.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -139,14 +140,11 @@ public class MapredParquetOutputFormat extends FileOutputFormat<NullWritable, Pa
String timeZoneID =
tableProperties.getProperty(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY);
if (!Strings.isNullOrEmpty(timeZoneID)) {
- if (!Arrays.asList(TimeZone.getAvailableIDs()).contains(timeZoneID)) {
- throw new IllegalStateException("Unexpected timezone id found for parquet int96 conversion: " + timeZoneID);
- }
+
+ NanoTimeUtils.validateTimeZone(timeZoneID);
return TimeZone.getTimeZone(timeZoneID);
}
- // If no timezone is defined in table properties, then adjust timestamps using
- // PARQUET_INT96_NO_ADJUSTMENT_ZONE timezone
- return TimeZone.getTimeZone(ParquetTableUtils.PARQUET_INT96_NO_ADJUSTMENT_ZONE);
+ return TimeZone.getDefault();
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
index 8e33b7d..2954601 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
@@ -20,6 +20,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetTableUtils;
+import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.serde2.SerDeStats;
@@ -44,7 +45,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.TimeZone;
@@ -170,7 +170,7 @@ public class ParquetRecordReaderBase {
boolean skipConversion = HiveConf.getBoolVar(configuration,
HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION);
FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
- if (!Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr") ||
+ if (!Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr") &&
skipConversion) {
// Impala writes timestamp values using GMT only. We should not try to convert Impala
// files to other type of timezones.
@@ -179,16 +179,12 @@ public class ParquetRecordReaderBase {
// TABLE_PARQUET_INT96_TIMEZONE is a table property used to detect what timezone conversion
// to use when reading Parquet timestamps.
timeZoneID = configuration.get(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY,
- ParquetTableUtils.PARQUET_INT96_NO_ADJUSTMENT_ZONE);
-
- if (!Arrays.asList(TimeZone.getAvailableIDs()).contains(timeZoneID)) {
- throw new IllegalStateException("Unexpected timezone id found for parquet int96 conversion: " + timeZoneID);
- }
+ TimeZone.getDefault().getID());
+ NanoTimeUtils.validateTimeZone(timeZoneID);
}
// 'timeZoneID' should be valid, since we did not throw exception above
- configuration.set(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY,
- TimeZone.getTimeZone(timeZoneID).getID());
+ configuration.set(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY, timeZoneID);
}
public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) {
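The || -> && fix above changes when the Impala/GMT-only handling kicks in. A hedged restatement of the corrected predicate (simplified names, illustrative only): the branch is now taken only when the file comes from a non-parquet-mr writer such as Impala AND skip-conversion is enabled, whereas the old || took it for every non-parquet-mr file regardless of the flag.

class SkipConversionSketch {
  // Mirrors the fixed condition: both parts must hold for the GMT-only path.
  static boolean useGmtOnlyPath(String createdBy, boolean skipConversion) {
    String cb = createdBy == null ? "" : createdBy; // Strings.nullToEmpty equivalent
    return !cb.startsWith("parquet-mr") && skipConversion;
  }
}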
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java
index 5dc8088..dbd6fb3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java
@@ -152,13 +152,26 @@ public class NanoTimeUtils {
calendar.setTimeInMillis(utcCalendar.getTimeInMillis());
- Calendar adjusterCalendar = copyToCalendarWithTZ(calendar, Calendar.getInstance());
+ Calendar adjusterCalendar = copyToCalendarWithTZ(calendar, getLocalCalendar());
Timestamp ts = new Timestamp(adjusterCalendar.getTimeInMillis());
ts.setNanos((int) nanos);
return ts;
}
+ /**
+ * Check if the string id is a valid java TimeZone id.
+ * TimeZone#getTimeZone will return "GMT" if the id cannot be understood.
+ * @param timeZoneID
+ */
+ public static void validateTimeZone(String timeZoneID) {
+ if (TimeZone.getTimeZone(timeZoneID).getID().equals("GMT")
+ && !"GMT".equals(timeZoneID)) {
+ throw new IllegalStateException(
+ "Unexpected timezone id found for parquet int96 conversion: " + timeZoneID);
+ }
+ }
+
private static Calendar copyToCalendarWithTZ(Calendar from, Calendar to) {
if(from.getTimeZone().getID().equals(to.getTimeZone().getID())) {
return from;
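The validation added here leans on a java.util.TimeZone quirk: getTimeZone() silently returns the GMT zone for any id it does not recognize. A minimal self-contained sketch of the same check:

import java.util.TimeZone;

class TimeZoneValidationSketch {
  static void validateTimeZone(String timeZoneID) {
    // An unknown id maps to "GMT"; getting GMT back without asking for it
    // means the id must be invalid.
    if (TimeZone.getTimeZone(timeZoneID).getID().equals("GMT")
        && !"GMT".equals(timeZoneID)) {
      throw new IllegalStateException(
          "Unexpected timezone id found for parquet int96 conversion: " + timeZoneID);
    }
  }

  public static void main(String[] args) {
    validateTimeZone("America/Los_Angeles"); // recognized id, passes
    validateTimeZone("Not/AZone");           // falls back to GMT, throws
  }
}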
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 6ca1963..312cdac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -121,8 +121,9 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
try {
serDeStats = new SerDeStats();
projectionPusher = new ProjectionPusher();
- if (oldInputSplit != null) {
- initialize(getSplit(oldInputSplit, conf), conf);
+ ParquetInputSplit inputSplit = getSplit(oldInputSplit, conf);
+ if (inputSplit != null) {
+ initialize(inputSplit, conf);
setTimeZoneConversion(jobConf, ((FileSplit) oldInputSplit).getPath());
}
colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf);
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
index 00c9645..5401c7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.common.StatsSetupConst.StatDB;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.Context;
@@ -58,7 +57,6 @@ import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
@@ -77,8 +75,6 @@ import org.apache.logging.log4j.core.appender.RollingFileAppender;
@SuppressWarnings( { "deprecation"})
public class PartialScanTask extends Task<PartialScanWork> implements
Serializable, HadoopJobExecHook {
-
-
private static final long serialVersionUID = 1L;
protected transient JobConf job;
@@ -274,7 +270,7 @@ public class PartialScanTask extends Task<PartialScanWork> implements
return "RCFile Statistics Partial Scan";
}
- public static String INPUT_SEPERATOR = ":";
+ public static final String INPUT_SEPERATOR = ":";
public static void main(String[] args) {
String inputPathStr = null;
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index d255265..01e8a48 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -21,6 +21,11 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.LockTableDesc;
+import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
import org.apache.hive.common.util.ShutdownHookManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +62,13 @@ import java.util.concurrent.atomic.AtomicInteger;
* with a single thread accessing it at a time, with the exception of {@link #heartbeat()} method.
* The latter may (usually will) be called from a timer thread.
* See {@link #getMS()} for more important concurrency/metastore access notes.
+ *
+ * Each statement that the TM (transaction manager) should be aware of should belong to a transaction.
+ * Effectively, that means any statement that has side effects. Exceptions are statements like
+ * Show Compactions, Show Tables, Use Database foo, etc. The transaction is started either
+ * explicitly (via Start Transaction SQL statement from end user - not fully supported) or
+ * implicitly by the {@link org.apache.hadoop.hive.ql.Driver} (which looks exactly like autoCommit=true
+ * from end user point of view). See more at {@link #isExplicitTransaction}.
*/
public final class DbTxnManager extends HiveTxnManagerImpl {
@@ -76,7 +88,47 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
* to keep apart multiple writes of the same data within the same transaction
* Also see {@link org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options}
*/
- private int statementId = -1;
+ private int writeId = -1;
+ /**
+ * counts number of statements in the current transaction
+ */
+ private int numStatements = 0;
+ /**
+ * if {@code true} it means current transaction is started via START TRANSACTION which means it cannot
+ * include any Operations which cannot be rolled back (drop partition; write to non-acid table).
+ * If false, it's a single statement transaction which can include any statement. This is not a
+ * contradiction from the user point of view who doesn't know anything about the implicit txn
+ * and cannot call rollback (the statement can of course fail, in which case there is nothing to
+ * roll back, assuming the statement is well implemented).
+ *
+ * This is done so that all commands run in a transaction which simplifies implementation and
+ * allows a simple implementation of multi-statement txns which don't require a lock manager
+ * capable of deadlock detection. (todo: not fully implemented; elaborate on how this LM works)
+ *
+ * Also, critically important, ensuring that everything runs in a transaction assigns an order
+ * to all operations in the system - needed for replication/DR.
+ *
+ * We don't want to allow non-transactional statements in a user demarcated txn because the effect
+ * of such statement is "visible" immediately on statement completion, but the user may
+ * issue a rollback but the action of the statement can't be undone (and has possibly already been
+ * seen by another txn). For example,
+ * start transaction
+ * insert into transactional_table values(1);
+ * insert into non_transactional_table select * from transactional_table;
+ * rollback
+ *
+ * The user would be in for a surprise especially if they are not aware of transactional
+ * properties of the tables involved.
+ *
+ * As a side note: what should the lock manager do with locks for non-transactional resources?
+ * Should it release them at the end of the stmt or txn?
+ * Some interesting thoughts: http://mysqlmusings.blogspot.com/2009/02/mixing-engines-in-transactions.html
+ */
+ private boolean isExplicitTransaction = false;
+ /**
+ * To ensure transactions don't nest.
+ */
+ private int startTransactionCount = 0;
// QueryId for the query in current transaction
private String queryId;
@@ -141,15 +193,22 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
@VisibleForTesting
long openTxn(Context ctx, String user, long delay) throws LockException {
- //todo: why don't we lock the snapshot here??? Instead of having client make an explicit call
- //whenever it chooses
+ /*Q: why don't we lock the snapshot here??? Instead of having client make an explicit call
+ whenever it chooses
+ A: If we want to rely on locks for transaction scheduling we must get the snapshot after lock
+ acquisition. Relying on locks is a pessimistic strategy which works better under high
+ contention.*/
init();
+ getLockManager();
if(isTxnOpen()) {
throw new LockException("Transaction already opened. " + JavaUtils.txnIdToString(txnId));
}
try {
txnId = getMS().openTxn(user);
- statementId = 0;
+ writeId = 0;
+ numStatements = 0;
+ isExplicitTransaction = false;
+ startTransactionCount = 0;
LOG.debug("Opened " + JavaUtils.txnIdToString(txnId));
ctx.setHeartbeater(startHeartbeat(delay));
return txnId;
@@ -159,8 +218,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
}
/**
- * we don't expect multiple thread to call this method concurrently but {@link #lockMgr} will
- * be read by a different threads that one writing it, thus it's {@code volatile}
+ * we don't expect multiple threads to call this method concurrently but {@link #lockMgr} will
+ * be read by different threads than the one writing it, thus it's {@code volatile}
*/
@Override
public HiveLockManager getLockManager() throws LockException {
@@ -179,24 +238,95 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
catch(LockException e) {
if(e.getCause() instanceof TxnAbortedException) {
txnId = 0;
- statementId = -1;
+ writeId = -1;
}
throw e;
}
}
/**
- * This is for testing only. Normally client should call {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan, org.apache.hadoop.hive.ql.Context, String)}
+ * Watermark to include in error msgs and logs
+ * @param queryPlan
+ * @return
+ */
+ private static String getQueryIdWaterMark(QueryPlan queryPlan) {
+ return "queryId=" + queryPlan.getQueryId();
+ }
+
+ private void markExplicitTransaction(QueryPlan queryPlan) throws LockException {
+ isExplicitTransaction = true;
+ if(++startTransactionCount > 1) {
+ throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, queryPlan.getOperationName(),
+ JavaUtils.txnIdToString(getCurrentTxnId()), queryPlan.getQueryId());
+ }
+
+ }
+ /**
+ * Ensures that the current SQL statement is appropriate for the current state of the
+ * Transaction Manager (e.g. can call commit unless you called start transaction)
+ *
+ * Note that support for multi-statement txns is a work-in-progress so it's only supported in
+ * HiveConf#HIVE_IN_TEST/HiveConf#TEZ_HIVE_IN_TEST.
+ * @param queryPlan
+ * @throws LockException
+ */
+ private void verifyState(QueryPlan queryPlan) throws LockException {
+ if(!isTxnOpen()) {
+ throw new LockException("No transaction context for operation: " + queryPlan.getOperationName() +
+ " for " + getQueryIdWaterMark(queryPlan));
+ }
+ if(queryPlan.getOperation() == null) {
+ throw new IllegalStateException("Unkown HiverOperation for " + getQueryIdWaterMark(queryPlan));
+ }
+ numStatements++;
+ switch (queryPlan.getOperation()) {
+ case START_TRANSACTION:
+ markExplicitTransaction(queryPlan);
+ break;
+ case COMMIT:
+ case ROLLBACK:
+ if(!isTxnOpen()) {
+ throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, queryPlan.getOperationName());
+ }
+ if(!isExplicitTransaction) {
+ throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_IMPLICIT_TXN, queryPlan.getOperationName());
+ }
+ break;
+ default:
+ if(!queryPlan.getOperation().isAllowedInTransaction() && isExplicitTransaction) {
+ //for example, drop table in an explicit txn is not allowed
+ //in some cases this requires looking at more than just the operation
+ //for example HiveOperation.LOAD - OK if target is MM table but not OK if non-acid table
+ throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, queryPlan.getOperationName(),
+ JavaUtils.txnIdToString(getCurrentTxnId()), queryPlan.getQueryId());
+ }
+ }
+ /*
+ Should we allow writing to non-transactional tables in an explicit transaction? The user may
+ issue ROLLBACK but these tables won't rollback.
+ Can do this by checking ReadEntity/WriteEntity to determine whether it's reading/writing
+ any non-acid resource and raise an appropriate error
+ * Driver.acidSinks and Driver.acidInQuery can be used if any acid is in the query*/
+ }
+ /**
+ * Normally client should call {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan, org.apache.hadoop.hive.ql.Context, String)}
* @param isBlocking if false, the method will return immediately; thus the locks may be in LockState.WAITING
* @return null if no locks were needed
*/
+ @VisibleForTesting
LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isBlocking) throws LockException {
init();
- // Make sure we've built the lock manager
+ // Make sure we've built the lock manager
getLockManager();
-
+ verifyState(plan);
boolean atLeastOneLock = false;
queryId = plan.getQueryId();
+ switch (plan.getOperation()) {
+ case SET_AUTOCOMMIT:
+ /**This is here for documentation purposes. This TM doesn't support this - only has one
+ * mode of operation documented at {@link DbTxnManager#isExplicitTransaction}*/
+ return null;
+ }
LockRequestBuilder rqstBuilder = new LockRequestBuilder(queryId);
//link queryId to txnId
@@ -240,8 +370,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
// This is a file or something we don't hold locks for.
continue;
}
- if(t != null && AcidUtils.isFullAcidTable(t)) {
- compBuilder.setIsAcid(true);
+ if(t != null) {
+ compBuilder.setIsAcid(AcidUtils.isFullAcidTable(t));
}
LockComponent comp = compBuilder.build();
LOG.debug("Adding lock component to lock request " + comp.toString());
@@ -262,7 +392,33 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
}
LockComponentBuilder compBuilder = new LockComponentBuilder();
Table t = null;
+ switch (output.getType()) {
+ case DATABASE:
+ compBuilder.setDbName(output.getDatabase().getName());
+ break;
+
+ case TABLE:
+ case DUMMYPARTITION: // in case of dynamic partitioning lock the table
+ t = output.getTable();
+ compBuilder.setDbName(t.getDbName());
+ compBuilder.setTableName(t.getTableName());
+ break;
+
+ case PARTITION:
+ compBuilder.setPartitionName(output.getPartition().getName());
+ t = output.getPartition().getTable();
+ compBuilder.setDbName(t.getDbName());
+ compBuilder.setTableName(t.getTableName());
+ break;
+
+ default:
+ // This is a file or something we don't hold locks for.
+ continue;
+ }
switch (output.getWriteType()) {
+ /* base this on HiveOperation instead? this and DDL_NO_LOCK are peppered all over the code...
+ Seems much cleaner if each stmt is identified as a particular HiveOperation (which I'd think
+ makes sense everywhere). This however would be problematic for merge...*/
case DDL_EXCLUSIVE:
case INSERT_OVERWRITE:
compBuilder.setExclusive();
@@ -270,10 +426,9 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
break;
case INSERT:
- t = getTable(output);
+ assert t != null;
if(AcidUtils.isFullAcidTable(t)) {
compBuilder.setShared();
- compBuilder.setIsAcid(true);
}
else {
if (conf.getBoolVar(HiveConf.ConfVars.HIVE_TXN_STRICT_LOCKING_MODE)) {
@@ -281,7 +436,6 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
} else { // this is backward compatible for non-ACID resources, w/o ACID semantics
compBuilder.setShared();
}
- compBuilder.setIsAcid(false);
}
compBuilder.setOperationType(DataOperationType.INSERT);
break;
@@ -293,12 +447,10 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
case UPDATE:
compBuilder.setSemiShared();
compBuilder.setOperationType(DataOperationType.UPDATE);
- t = getTable(output);
break;
case DELETE:
compBuilder.setSemiShared();
compBuilder.setOperationType(DataOperationType.DELETE);
- t = getTable(output);
break;
case DDL_NO_LOCK:
@@ -307,34 +459,11 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
default:
throw new RuntimeException("Unknown write type " +
output.getWriteType().toString());
-
}
- switch (output.getType()) {
- case DATABASE:
- compBuilder.setDbName(output.getDatabase().getName());
- break;
-
- case TABLE:
- case DUMMYPARTITION: // in case of dynamic partitioning lock the table
- t = output.getTable();
- compBuilder.setDbName(t.getDbName());
- compBuilder.setTableName(t.getTableName());
- break;
-
- case PARTITION:
- compBuilder.setPartitionName(output.getPartition().getName());
- t = output.getPartition().getTable();
- compBuilder.setDbName(t.getDbName());
- compBuilder.setTableName(t.getTableName());
- break;
-
- default:
- // This is a file or something we don't hold locks for.
- continue;
- }
- if(t != null && AcidUtils.isFullAcidTable(t)) {
- compBuilder.setIsAcid(true);
+ if(t != null) {
+ compBuilder.setIsAcid(AcidUtils.isFullAcidTable(t));
}
+
compBuilder.setIsDynamicPartitionWrite(output.isDynamicPartitionWrite());
LockComponent comp = compBuilder.build();
LOG.debug("Adding lock component to lock request " + comp.toString());
@@ -405,7 +534,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
e);
} finally {
txnId = 0;
- statementId = -1;
+ writeId = -1;
+ numStatements = 0;
}
}
@@ -429,7 +559,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
e);
} finally {
txnId = 0;
- statementId = -1;
+ writeId = -1;
+ numStatements = 0;
}
}
@@ -556,6 +687,26 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
public boolean supportsExplicitLock() {
return false;
}
+ @Override
+ public int lockTable(Hive db, LockTableDesc lockTbl) throws HiveException {
+ super.lockTable(db, lockTbl);
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public int unlockTable(Hive hiveDB, UnlockTableDesc unlockTbl) throws HiveException {
+ super.unlockTable(hiveDB, unlockTbl);
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public int lockDatabase(Hive hiveDB, LockDatabaseDesc lockDb) throws HiveException {
+ super.lockDatabase(hiveDB, lockDb);
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public int unlockDatabase(Hive hiveDB, UnlockDatabaseDesc unlockDb) throws HiveException {
+ super.unlockDatabase(hiveDB, unlockDb);
+ throw new UnsupportedOperationException();
+ }
@Override
public boolean useNewShowLocksFormat() {
@@ -566,7 +717,44 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
public boolean supportsAcid() {
return true;
}
-
+ /**
+ * In an explicit txn start_transaction is the 1st statement and we record the snapshot at the
+ * start of the txn for Snapshot Isolation. For Read Committed (not supported yet) we'd record
+ * it before executing each statement (but after lock acquisition if using lock based concurrency
+ * control).
+ * For implicit txn, the stmt that triggered/started the txn is the first statement
+ */
+ @Override
+ public boolean recordSnapshot(QueryPlan queryPlan) {
+ assert isTxnOpen();
+ assert numStatements > 0 : "was acquireLocks() called already?";
+ if(queryPlan.getOperation() == HiveOperation.START_TRANSACTION) {
+ //here if start of explicit txn
+ assert isExplicitTransaction;
+ assert numStatements == 1;
+ return true;
+ }
+ else if(!isExplicitTransaction) {
+ assert numStatements == 1 : "numStatements=" + numStatements + " in implicit txn";
+ if (queryPlan.hasAcidResourcesInQuery()) {
+ //1st and only stmt in implicit txn and uses acid resource
+ return true;
+ }
+ }
+ return false;
+ }
+ @Override
+ public boolean isImplicitTransactionOpen() {
+ if(!isTxnOpen()) {
+ //some commands like "show databases" don't start implicit transactions
+ return false;
+ }
+ if(!isExplicitTransaction) {
+ assert numStatements == 1 : "numStatements=" + numStatements;
+ return true;
+ }
+ return false;
+ }
@Override
protected void destruct() {
try {
@@ -626,7 +814,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
@Override
public int getWriteIdAndIncrement() {
assert isTxnOpen();
- return statementId++;
+ return writeId++;
}
private static long getHeartbeatInterval(Configuration conf) throws LockException {
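A simplified, hypothetical sketch of the statement-vs-transaction state checks that verifyState()/markExplicitTransaction() above implement: COMMIT/ROLLBACK are only legal inside an explicit transaction, START TRANSACTION must not nest, and operations that cannot be rolled back are rejected once a transaction is explicit. Names are stand-ins, not the Hive classes.

class TxnStateSketch {
  enum Op { START_TRANSACTION, COMMIT, ROLLBACK, DROP_TABLE, QUERY }

  private boolean isExplicitTransaction = false;
  private int startTransactionCount = 0;

  // Called once per statement after a txn is open, mirroring verifyState().
  void verify(Op op) {
    switch (op) {
      case START_TRANSACTION:
        isExplicitTransaction = true;
        if (++startTransactionCount > 1) {
          throw new IllegalStateException("Transactions may not nest: " + op);
        }
        break;
      case COMMIT:
      case ROLLBACK:
        if (!isExplicitTransaction) {
          throw new IllegalStateException(op + " not allowed in an implicit transaction");
        }
        break;
      case DROP_TABLE: // stands in for any operation that cannot be rolled back
        if (isExplicitTransaction) {
          throw new IllegalStateException(op + " not allowed in an explicit transaction");
        }
        break;
      default:
        break; // ordinary statements are always allowed
    }
  }
}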
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index 53ee9c8..24df25b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -301,7 +301,8 @@ class DummyTxnManager extends HiveTxnManagerImpl {
new HiveLockObject.HiveLockObjectData(plan.getQueryId(),
String.valueOf(System.currentTimeMillis()),
"IMPLICIT",
- plan.getQueryStr());
+ plan.getQueryStr(),
+ conf);
if (db != null) {
locks.add(new HiveLockObj(new HiveLockObject(db.getName(), lockData),
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java
index fff03df..a514339 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.hive.common.StringInternUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.ql.metadata.DummyPartition;
import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -48,16 +49,23 @@ public class HiveLockObject {
* Note: The parameters are used to uniquely identify a HiveLockObject.
* The parameters will be stripped off any ':' characters in order not
* to interfere with the way the data is serialized (':' delimited string).
+ * The query string might be truncated depending on HIVE_LOCK_QUERY_STRING_MAX_LENGTH
+ * @param queryId The query identifier will be added to the object without change
+ * @param lockTime The lock time will be added to the object without change
+ * @param lockMode The lock mode will be added to the object without change
+ * @param queryStr The query string might be truncated based on
+ * HIVE_LOCK_QUERY_STRING_MAX_LENGTH conf variable
+ * @param conf The Hive configuration, based on which we decide whether to truncate
+ * the query string
*/
- public HiveLockObjectData(String queryId,
- String lockTime,
- String lockMode,
- String queryStr) {
+ public HiveLockObjectData(String queryId, String lockTime, String lockMode, String queryStr,
+ HiveConf conf) {
this.queryId = removeDelimiter(queryId);
this.lockTime = StringInternUtils.internIfNotNull(removeDelimiter(lockTime));
this.lockMode = removeDelimiter(lockMode);
this.queryStr = StringInternUtils.internIfNotNull(
- removeDelimiter(queryStr == null ? null : queryStr.trim()));
+ queryStr == null ? null : StringUtils.substring(removeDelimiter(queryStr.trim()), 0,
+ conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_QUERY_STRING_MAX_LENGTH)));
}
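A small sketch of the truncation rule the constructor now applies; the literal 10 below merely stands in for the HIVE_LOCK_QUERY_STRING_MAX_LENGTH value read from the conf:

import org.apache.commons.lang.StringUtils;

public class TruncationSketch {
  public static void main(String[] args) {
    int max = 10; // stand-in for conf.getIntVar(HIVE_LOCK_QUERY_STRING_MAX_LENGTH)
    // StringUtils.substring() is null-safe and returns the input unchanged when
    // it is shorter than max, so short query strings are stored as-is.
    System.out.println(StringUtils.substring("select count(*) from t", 0, max)); // select cou
    System.out.println(StringUtils.substring("short", 0, max));                  // short
  }
}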
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 187a658..b24351c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -72,7 +72,7 @@ public interface HiveTxnManager {
/**
* Acquire all of the locks needed by a query. If used with a query that
- * requires transactions, this should be called after {@link #openTxn(String)}.
+ * requires transactions, this should be called after {@link #openTxn(Context, String)}.
* A list of acquired locks will be stored in the
* {@link org.apache.hadoop.hive.ql.Context} object and can be retrieved
* via {@link org.apache.hadoop.hive.ql.Context#getHiveLocks}.
@@ -208,17 +208,13 @@ public interface HiveTxnManager {
boolean supportsAcid();
/**
- * This behaves exactly as
- * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean)
- */
- void setAutoCommit(boolean autoCommit) throws LockException;
-
- /**
- * This behaves exactly as
- * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#getAutoCommit()
+ * For resources that support MVCC, the state of the DB must be recorded for the duration of the
+ * operation/transaction. Returns {@code true} if the current statement needs to do this.
*/
- boolean getAutoCommit();
+ boolean recordSnapshot(QueryPlan queryPlan);
+ boolean isImplicitTransactionOpen();
+
boolean isTxnOpen();
/**
* if {@code isTxnOpen()}, returns the currently active transaction ID
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
index a371a5a..8dbbf87 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
abstract class HiveTxnManagerImpl implements HiveTxnManager {
protected HiveConf conf;
- private boolean isAutoCommit = true;//true by default; matches JDBC spec
void setHiveConf(HiveConf c) {
conf = c;
@@ -68,16 +67,6 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager {
destruct();
}
@Override
- public void setAutoCommit(boolean autoCommit) throws LockException {
- isAutoCommit = autoCommit;
- }
-
- @Override
- public boolean getAutoCommit() {
- return isAutoCommit;
- }
-
- @Override
public int lockTable(Hive db, LockTableDesc lockTbl) throws HiveException {
HiveLockManager lockMgr = getAndCheckLockManager();
@@ -93,7 +82,8 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager {
new HiveLockObjectData(lockTbl.getQueryId(),
String.valueOf(System.currentTimeMillis()),
"EXPLICIT",
- lockTbl.getQueryStr());
+ lockTbl.getQueryStr(),
+ conf);
if (partSpec == null) {
HiveLock lck = lockMgr.lock(new HiveLockObject(tbl, lockData), mode, true);
@@ -151,7 +141,7 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager {
HiveLockObjectData lockData =
new HiveLockObjectData(lockDb.getQueryId(),
String.valueOf(System.currentTimeMillis()),
- "EXPLICIT", lockDb.getQueryStr());
+ "EXPLICIT", lockDb.getQueryStr(), conf);
HiveLock lck = lockMgr.lock(new HiveLockObject(dbObj.getName(), lockData), mode, true);
if (lck == null) {
@@ -202,4 +192,13 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager {
return lockMgr;
}
+ @Override
+ public boolean recordSnapshot(QueryPlan queryPlan) {
+ return false;
+ }
+ @Override
+ public boolean isImplicitTransactionOpen() {
+ return true;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
index c2a4806..9b46ae7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
@@ -285,8 +285,10 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
int tryNum = 0;
ZooKeeperHiveLock ret = null;
Set<String> conflictingLocks = new HashSet<String>();
+ Exception lastException = null;
do {
+ lastException = null;
tryNum++;
try {
if (tryNum > 1) {
@@ -298,26 +300,22 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
break;
}
} catch (Exception e1) {
+ lastException = e1;
if (e1 instanceof KeeperException) {
KeeperException e = (KeeperException) e1;
switch (e.code()) {
case CONNECTIONLOSS:
case OPERATIONTIMEOUT:
+ case NONODE:
+ case NODEEXISTS:
LOG.debug("Possibly transient ZooKeeper exception: ", e);
- continue;
+ break;
default:
LOG.error("Serious Zookeeper exception: ", e);
break;
}
- }
- if (tryNum >= numRetriesForLock) {
- console.printError("Unable to acquire " + key.getData().getLockMode()
- + ", " + mode + " lock " + key.getDisplayName() + " after "
- + tryNum + " attempts.");
- LOG.error("Exceeds maximum retries with errors: ", e1);
- printConflictingLocks(key,mode,conflictingLocks);
- conflictingLocks.clear();
- throw new LockException(e1);
+ } else {
+ LOG.error("Other unexpected exception: ", e1);
}
}
} while (tryNum < numRetriesForLock);
@@ -327,8 +325,11 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
+ ", " + mode + " lock " + key.getDisplayName() + " after "
+ tryNum + " attempts.");
printConflictingLocks(key,mode,conflictingLocks);
+ if (lastException != null) {
+ LOG.error("Exceeds maximum retries with errors: ", lastException);
+ throw new LockException(lastException);
+ }
}
- conflictingLocks.clear();
return ret;
}
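The reworked loop follows a retry pattern worth spelling out: remember the last failure so the final error report is accurate, keep looping on possibly transient errors, and surface the saved exception only once the retry budget is exhausted. A generic sketch under those assumptions (not Hive's code; Attempt and the other names are illustrative):

import java.io.IOException;

final class RetrySketch {
  interface Attempt<T> { T run() throws Exception; }

  static <T> T retry(Attempt<T> op, int maxRetries) throws IOException {
    Exception lastException = null;
    T result = null;
    int tryNum = 0;
    do {
      lastException = null; // a clean pass resets the remembered failure
      tryNum++;
      try {
        result = op.run();
        if (result != null) {
          break;
        }
      } catch (Exception e) {
        lastException = e; // remember it for the final report
      }
    } while (tryNum < maxRetries);
    if (result == null && lastException != null) {
      throw new IOException("giving up after " + tryNum + " attempts", lastException);
    }
    return result;
  }
}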
@@ -350,6 +351,19 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
}
}
+ /**
+ * Creates a primitive lock object on ZooKeeper.
+ * @param key The lock data
+ * @param mode The lock mode (HiveLockMode - EXCLUSIVE/SHARED/SEMI_SHARED)
+ * @param keepAlive If true, create PERSISTENT ZooKeeper locks; otherwise create
+ * EPHEMERAL ZooKeeper locks
+ * @param parentCreated If we expect that the parent is already created, then true;
+ * otherwise we will try to create the parents as well
+ * @param conflictingLocks The set where we should collect the conflicting locks when
+ * the logging level is set to DEBUG
+ * @return The created ZooKeeperHiveLock object, null if there was a conflicting lock
+ * @throws Exception If there was an unexpected Exception
+ */
private ZooKeeperHiveLock lockPrimitive(HiveLockObject key,
HiveLockMode mode, boolean keepAlive, boolean parentCreated,
Set<String> conflictingLocks)
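The keepAlive flag documented above maps to the ZooKeeper create mode, and the surrounding code's use of getSequenceNumber() suggests sequential nodes; a hedged sketch of that mapping in Curator terms (illustrative names, not the actual method body):

import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;

final class LockNodeSketch {
  static String createLockNode(CuratorFramework zk, String path, byte[] data,
      boolean keepAlive, boolean parentCreated) throws Exception {
    CreateMode mode = keepAlive
        ? CreateMode.PERSISTENT_SEQUENTIAL  // survives the client session
        : CreateMode.EPHEMERAL_SEQUENTIAL;  // removed when the session ends
    if (parentCreated) {
      return zk.create().withMode(mode).forPath(path, data);
    }
    // otherwise ask Curator to create any missing parents on the way down
    return zk.create().creatingParentsIfNeeded().withMode(mode).forPath(path, data);
  }
}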
@@ -390,7 +404,7 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
int seqNo = getSequenceNumber(res, getLockName(lastName, mode));
if (seqNo == -1) {
curatorFramework.delete().forPath(res);
- return null;
+ throw new LockException("The created node does not contain a sequence number: " + res);
}
List<String> children = curatorFramework.getChildren().forPath(lastName);
@@ -584,7 +598,6 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
/**
* @param conf Hive configuration
- * @param zkpClient The ZooKeeper client
* @param key The object to be compared against - if key is null, then get all locks
**/
private static List<HiveLock> getLocks(HiveConf conf,
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java b/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
new file mode 100644
index 0000000..64ce100
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.log;
+
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.hive.common.LogUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.session.OperationLog;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.RandomAccessFileAppender;
+import org.apache.logging.log4j.core.appender.routing.Route;
+import org.apache.logging.log4j.core.appender.routing.Routes;
+import org.apache.logging.log4j.core.appender.routing.RoutingAppender;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.logging.log4j.core.config.Node;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
+import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+import org.apache.logging.log4j.core.config.plugins.processor.PluginEntry;
+import org.apache.logging.log4j.core.config.plugins.util.PluginType;
+import org.apache.logging.log4j.core.filter.AbstractFilter;
+import org.apache.logging.log4j.core.layout.PatternLayout;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+
+/**
+ * Divert appender to redirect operation logs to separate files.
+ */
+public class LogDivertAppender {
+ private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(LogDivertAppender.class.getName());
+ public static final String verboseLayout = "%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n";
+ public static final String nonVerboseLayout = "%-5p : %m%n";
+
+ /**
+ * A log filter that filters messages coming from loggers with the given names.
+ * It can be used as a whitelist filter or a blacklist filter.
+ * We apply the blacklist filter to the loggers used by the log diversion itself, so that
+ * they don't generate more logs for themselves when they process logs.
+ * The whitelist filter is used for less verbose log collection.
+ */
+ @Plugin(name = "NameFilter", category = "Core", elementType="filter", printObject = true)
+ private static class NameFilter extends AbstractFilter {
+ private Pattern namePattern;
+ private OperationLog.LoggingLevel loggingMode;
+
+ /* Patterns that are excluded in the verbose logging level.
+ * Filter out messages coming from log processing classes, or we'd run into an infinite loop.
+ */
+ private static final Pattern verboseExcludeNamePattern = Pattern.compile(Joiner.on("|").
+ join(new String[]{LOG.getName(), OperationLog.class.getName()}));
+
+ /* Patterns that are included in execution logging level.
+ * In execution mode, show only select logger messages.
+ */
+ private static final Pattern executionIncludeNamePattern = Pattern.compile(Joiner.on("|").
+ join(new String[]{"org.apache.hadoop.mapreduce.JobSubmitter",
+ "org.apache.hadoop.mapreduce.Job", "SessionState", Task.class.getName(),
+ Driver.class.getName(), "org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor"}));
+
+ /* Patterns that are included in performance logging level.
+ * In performance mode, show execution and performance logger messages.
+ */
+ private static final Pattern performanceIncludeNamePattern = Pattern.compile(
+ executionIncludeNamePattern.pattern() + "|" + PerfLogger.class.getName());
+
+ private void setCurrentNamePattern(OperationLog.LoggingLevel mode) {
+ if (mode == OperationLog.LoggingLevel.VERBOSE) {
+ this.namePattern = verboseExcludeNamePattern;
+ } else if (mode == OperationLog.LoggingLevel.EXECUTION) {
+ this.namePattern = executionIncludeNamePattern;
+ } else if (mode == OperationLog.LoggingLevel.PERFORMANCE) {
+ this.namePattern = performanceIncludeNamePattern;
+ }
+ }
+
+ public NameFilter(OperationLog.LoggingLevel loggingMode) {
+ this.loggingMode = loggingMode;
+ setCurrentNamePattern(loggingMode);
+ }
+
+ @Override
+ public Result filter(LogEvent event) {
+ boolean excludeMatches = (loggingMode == OperationLog.LoggingLevel.VERBOSE);
+
+ String logLevel = event.getContextMap().get(LogUtils.OPERATIONLOG_LEVEL_KEY);
+ logLevel = logLevel == null ? "" : logLevel;
+ OperationLog.LoggingLevel currentLoggingMode = OperationLog.getLoggingLevel(logLevel);
+ // If logging is disabled, deny everything.
+ if (currentLoggingMode == OperationLog.LoggingLevel.NONE) {
+ return Result.DENY;
+ }
+ // Look at the current session's setting
+ // and set the pattern and excludeMatches accordingly.
+ if (currentLoggingMode != loggingMode) {
+ loggingMode = currentLoggingMode;
+ excludeMatches = (loggingMode == OperationLog.LoggingLevel.VERBOSE);
+ setCurrentNamePattern(loggingMode);
+ }
+
+ boolean isMatch = namePattern.matcher(event.getLoggerName()).matches();
+
+ if (excludeMatches == isMatch) {
+ // Deny if this is a blacklist filter (excludeMatches = true) and the name
+ // matched, or if this is a whitelist filter and it didn't match.
+ return Result.DENY;
+ }
+
+ return Result.NEUTRAL;
+ }
+
+ @PluginFactory
+ public static NameFilter createFilter(
+ @PluginAttribute("loggingLevel") final String loggingLevel) {
+ // Name required for routing. Error out if it is not set.
+ Preconditions.checkNotNull(loggingLevel,
+ "loggingLevel must be specified for " + NameFilter.class.getName());
+
+ return new NameFilter(OperationLog.getLoggingLevel(loggingLevel));
+ }
+ }
+
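The deny condition in filter() folds four cases into a single equality test; the following illustrative helper (not part of the patch) spells out the truth table:

// excludeMatches == isMatch  <=>  DENY:
//   blacklist (excludeMatches=true),  logger name matches  -> DENY
//   blacklist (excludeMatches=true),  logger name misses   -> pass through
//   whitelist (excludeMatches=false), logger name matches  -> pass through
//   whitelist (excludeMatches=false), logger name misses   -> DENY
final class FilterDecision {
  static boolean deny(boolean excludeMatches, boolean isMatch) {
    return excludeMatches == isMatch;
  }
}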
+ /**
+ * Programmatically registers a routing appender with the Log4j configuration, which
+ * automatically writes the log of each query to an individual file.
+ * The equivalent property configuration is as follows:
+ * # queryId based routing file appender
+ appender.query-routing.type = Routing
+ appender.query-routing.name = query-routing
+ appender.query-routing.routes.type = Routes
+ appender.query-routing.routes.pattern = $${ctx:queryId}
+ # default route
+ appender.query-routing.routes.route-default.type = Route
+ appender.query-routing.routes.route-default.key = $${ctx:queryId}
+ appender.query-routing.routes.route-default.app.type = null
+ appender.query-routing.routes.route-default.app.name = Null
+ # queryId based route
+ appender.query-routing.routes.route-mdc.type = Route
+ appender.query-routing.routes.route-mdc.name = IrrelevantName-query-routing
+ appender.query-routing.routes.route-mdc.app.type = RandomAccessFile
+ appender.query-routing.routes.route-mdc.app.name = query-file-appender
+ appender.query-routing.routes.route-mdc.app.fileName = ${sys:hive.log.dir}/${ctx:sessionId}/${ctx:queryId}
+ appender.query-routing.routes.route-mdc.app.layout.type = PatternLayout
+ appender.query-routing.routes.route-mdc.app.layout.pattern = %d{ISO8601} %5p %c{2}: %m%n
+ * @param conf the configuration for HiveServer2 instance
+ */
+ public static void registerRoutingAppender(org.apache.hadoop.conf.Configuration conf) {
+ String loggingLevel = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL);
+ OperationLog.LoggingLevel loggingMode = OperationLog.getLoggingLevel(loggingLevel);
+ String layout = loggingMode == OperationLog.LoggingLevel.VERBOSE ? verboseLayout : nonVerboseLayout;
+ String logLocation = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION);
+
+ // Create NullAppender
+ PluginEntry nullEntry = new PluginEntry();
+ nullEntry.setClassName(NullAppender.class.getName());
+ nullEntry.setKey("null");
+ nullEntry.setName("appender");
+ PluginType<NullAppender> nullChildType = new PluginType<NullAppender>(nullEntry, NullAppender.class, "appender");
+ Node nullChildNode = new Node(null, "Null", nullChildType);
+
+ // Create default route
+ PluginEntry defaultEntry = new PluginEntry();
+ defaultEntry.setClassName(Route.class.getName());
+ defaultEntry.setKey("route");
+ defaultEntry.setName("Route");
+ PluginType<Route> defaultType = new PluginType<Route>(defaultEntry, Route.class, "Route");
+ Node nullNode = new Node(null, "Route", defaultType);
+ nullNode.getChildren().add(nullChildNode);
+ Route defaultRoute = Route.createRoute(null, "${ctx:queryId}", nullNode);
+
+ // Create queryId based route
+ PluginEntry entry = new PluginEntry();
+ entry.setClassName(Route.class.getName());
+ entry.setKey("route");
+ entry.setName("Route");
+ PluginType<Route> type = new PluginType<Route>(entry, Route.class, "Route");
+ Node node = new Node(null, "Route", type);
+
+ PluginEntry childEntry = new PluginEntry();
+ childEntry.setClassName(RandomAccessFileAppender.class.getName());
+ childEntry.setKey("randomaccessfile");
+ childEntry.setName("appender");
+ PluginType<RandomAccessFileAppender> childType = new PluginType<RandomAccessFileAppender>(childEntry, RandomAccessFileAppender.class, "appender");
+ Node childNode = new Node(node, "RandomAccessFile", childType);
+ childNode.getAttributes().put("name", "query-file-appender");
+ childNode.getAttributes().put("fileName", logLocation + "/${ctx:sessionId}/${ctx:queryId}");
+ node.getChildren().add(childNode);
+
+ PluginEntry filterEntry = new PluginEntry();
+ filterEntry.setClassName(NameFilter.class.getName());
+ filterEntry.setKey("namefilter");
+ filterEntry.setName("namefilter");
+ PluginType<NameFilter> filterType = new PluginType<NameFilter>(filterEntry, NameFilter.class, "filter");
+ Node filterNode = new Node(childNode, "NameFilter", filterType);
+ filterNode.getAttributes().put("loggingLevel", loggingMode.name());
+ childNode.getChildren().add(filterNode);
+
+ PluginEntry layoutEntry = new PluginEntry();
+ layoutEntry.setClassName(PatternLayout.class.getName());
+ layoutEntry.setKey("patternlayout");
+ layoutEntry.setName("layout");
+ PluginType<PatternLayout> layoutType = new PluginType<PatternLayout>(layoutEntry, PatternLayout.class, "layout");
+ Node layoutNode = new Node(childNode, "PatternLayout", layoutType);
+ layoutNode.getAttributes().put("pattern", layout);
+ childNode.getChildren().add(layoutNode);
+
+ Route mdcRoute = Route.createRoute(null, null, node);
+ Routes routes = Routes.createRoutes("${ctx:queryId}", defaultRoute, mdcRoute);
+
+ LoggerContext context = (LoggerContext) LogManager.getContext(false);
+ Configuration configuration = context.getConfiguration();
+
+ RoutingAppender routingAppender = RoutingAppender.createAppender("query-routing",
+ "true",
+ routes,
+ configuration,
+ null,
+ null,
+ null);
+
+ LoggerConfig loggerConfig = configuration.getRootLogger();
+ loggerConfig.addAppender(routingAppender, null, null);
+ context.updateLoggers();
+ routingAppender.start();
+ }
+}
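For context, a sketch of how caller code could drive the appender registered above; route selection is keyed off the Log4j2 ThreadContext, so the keys must match the ${ctx:sessionId}/${ctx:queryId} lookups in the routes (the class and literal values here are illustrative):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.logging.log4j.ThreadContext;

public class RoutingUsageSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    LogDivertAppender.registerRoutingAppender(conf);

    // Once sessionId/queryId are set on this thread, matching log events land
    // in <operation-log-location>/<sessionId>/<queryId>.
    ThreadContext.put("sessionId", "session-1");
    ThreadContext.put("queryId", "query-1");
    try {
      org.slf4j.LoggerFactory.getLogger("SessionState").info("diverted line");
    } finally {
      ThreadContext.remove("queryId"); // back to the default (Null) route
      ThreadContext.remove("sessionId");
    }
  }
}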