You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/08 20:42:54 UTC
[04/51] [partial] hive git commit: Revert "HIVE-14671 : merge master into hive-14535 (Wei Zheng)"
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index b018adb..0b1ac4b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -37,7 +37,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.google.common.annotations.VisibleForTesting;
-
import org.apache.hadoop.hive.common.StringInternUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,8 +45,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.ql.Driver.DriverState;
-import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -366,12 +363,8 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
Map<CombinePathInputFormat, CombineFilter> poolMap =
new HashMap<CombinePathInputFormat, CombineFilter>();
Set<Path> poolSet = new HashSet<Path>();
- LockedDriverState lDrvStat = LockedDriverState.getLockedDriverState();
for (Path path : paths) {
- if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
- throw new IOException("Operation is Canceled. ");
-
PartitionDesc part = HiveFileFormatUtils.getFromPathRecursively(
pathToPartitionInfo, path, IOPrepareCache.get().allocatePartitionDescMap());
TableDesc tableDesc = part.getTableDesc();
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
index f73a8e3..cc77e4c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
@@ -87,7 +87,7 @@ public final class HiveFileFormatUtils {
public static class FileChecker {
// we don't have many file formats that implement InputFormatChecker. We won't be holding
// multiple instances of such classes
- private static final int MAX_CACHE_SIZE = 16;
+ private static int MAX_CACHE_SIZE = 16;
// immutable maps
Map<Class<? extends InputFormat>, Class<? extends InputFormatChecker>> inputFormatCheckerMap;
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 9b83cb4..c697407 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -89,13 +89,14 @@ import org.apache.hive.common.util.ReflectionUtil;
*/
public class HiveInputFormat<K extends WritableComparable, V extends Writable>
implements InputFormat<K, V>, JobConfigurable {
+
private static final String CLASS_NAME = HiveInputFormat.class.getName();
private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
/**
* A cache of InputFormat instances.
*/
- private static final Map<Class, InputFormat<WritableComparable, Writable>> inputFormats
+ private static Map<Class, InputFormat<WritableComparable, Writable>> inputFormats
= new ConcurrentHashMap<Class, InputFormat<WritableComparable, Writable>>();
private JobConf job;
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
index f41edc4..d391164 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
@@ -839,7 +839,7 @@ public class RCFile {
// the max size of memory for buffering records before writes them out
private int columnsBufferSize = 4 * 1024 * 1024; // 4M
// the conf string for COLUMNS_BUFFER_SIZE
- public static final String COLUMNS_BUFFER_SIZE_CONF_STR = "hive.io.rcfile.record.buffer.size";
+ public static String COLUMNS_BUFFER_SIZE_CONF_STR = "hive.io.rcfile.record.buffer.size";
// how many records already buffered
private int bufferedRecords = 0;
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
index cbd38ed..96ca736 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.orc.FileMetadata;
import org.apache.orc.PhysicalWriter;
-import org.apache.orc.MemoryManager;
+import org.apache.orc.impl.MemoryManager;
import org.apache.orc.TypeDescription;
import org.apache.orc.impl.OrcTail;
@@ -258,7 +258,7 @@ public final class OrcFile extends org.apache.orc.OrcFile {
/**
* A package local option to set the memory manager.
*/
- public WriterOptions memory(MemoryManager value) {
+ protected WriterOptions memory(MemoryManager value) {
super.memory(value);
return this;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 8fb7211..59682db 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -158,7 +158,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
private static final Logger LOG = LoggerFactory.getLogger(OrcInputFormat.class);
- private static final boolean isDebugEnabled = LOG.isDebugEnabled();
+ private static boolean isDebugEnabled = LOG.isDebugEnabled();
static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
private static final long DEFAULT_MIN_SPLIT_SIZE = 16 * 1024 * 1024;
@@ -1531,7 +1531,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
Reader.Options readerOptions = new Reader.Options(context.conf);
if (readerTypes == null) {
readerIncluded = genIncludedColumns(fileSchema, context.conf);
- evolution = new SchemaEvolution(fileSchema, null, readerOptions.include(readerIncluded));
+ evolution = new SchemaEvolution(fileSchema, readerOptions.include(readerIncluded));
} else {
// The reader schema always comes in without ACID columns.
TypeDescription readerSchema = OrcUtils.convertTypeFromProtobuf(readerTypes, 0);
@@ -1913,6 +1913,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
}
+ // The schema type description does not include the ACID fields (i.e. it is the
+ // non-ACID original schema).
+ private static boolean SCHEMA_TYPES_IS_ORIGINAL = true;
+
@Override
public RowReader<OrcStruct> getReader(InputSplit inputSplit,
Options options)
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 5b2e9b5..0ac3ec5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -18,12 +18,9 @@
package org.apache.hadoop.hive.ql.io.orc.encoded;
import java.io.IOException;
-import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
-import java.util.IdentityHashMap;
import java.util.List;
import org.slf4j.Logger;
@@ -46,13 +43,11 @@ import org.apache.orc.impl.RecordReaderUtils;
import org.apache.orc.impl.StreamName;
import org.apache.orc.StripeInformation;
import org.apache.orc.impl.BufferChunk;
+import org.apache.hadoop.hive.llap.DebugUtils;
import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory;
import org.apache.orc.OrcProto;
-import sun.misc.Cleaner;
-
-
/**
* Encoded reader implementation.
*
@@ -85,17 +80,6 @@ import sun.misc.Cleaner;
*/
class EncodedReaderImpl implements EncodedReader {
public static final Logger LOG = LoggerFactory.getLogger(EncodedReaderImpl.class);
- private static Field cleanerField;
- static {
- try {
- // TODO: To make it work for JDK9 use CleanerUtil from https://issues.apache.org/jira/browse/HADOOP-12760
- final Class<?> dbClazz = Class.forName("java.nio.DirectByteBuffer");
- cleanerField = dbClazz.getDeclaredField("cleaner");
- cleanerField.setAccessible(true);
- } catch (Throwable t) {
- cleanerField = null;
- }
- }
private static final Object POOLS_CREATION_LOCK = new Object();
private static Pools POOLS;
private static class Pools {
@@ -220,8 +204,8 @@ class EncodedReaderImpl implements EncodedReader {
@Override
public void readEncodedColumns(int stripeIx, StripeInformation stripe,
- OrcProto.RowIndex[] indexes, List<OrcProto.ColumnEncoding> encodings,
- List<OrcProto.Stream> streamList, boolean[] included, boolean[][] colRgs,
+ OrcProto.RowIndex[] indexes, List<OrcProto.ColumnEncoding> encodings, List<OrcProto.Stream> streamList,
+ boolean[] included, boolean[][] colRgs,
Consumer<OrcEncodedColumnBatch> consumer) throws IOException {
// Note: for now we don't have to setError here, caller will setError if we throw.
// We are also not supposed to call setDone, since we are only part of the operation.
@@ -319,35 +303,15 @@ class EncodedReaderImpl implements EncodedReader {
}
}
- // TODO: the memory release could be optimized - we could release original buffers after we
- // are fully done with each original buffer from disk. For now release all at the end;
- // it doesn't increase the total amount of memory we hold, just the duration a bit.
- // This is much simpler - we can just remember original ranges after reading them, and
- // release them at the end. In a few cases where it's easy to determine that a buffer
- // can be freed in advance, we remove it from the map.
- IdentityHashMap<ByteBuffer, Boolean> toRelease = null;
if (!isAllInCache.value) {
if (!isDataReaderOpen) {
this.dataReader.open();
isDataReaderOpen = true;
}
dataReader.readFileData(toRead.next, stripeOffset, cacheWrapper.getAllocator().isDirectAlloc());
- toRelease = new IdentityHashMap<>();
- DiskRangeList drl = toRead.next;
- while (drl != null) {
- if (drl instanceof BufferChunk) {
- toRelease.put(drl.getData(), true);
- }
- drl = drl.next;
- }
}
// 3. For uncompressed case, we need some special processing before read.
- // Basically, we are trying to create artificial, consistent ranges to cache, as there are
- // no CBs in an uncompressed file. At the end of this processing, the list would contain
- // either cache buffers, or buffers allocated by us and not cached (if we are only reading
- // parts of the data for some ranges and don't want to cache it). Both are represented by
- // CacheChunks, so the list is just CacheChunk-s from that point on.
DiskRangeList iter = toRead.next; // Keep "toRead" list for future use, don't extract().
if (codec == null) {
for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
@@ -362,12 +326,6 @@ class EncodedReaderImpl implements EncodedReader {
}
}
}
- // Release buffers as we are done with all the streams... also see toRelease comment.\
- // With uncompressed streams, we know we are done earlier.
- if (toRelease != null) {
- releaseBuffers(toRelease.keySet(), true);
- toRelease = null;
- }
if (isTracingEnabled) {
LOG.trace("Disk ranges after pre-read (file " + fileKey + ", base offset "
+ stripeOffset + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
@@ -416,8 +374,8 @@ class EncodedReaderImpl implements EncodedReader {
if (sctx.stripeLevelStream == null) {
sctx.stripeLevelStream = POOLS.csdPool.take();
// We will be using this for each RG while also sending RGs to processing.
- // To avoid buffers being unlocked, run refcount one ahead; so each RG
- // processing will decref once, and the
+ // To avoid buffers being unlocked, run refcount one ahead; we will not increase
+ // it when building the last RG, so each RG processing will decref once, and the
// last one will unlock the buffers.
sctx.stripeLevelStream.incRef();
// For stripe-level streams we don't need the extra refcount on the block.
@@ -425,12 +383,14 @@ class EncodedReaderImpl implements EncodedReader {
long unlockUntilCOffset = sctx.offset + sctx.length;
DiskRangeList lastCached = readEncodedStream(stripeOffset, iter,
sctx.offset, sctx.offset + sctx.length, sctx.stripeLevelStream,
- unlockUntilCOffset, sctx.offset, toRelease);
+ unlockUntilCOffset, sctx.offset);
if (lastCached != null) {
iter = lastCached;
}
}
- sctx.stripeLevelStream.incRef();
+ if (!isLastRg) {
+ sctx.stripeLevelStream.incRef();
+ }
cb = sctx.stripeLevelStream;
} else {
// This stream can be separated by RG using index. Let's do that.
@@ -451,7 +411,7 @@ class EncodedReaderImpl implements EncodedReader {
boolean isStartOfStream = sctx.bufferIter == null;
DiskRangeList lastCached = readEncodedStream(stripeOffset,
(isStartOfStream ? iter : sctx.bufferIter), cOffset, endCOffset, cb,
- unlockUntilCOffset, sctx.offset, toRelease);
+ unlockUntilCOffset, sctx.offset);
if (lastCached != null) {
sctx.bufferIter = iter = lastCached;
}
@@ -477,27 +437,7 @@ class EncodedReaderImpl implements EncodedReader {
}
// Release the unreleased buffers. See class comment about refcounts.
- for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
- ColumnReadContext ctx = colCtxs[colIx];
- if (ctx == null) continue; // This column is not included.
- for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
- StreamContext sctx = ctx.streams[streamIx];
- if (sctx == null || sctx.stripeLevelStream == null) continue;
- if (0 != sctx.stripeLevelStream.decRef()) continue;
- for (MemoryBuffer buf : sctx.stripeLevelStream.getCacheBuffers()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Unlocking {} at the end of processing", buf);
- }
- cacheWrapper.releaseBuffer(buf);
- }
- }
- }
-
releaseInitialRefcounts(toRead.next);
- // Release buffers as we are done with all the streams... also see toRelease comment.
- if (toRelease != null) {
- releaseBuffers(toRelease.keySet(), true);
- }
releaseCacheChunksIntoObjectPool(toRead.next);
}
@@ -665,8 +605,8 @@ class EncodedReaderImpl implements EncodedReader {
* the master list, so they are safe to keep as iterators for various streams.
*/
public DiskRangeList readEncodedStream(long baseOffset, DiskRangeList start, long cOffset,
- long endCOffset, ColumnStreamData csd, long unlockUntilCOffset, long streamOffset,
- IdentityHashMap<ByteBuffer, Boolean> toRelease) throws IOException {
+ long endCOffset, ColumnStreamData csd, long unlockUntilCOffset, long streamOffset)
+ throws IOException {
if (csd.getCacheBuffers() == null) {
csd.setCacheBuffers(new ArrayList<MemoryBuffer>());
} else {
@@ -675,10 +615,10 @@ class EncodedReaderImpl implements EncodedReader {
if (cOffset == endCOffset) return null;
boolean isCompressed = codec != null;
List<ProcCacheChunk> toDecompress = null;
+ List<ByteBuffer> toRelease = null;
List<IncompleteCb> badEstimates = null;
- List<ByteBuffer> toReleaseCopies = null;
if (isCompressed) {
- toReleaseCopies = new ArrayList<>();
+ toRelease = !dataReader.isTrackingDiskRanges() ? null : new ArrayList<ByteBuffer>();
toDecompress = new ArrayList<>();
badEstimates = new ArrayList<>();
}
@@ -696,8 +636,8 @@ class EncodedReaderImpl implements EncodedReader {
// 2. Go thru the blocks; add stuff to results and prepare the decompression work (see below).
try {
lastUncompressed = isCompressed ?
- prepareRangesForCompressedRead(cOffset, endCOffset, streamOffset, unlockUntilCOffset,
- current, csd, toRelease, toReleaseCopies, toDecompress, badEstimates)
+ prepareRangesForCompressedRead(cOffset, endCOffset, streamOffset,
+ unlockUntilCOffset, current, csd, toRelease, toDecompress, badEstimates)
: prepareRangesForUncompressedRead(
cOffset, endCOffset, streamOffset, unlockUntilCOffset, current, csd);
} catch (Exception ex) {
@@ -717,10 +657,7 @@ class EncodedReaderImpl implements EncodedReader {
assert result == null; // We don't expect conflicts from bad estimates.
}
- if (toDecompress == null || toDecompress.isEmpty()) {
- releaseBuffers(toReleaseCopies, false);
- return lastUncompressed; // Nothing to do.
- }
+ if (toDecompress == null || toDecompress.isEmpty()) return lastUncompressed; // Nothing to do.
// 3. Allocate the buffers, prepare cache keys.
// At this point, we have read all the CBs we need to read. cacheBuffers contains some cache
@@ -753,18 +690,21 @@ class EncodedReaderImpl implements EncodedReader {
cacheWrapper.reuseBuffer(chunk.getBuffer());
}
- // 5. Release the copies we made directly to the cleaner.
- releaseBuffers(toReleaseCopies, false);
+ // 5. Release original compressed buffers to zero-copy reader if needed.
+ if (toRelease != null) {
+ assert dataReader.isTrackingDiskRanges();
+ for (ByteBuffer buffer : toRelease) {
+ dataReader.releaseBuffer(buffer);
+ }
+ }
// 6. Finally, put uncompressed data to cache.
if (fileKey != null) {
- long[] collisionMask = cacheWrapper.putFileData(
- fileKey, cacheKeys, targetBuffers, baseOffset);
+ long[] collisionMask = cacheWrapper.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset);
processCacheCollisions(collisionMask, toDecompress, targetBuffers, csd.getCacheBuffers());
}
- // 7. It may happen that we know we won't use some cache buffers anymore (the alternative
- // is that we will use the same buffers for other streams in separate calls).
+ // 7. It may happen that we know we won't use some compression buffers anymore.
// Release initial refcounts.
for (ProcCacheChunk chunk : toDecompress) {
ponderReleaseInitialRefcount(unlockUntilCOffset, streamOffset, chunk);
@@ -773,11 +713,9 @@ class EncodedReaderImpl implements EncodedReader {
return lastUncompressed;
}
- /** Subset of readEncodedStream specific to compressed streams, separate to avoid long methods. */
private CacheChunk prepareRangesForCompressedRead(long cOffset, long endCOffset,
- long streamOffset, long unlockUntilCOffset, DiskRangeList current,
- ColumnStreamData columnStreamData, IdentityHashMap<ByteBuffer, Boolean> toRelease,
- List<ByteBuffer> toReleaseCopies, List<ProcCacheChunk> toDecompress,
+ long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData columnStreamData,
+ List<ByteBuffer> toRelease, List<ProcCacheChunk> toDecompress,
List<IncompleteCb> badEstimates) throws IOException {
if (cOffset > current.getOffset()) {
// Target compression block is in the middle of the range; slice the range in two.
@@ -824,8 +762,8 @@ class EncodedReaderImpl implements EncodedReader {
throw new RuntimeException(msg);
}
BufferChunk bc = (BufferChunk)current;
- ProcCacheChunk newCached = addOneCompressionBuffer(bc, columnStreamData.getCacheBuffers(),
- toDecompress, toRelease, toReleaseCopies, badEstimates);
+ ProcCacheChunk newCached = addOneCompressionBuffer(
+ bc, columnStreamData.getCacheBuffers(), toDecompress, toRelease, badEstimates);
lastUncompressed = (newCached == null) ? lastUncompressed : newCached;
next = (newCached != null) ? newCached.next : null;
currentOffset = (next != null) ? next.getOffset() : -1;
@@ -839,12 +777,9 @@ class EncodedReaderImpl implements EncodedReader {
return lastUncompressed;
}
- /** Subset of readEncodedStream specific to uncompressed streams, separate to avoid long methods. */
private CacheChunk prepareRangesForUncompressedRead(long cOffset, long endCOffset,
- long streamOffset, long unlockUntilCOffset, DiskRangeList current,
- ColumnStreamData columnStreamData) throws IOException {
- // Note: we are called after preReadUncompressedStream, so it doesn't have to do nearly as much
- // as prepareRangesForCompressedRead does; e.g. every buffer is already a CacheChunk.
+ long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData columnStreamData)
+ throws IOException {
long currentOffset = cOffset;
CacheChunk lastUncompressed = null;
boolean isFirst = true;
@@ -884,10 +819,11 @@ class EncodedReaderImpl implements EncodedReader {
* We could avoid copy in non-zcr case and manage the buffer that was not allocated by our
* allocator. Uncompressed case is not mainline though so let's not complicate it.
*/
- private DiskRangeList preReadUncompressedStream(long baseOffset, DiskRangeList start,
- long streamOffset, long streamEnd) throws IOException {
+ private DiskRangeList preReadUncompressedStream(long baseOffset,
+ DiskRangeList start, long streamOffset, long streamEnd) throws IOException {
if (streamOffset == streamEnd) return null;
List<UncompressedCacheChunk> toCache = null;
+ List<ByteBuffer> toRelease = null;
// 1. Find our bearings in the stream.
DiskRangeList current = findIntersectingPosition(start, streamOffset, streamEnd);
@@ -924,6 +860,9 @@ class EncodedReaderImpl implements EncodedReader {
if (current.getOffset() >= partEnd) {
continue; // We have no data at all for this part of the stream (could be unneeded), skip.
}
+ if (toRelease == null && dataReader.isTrackingDiskRanges()) {
+ toRelease = new ArrayList<ByteBuffer>();
+ }
// We have some disk buffers... see if we have entire part, etc.
UncompressedCacheChunk candidateCached = null; // We will cache if we have the entire part.
DiskRangeList next = current;
@@ -938,15 +877,21 @@ class EncodedReaderImpl implements EncodedReader {
current = next;
if (noMoreDataForPart) break; // Done with this part.
+ boolean wasSplit = false;
if (current.getEnd() > partEnd) {
// If the current buffer contains multiple parts, split it.
current = current.split(partEnd);
+ wasSplit = true;
}
if (isTracingEnabled) {
LOG.trace("Processing uncompressed file data at ["
+ current.getOffset() + ", " + current.getEnd() + ")");
}
BufferChunk curBc = (BufferChunk)current;
+ if (!wasSplit && toRelease != null) {
+ toRelease.add(curBc.getChunk()); // TODO: is it valid to give zcr the modified 2nd part?
+ }
+
// Track if we still have the entire part.
long hadEntirePartTo = hasEntirePartTo;
// We have data until the end of current block if we had it until the beginning.
@@ -1007,7 +952,15 @@ class EncodedReaderImpl implements EncodedReader {
++ix;
}
- // 5. Put uncompressed data to cache.
+ // 5. Release original compressed buffers to zero-copy reader if needed.
+ if (toRelease != null) {
+ assert dataReader.isTrackingDiskRanges();
+ for (ByteBuffer buf : toRelease) {
+ dataReader.releaseBuffer(buf);
+ }
+ }
+
+ // 6. Finally, put uncompressed data to cache.
if (fileKey != null) {
long[] collisionMask = cacheWrapper.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset);
processCacheCollisions(collisionMask, toCache, targetBuffers, null);
@@ -1016,6 +969,7 @@ class EncodedReaderImpl implements EncodedReader {
return lastUncompressed;
}
+
private int determineUncompressedPartSize() {
// We will break the uncompressed data in the cache in the chunks that are the size
// of the prevalent ORC compression buffer (the default), or maximum allocation (since we
@@ -1224,8 +1178,7 @@ class EncodedReaderImpl implements EncodedReader {
*/
private ProcCacheChunk addOneCompressionBuffer(BufferChunk current,
List<MemoryBuffer> cacheBuffers, List<ProcCacheChunk> toDecompress,
- IdentityHashMap<ByteBuffer, Boolean> toRelease, List<ByteBuffer> toReleaseCopies,
- List<IncompleteCb> badEstimates) throws IOException {
+ List<ByteBuffer> toRelease, List<IncompleteCb> badEstimates) throws IOException {
ByteBuffer slice = null;
ByteBuffer compressed = current.getChunk();
long cbStartOffset = current.getOffset();
@@ -1248,8 +1201,12 @@ class EncodedReaderImpl implements EncodedReader {
// Simple case - CB fits entirely in the disk range.
slice = compressed.slice();
slice.limit(chunkLength);
- return addOneCompressionBlockByteBuffer(slice, isUncompressed,
+ ProcCacheChunk cc = addOneCompressionBlockByteBuffer(slice, isUncompressed,
cbStartOffset, cbEndOffset, chunkLength, current, toDecompress, cacheBuffers);
+ if (compressed.remaining() <= 0 && dataReader.isTrackingDiskRanges()) {
+ toRelease.add(compressed);
+ }
+ return cc;
}
if (current.getEnd() < cbEndOffset && !current.hasContiguousNext()) {
badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, current, 0));
@@ -1259,7 +1216,6 @@ class EncodedReaderImpl implements EncodedReader {
// TODO: we could remove extra copy for isUncompressed case by copying directly to cache.
// We need to consolidate 2 or more buffers into one to decompress.
ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect());
- toReleaseCopies.add(copy); // We will always release copies at the end.
int remaining = chunkLength - compressed.remaining();
int originalPos = compressed.position();
copy.put(compressed);
@@ -1268,8 +1224,12 @@ class EncodedReaderImpl implements EncodedReader {
}
DiskRangeList next = current.next;
current.removeSelf();
- if (originalPos == 0 && toRelease.remove(compressed)) {
- releaseBuffer(compressed, true);
+ if (dataReader.isTrackingDiskRanges()) {
+ if (originalPos == 0) {
+ dataReader.releaseBuffer(compressed); // We copied the entire buffer.
+ } else {
+ toRelease.add(compressed); // There might be slices depending on this buffer.
+ }
}
int extraChunkCount = 0;
@@ -1286,15 +1246,15 @@ class EncodedReaderImpl implements EncodedReader {
copy.put(slice);
ProcCacheChunk cc = addOneCompressionBlockByteBuffer(copy, isUncompressed,
cbStartOffset, cbEndOffset, remaining, (BufferChunk)next, toDecompress, cacheBuffers);
- if (compressed.remaining() <= 0 && toRelease.remove(compressed)) {
- releaseBuffer(compressed, true); // We copied the entire buffer.
- } // else there's more data to process; will be handled in next call.
+ if (compressed.remaining() <= 0 && dataReader.isTrackingDiskRanges()) {
+ dataReader.releaseBuffer(compressed); // We copied the entire buffer.
+ }
return cc;
}
remaining -= compressed.remaining();
- copy.put(compressed); // TODO: move into the if below; account for release call
- if (toRelease.remove(compressed)) {
- releaseBuffer(compressed, true); // We copied the entire buffer.
+ copy.put(compressed);
+ if (dataReader.isTrackingDiskRanges()) {
+ dataReader.releaseBuffer(compressed); // We copied the entire buffer.
}
DiskRangeList tmp = next;
next = next.hasContiguousNext() ? next.next : null;
@@ -1310,38 +1270,6 @@ class EncodedReaderImpl implements EncodedReader {
}
}
- private void releaseBuffers(Collection<ByteBuffer> toRelease, boolean isFromDataReader) {
- if (toRelease == null) return;
- for (ByteBuffer buf : toRelease) {
- releaseBuffer(buf, isFromDataReader);
- }
- }
-
- private void releaseBuffer(ByteBuffer bb, boolean isFromDataReader) {
- if (isTracingEnabled) {
- LOG.trace("Releasing the buffer " + System.identityHashCode(bb));
- }
- if (isFromDataReader && dataReader.isTrackingDiskRanges()) {
- dataReader.releaseBuffer(bb);
- return;
- }
- Field localCf = cleanerField;
- if (!bb.isDirect() || localCf == null) return;
- try {
- Cleaner cleaner = (Cleaner) localCf.get(bb);
- if (cleaner != null) {
- cleaner.clean();
- } else {
- LOG.debug("Unable to clean a buffer using cleaner - no cleaner");
- }
- } catch (Exception e) {
- // leave it for GC to clean up
- LOG.warn("Unable to clean direct buffers using Cleaner.");
- cleanerField = null;
- }
- }
-
-
private IncompleteCb addIncompleteCompressionBuffer(
long cbStartOffset, DiskRangeList target, int extraChunkCount) {
IncompleteCb icb = new IncompleteCb(cbStartOffset, target.getEnd());
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
index a7bb5ee..26f1e75 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
@@ -21,7 +21,6 @@ import java.util.Properties;
import java.util.TimeZone;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetTableUtils;
-import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
import org.apache.parquet.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -140,11 +139,14 @@ public class MapredParquetOutputFormat extends FileOutputFormat<NullWritable, Pa
String timeZoneID =
tableProperties.getProperty(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY);
if (!Strings.isNullOrEmpty(timeZoneID)) {
-
- NanoTimeUtils.validateTimeZone(timeZoneID);
+ if (!Arrays.asList(TimeZone.getAvailableIDs()).contains(timeZoneID)) {
+ throw new IllegalStateException("Unexpected timezone id found for parquet int96 conversion: " + timeZoneID);
+ }
return TimeZone.getTimeZone(timeZoneID);
}
- return TimeZone.getDefault();
+ // If no timezone is defined in table properties, then adjust timestamps using
+ // PARQUET_INT96_NO_ADJUSTMENT_ZONE timezone
+ return TimeZone.getTimeZone(ParquetTableUtils.PARQUET_INT96_NO_ADJUSTMENT_ZONE);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
index 2954601..8e33b7d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
@@ -20,7 +20,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetTableUtils;
-import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.serde2.SerDeStats;
@@ -45,6 +44,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.TimeZone;
@@ -170,7 +170,7 @@ public class ParquetRecordReaderBase {
boolean skipConversion = HiveConf.getBoolVar(configuration,
HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION);
FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
- if (!Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr") &&
+ if (!Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr") ||
skipConversion) {
// Impala writes timestamp values using GMT only. We should not try to convert Impala
// files to other type of timezones.
@@ -179,12 +179,16 @@ public class ParquetRecordReaderBase {
// TABLE_PARQUET_INT96_TIMEZONE is a table property used to detect what timezone conversion
// to use when reading Parquet timestamps.
timeZoneID = configuration.get(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY,
- TimeZone.getDefault().getID());
- NanoTimeUtils.validateTimeZone(timeZoneID);
+ ParquetTableUtils.PARQUET_INT96_NO_ADJUSTMENT_ZONE);
+
+ if (!Arrays.asList(TimeZone.getAvailableIDs()).contains(timeZoneID)) {
+ throw new IllegalStateException("Unexpected timezone id found for parquet int96 conversion: " + timeZoneID);
+ }
}
// 'timeZoneID' should be valid, since we did not throw exception above
- configuration.set(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY,timeZoneID);
+ configuration.set(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY,
+ TimeZone.getTimeZone(timeZoneID).getID());
}
public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) {
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java
index dbd6fb3..5dc8088 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java
@@ -152,26 +152,13 @@ public class NanoTimeUtils {
calendar.setTimeInMillis(utcCalendar.getTimeInMillis());
- Calendar adjusterCalendar = copyToCalendarWithTZ(calendar, getLocalCalendar());
+ Calendar adjusterCalendar = copyToCalendarWithTZ(calendar, Calendar.getInstance());
Timestamp ts = new Timestamp(adjusterCalendar.getTimeInMillis());
ts.setNanos((int) nanos);
return ts;
}
- /**
- * Check if the string id is a valid java TimeZone id.
- * TimeZone#getTimeZone will return "GMT" if the id cannot be understood.
- * @param timeZoneID
- */
- public static void validateTimeZone(String timeZoneID) {
- if (TimeZone.getTimeZone(timeZoneID).getID().equals("GMT")
- && !"GMT".equals(timeZoneID)) {
- throw new IllegalStateException(
- "Unexpected timezone id found for parquet int96 conversion: " + timeZoneID);
- }
- }
-
private static Calendar copyToCalendarWithTZ(Calendar from, Calendar to) {
if(from.getTimeZone().getID().equals(to.getTimeZone().getID())) {
return from;
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 312cdac..6ca1963 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -121,9 +121,8 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
try {
serDeStats = new SerDeStats();
projectionPusher = new ProjectionPusher();
- ParquetInputSplit inputSplit = getSplit(oldInputSplit, conf);
- if (inputSplit != null) {
- initialize(inputSplit, conf);
+ if (oldInputSplit != null) {
+ initialize(getSplit(oldInputSplit, conf), conf);
setTimeZoneConversion(jobConf, ((FileSplit) oldInputSplit).getPath());
}
colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf);
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
index 5401c7b..00c9645 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.StatsSetupConst.StatDB;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.Context;
@@ -57,6 +58,7 @@ import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
@@ -75,6 +77,8 @@ import org.apache.logging.log4j.core.appender.RollingFileAppender;
@SuppressWarnings( { "deprecation"})
public class PartialScanTask extends Task<PartialScanWork> implements
Serializable, HadoopJobExecHook {
+
+
private static final long serialVersionUID = 1L;
protected transient JobConf job;
@@ -270,7 +274,7 @@ public class PartialScanTask extends Task<PartialScanWork> implements
return "RCFile Statistics Partial Scan";
}
- public static final String INPUT_SEPERATOR = ":";
+ public static String INPUT_SEPERATOR = ":";
public static void main(String[] args) {
String inputPathStr = null;
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 01e8a48..d255265 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -21,11 +21,6 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.plan.HiveOperation;
-import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc;
-import org.apache.hadoop.hive.ql.plan.LockTableDesc;
-import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc;
-import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
import org.apache.hive.common.util.ShutdownHookManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,13 +57,6 @@ import java.util.concurrent.atomic.AtomicInteger;
* with a single thread accessing it at a time, with the exception of {@link #heartbeat()} method.
* The later may (usually will) be called from a timer thread.
* See {@link #getMS()} for more important concurrency/metastore access notes.
- *
- * Each statement that the TM (transaction manager) should be aware of should belong to a transaction.
- * Effectively, that means any statement that has side effects. Exceptions are statements like
- * Show Compactions, Show Tables, Use Database foo, etc. The transaction is started either
- * explicitly ( via Start Transaction SQL statement from end user - not fully supported) or
- * implicitly by the {@link org.apache.hadoop.hive.ql.Driver} (which looks exactly as autoCommit=true
- * from end user poit of view). See more at {@link #isExplicitTransaction}.
*/
public final class DbTxnManager extends HiveTxnManagerImpl {
@@ -88,47 +76,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
* to keep apart multiple writes of the same data within the same transaction
* Also see {@link org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options}
*/
- private int writeId = -1;
- /**
- * counts number of statements in the current transaction
- */
- private int numStatements = 0;
- /**
- * if {@code true} it means current transaction is started via START TRANSACTION which means it cannot
- * include any Operations which cannot be rolled back (drop partition; write to non-acid table).
- * If false, it's a single statement transaction which can include any statement. This is not a
- * contradiction from the user point of view who doesn't know anything about the implicit txn
- * and cannot call rollback (the statement of course can fail in which case there is nothing to
- * rollback (assuming the statement is well implemented)).
- *
- * This is done so that all commands run in a transaction which simplifies implementation and
- * allows a simple implementation of multi-statement txns which don't require a lock manager
- * capable of deadlock detection. (todo: not fully implemented; elaborate on how this LM works)
- *
- * Also, critically important, ensuring that everything runs in a transaction assigns an order
- * to all operations in the system - needed for replication/DR.
- *
- * We don't want to allow non-transactional statements in a user demarcated txn because the effect
- * of such statement is "visible" immediately on statement completion, but the user may
- * issue a rollback but the action of the statement can't be undone (and has possibly already been
- * seen by another txn). For example,
- * start transaction
- * insert into transactional_table values(1);
- * insert into non_transactional_table select * from transactional_table;
- * rollback
- *
- * The user would be in for a surprise especially if they are not aware of transactional
- * properties of the tables involved.
- *
- * As a side note: what should the lock manager do with locks for non-transactional resources?
- * Should it it release them at the end of the stmt or txn?
- * Some interesting thoughts: http://mysqlmusings.blogspot.com/2009/02/mixing-engines-in-transactions.html
- */
- private boolean isExplicitTransaction = false;
- /**
- * To ensure transactions don't nest.
- */
- private int startTransactionCount = 0;
+ private int statementId = -1;
// QueryId for the query in current transaction
private String queryId;
@@ -193,22 +141,15 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
@VisibleForTesting
long openTxn(Context ctx, String user, long delay) throws LockException {
- /*Q: why don't we lock the snapshot here??? Instead of having client make an explicit call
- whenever it chooses
- A: If we want to rely on locks for transaction scheduling we must get the snapshot after lock
- acquisition. Relying on locks is a pessimistic strategy which works better under high
- contention.*/
+ //todo: why don't we lock the snapshot here??? Instead of having client make an explicit call
+ //whenever it chooses
init();
- getLockManager();
if(isTxnOpen()) {
throw new LockException("Transaction already opened. " + JavaUtils.txnIdToString(txnId));
}
try {
txnId = getMS().openTxn(user);
- writeId = 0;
- numStatements = 0;
- isExplicitTransaction = false;
- startTransactionCount = 0;
+ statementId = 0;
LOG.debug("Opened " + JavaUtils.txnIdToString(txnId));
ctx.setHeartbeater(startHeartbeat(delay));
return txnId;
@@ -218,8 +159,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
}
/**
- * we don't expect multiple threads to call this method concurrently but {@link #lockMgr} will
- * be read by a different threads than one writing it, thus it's {@code volatile}
+ * we don't expect multiple thread to call this method concurrently but {@link #lockMgr} will
+ * be read by a different threads that one writing it, thus it's {@code volatile}
*/
@Override
public HiveLockManager getLockManager() throws LockException {
@@ -238,95 +179,24 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
catch(LockException e) {
if(e.getCause() instanceof TxnAbortedException) {
txnId = 0;
- writeId = -1;
+ statementId = -1;
}
throw e;
}
}
/**
- * Watermark to include in error msgs and logs
- * @param queryPlan
- * @return
- */
- private static String getQueryIdWaterMark(QueryPlan queryPlan) {
- return "queryId=" + queryPlan.getQueryId();
- }
-
- private void markExplicitTransaction(QueryPlan queryPlan) throws LockException {
- isExplicitTransaction = true;
- if(++startTransactionCount > 1) {
- throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, queryPlan.getOperationName(),
- JavaUtils.txnIdToString(getCurrentTxnId()), queryPlan.getQueryId());
- }
-
- }
- /**
- * Ensures that the current SQL statement is appropriate for the current state of the
- * Transaction Manager (e.g. can call commit unless you called start transaction)
- *
- * Note that support for multi-statement txns is a work-in-progress so it's only supported in
- * HiveConf#HIVE_IN_TEST/HiveConf#TEZ_HIVE_IN_TEST.
- * @param queryPlan
- * @throws LockException
- */
- private void verifyState(QueryPlan queryPlan) throws LockException {
- if(!isTxnOpen()) {
- throw new LockException("No transaction context for operation: " + queryPlan.getOperationName() +
- " for " + getQueryIdWaterMark(queryPlan));
- }
- if(queryPlan.getOperation() == null) {
- throw new IllegalStateException("Unkown HiverOperation for " + getQueryIdWaterMark(queryPlan));
- }
- numStatements++;
- switch (queryPlan.getOperation()) {
- case START_TRANSACTION:
- markExplicitTransaction(queryPlan);
- break;
- case COMMIT:
- case ROLLBACK:
- if(!isTxnOpen()) {
- throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, queryPlan.getOperationName());
- }
- if(!isExplicitTransaction) {
- throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_IMPLICIT_TXN, queryPlan.getOperationName());
- }
- break;
- default:
- if(!queryPlan.getOperation().isAllowedInTransaction() && isExplicitTransaction) {
- //for example, drop table in an explicit txn is not allowed
- //in some cases this requires looking at more than just the operation
- //for example HiveOperation.LOAD - OK if target is MM table but not OK if non-acid table
- throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, queryPlan.getOperationName(),
- JavaUtils.txnIdToString(getCurrentTxnId()), queryPlan.getQueryId());
- }
- }
- /*
- Should we allow writing to non-transactional tables in an explicit transaction? The user may
- issue ROLLBACK but these tables won't rollback.
- Can do this by checking ReadEntity/WriteEntity to determine whether it's reading/writing
- any non acid and raise an appropriate error
- * Driver.acidSinks and Driver.acidInQuery can be used if any acid is in the query*/
- }
- /**
- * Normally client should call {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan, org.apache.hadoop.hive.ql.Context, String)}
+ * This is for testing only. Normally client should call {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan, org.apache.hadoop.hive.ql.Context, String)}
* @param isBlocking if false, the method will return immediately; thus the locks may be in LockState.WAITING
* @return null if no locks were needed
*/
- @VisibleForTesting
LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isBlocking) throws LockException {
init();
- // Make sure we've built the lock manager
+ // Make sure we've built the lock manager
getLockManager();
- verifyState(plan);
+
boolean atLeastOneLock = false;
queryId = plan.getQueryId();
- switch (plan.getOperation()) {
- case SET_AUTOCOMMIT:
- /**This is here for documentation purposes. This TM doesn't support this - only has one
- * mode of operation documented at {@link DbTxnManager#isExplicitTransaction}*/
- return null;
- }
LockRequestBuilder rqstBuilder = new LockRequestBuilder(queryId);
//link queryId to txnId
@@ -370,8 +240,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
// This is a file or something we don't hold locks for.
continue;
}
- if(t != null) {
- compBuilder.setIsAcid(AcidUtils.isFullAcidTable(t));
+ if(t != null && AcidUtils.isFullAcidTable(t)) {
+ compBuilder.setIsAcid(true);
}
LockComponent comp = compBuilder.build();
LOG.debug("Adding lock component to lock request " + comp.toString());
@@ -392,33 +262,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
}
LockComponentBuilder compBuilder = new LockComponentBuilder();
Table t = null;
- switch (output.getType()) {
- case DATABASE:
- compBuilder.setDbName(output.getDatabase().getName());
- break;
-
- case TABLE:
- case DUMMYPARTITION: // in case of dynamic partitioning lock the table
- t = output.getTable();
- compBuilder.setDbName(t.getDbName());
- compBuilder.setTableName(t.getTableName());
- break;
-
- case PARTITION:
- compBuilder.setPartitionName(output.getPartition().getName());
- t = output.getPartition().getTable();
- compBuilder.setDbName(t.getDbName());
- compBuilder.setTableName(t.getTableName());
- break;
-
- default:
- // This is a file or something we don't hold locks for.
- continue;
- }
switch (output.getWriteType()) {
- /* base this on HiveOperation instead? this and DDL_NO_LOCK is peppered all over the code...
- Seems much cleaner if each stmt is identified as a particular HiveOperation (which I'd think
- makes sense everywhere). This however would be problematic for merge...*/
case DDL_EXCLUSIVE:
case INSERT_OVERWRITE:
compBuilder.setExclusive();
@@ -426,9 +270,10 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
break;
case INSERT:
- assert t != null;
+ t = getTable(output);
if(AcidUtils.isFullAcidTable(t)) {
compBuilder.setShared();
+ compBuilder.setIsAcid(true);
}
else {
if (conf.getBoolVar(HiveConf.ConfVars.HIVE_TXN_STRICT_LOCKING_MODE)) {
@@ -436,6 +281,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
} else { // this is backward compatible for non-ACID resources, w/o ACID semantics
compBuilder.setShared();
}
+ compBuilder.setIsAcid(false);
}
compBuilder.setOperationType(DataOperationType.INSERT);
break;
@@ -447,10 +293,12 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
case UPDATE:
compBuilder.setSemiShared();
compBuilder.setOperationType(DataOperationType.UPDATE);
+ t = getTable(output);
break;
case DELETE:
compBuilder.setSemiShared();
compBuilder.setOperationType(DataOperationType.DELETE);
+ t = getTable(output);
break;
case DDL_NO_LOCK:
@@ -459,11 +307,34 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
default:
throw new RuntimeException("Unknown write type " +
output.getWriteType().toString());
+
}
- if(t != null) {
- compBuilder.setIsAcid(AcidUtils.isFullAcidTable(t));
- }
+ switch (output.getType()) {
+ case DATABASE:
+ compBuilder.setDbName(output.getDatabase().getName());
+ break;
+ case TABLE:
+ case DUMMYPARTITION: // in case of dynamic partitioning lock the table
+ t = output.getTable();
+ compBuilder.setDbName(t.getDbName());
+ compBuilder.setTableName(t.getTableName());
+ break;
+
+ case PARTITION:
+ compBuilder.setPartitionName(output.getPartition().getName());
+ t = output.getPartition().getTable();
+ compBuilder.setDbName(t.getDbName());
+ compBuilder.setTableName(t.getTableName());
+ break;
+
+ default:
+ // This is a file or something we don't hold locks for.
+ continue;
+ }
+ if(t != null && AcidUtils.isFullAcidTable(t)) {
+ compBuilder.setIsAcid(true);
+ }
compBuilder.setIsDynamicPartitionWrite(output.isDynamicPartitionWrite());
LockComponent comp = compBuilder.build();
LOG.debug("Adding lock component to lock request " + comp.toString());
@@ -534,8 +405,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
e);
} finally {
txnId = 0;
- writeId = -1;
- numStatements = 0;
+ statementId = -1;
}
}
@@ -559,8 +429,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
e);
} finally {
txnId = 0;
- writeId = -1;
- numStatements = 0;
+ statementId = -1;
}
}
@@ -687,26 +556,6 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
public boolean supportsExplicitLock() {
return false;
}
- @Override
- public int lockTable(Hive db, LockTableDesc lockTbl) throws HiveException {
- super.lockTable(db, lockTbl);
- throw new UnsupportedOperationException();
- }
- @Override
- public int unlockTable(Hive hiveDB, UnlockTableDesc unlockTbl) throws HiveException {
- super.unlockTable(hiveDB, unlockTbl);
- throw new UnsupportedOperationException();
- }
- @Override
- public int lockDatabase(Hive hiveDB, LockDatabaseDesc lockDb) throws HiveException {
- super.lockDatabase(hiveDB, lockDb);
- throw new UnsupportedOperationException();
- }
- @Override
- public int unlockDatabase(Hive hiveDB, UnlockDatabaseDesc unlockDb) throws HiveException {
- super.unlockDatabase(hiveDB, unlockDb);
- throw new UnsupportedOperationException();
- }
@Override
public boolean useNewShowLocksFormat() {
@@ -717,44 +566,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
public boolean supportsAcid() {
return true;
}
- /**
- * In an explicit txn start_transaction is the 1st statement and we record the snapshot at the
- * start of the txn for Snapshot Isolation. For Read Committed (not supported yet) we'd record
- * it before executing each statement (but after lock acquisition if using lock based concurrency
- * control).
- * For implicit txn, the stmt that triggered/started the txn is the first statement
- */
- @Override
- public boolean recordSnapshot(QueryPlan queryPlan) {
- assert isTxnOpen();
- assert numStatements > 0 : "was acquireLocks() called already?";
- if(queryPlan.getOperation() == HiveOperation.START_TRANSACTION) {
- //here if start of explicit txn
- assert isExplicitTransaction;
- assert numStatements == 1;
- return true;
- }
- else if(!isExplicitTransaction) {
- assert numStatements == 1 : "numStatements=" + numStatements + " in implicit txn";
- if (queryPlan.hasAcidResourcesInQuery()) {
- //1st and only stmt in implicit txn and uses acid resource
- return true;
- }
- }
- return false;
- }
- @Override
- public boolean isImplicitTransactionOpen() {
- if(!isTxnOpen()) {
- //some commands like "show databases" don't start implicit transactions
- return false;
- }
- if(!isExplicitTransaction) {
- assert numStatements == 1 : "numStatements=" + numStatements;
- return true;
- }
- return false;
- }
+
@Override
protected void destruct() {
try {
@@ -814,7 +626,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
@Override
public int getWriteIdAndIncrement() {
assert isTxnOpen();
- return writeId++;
+ return statementId++;
}
private static long getHeartbeatInterval(Configuration conf) throws LockException {
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index 24df25b..53ee9c8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -301,8 +301,7 @@ class DummyTxnManager extends HiveTxnManagerImpl {
new HiveLockObject.HiveLockObjectData(plan.getQueryId(),
String.valueOf(System.currentTimeMillis()),
"IMPLICIT",
- plan.getQueryStr(),
- conf);
+ plan.getQueryStr());
if (db != null) {
locks.add(new HiveLockObj(new HiveLockObject(db.getName(), lockData),
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java
index a514339..fff03df 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java
@@ -23,7 +23,6 @@ import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.hive.common.StringInternUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.ql.metadata.DummyPartition;
import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -49,23 +48,16 @@ public class HiveLockObject {
* Note: The parameters are used to uniquely identify a HiveLockObject.
* The parameters will be stripped off any ':' characters in order not
* to interfere with the way the data is serialized (':' delimited string).
- * The query string might be truncated depending on HIVE_LOCK_QUERY_STRING_MAX_LENGTH
- * @param queryId The query identifier will be added to the object without change
- * @param lockTime The lock time will be added to the object without change
- * @param lockMode The lock mode will be added to the object without change
- * @param queryStr The query string might be truncated based on
- * HIVE_LOCK_QUERY_STRING_MAX_LENGTH conf variable
- * @param conf The hive configuration based on which we decide if we should truncate the query
- * string or not
*/
- public HiveLockObjectData(String queryId, String lockTime, String lockMode, String queryStr,
- HiveConf conf) {
+ public HiveLockObjectData(String queryId,
+ String lockTime,
+ String lockMode,
+ String queryStr) {
this.queryId = removeDelimiter(queryId);
this.lockTime = StringInternUtils.internIfNotNull(removeDelimiter(lockTime));
this.lockMode = removeDelimiter(lockMode);
this.queryStr = StringInternUtils.internIfNotNull(
- queryStr == null ? null : StringUtils.substring(removeDelimiter(queryStr.trim()), 0,
- conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_QUERY_STRING_MAX_LENGTH)));
+ removeDelimiter(queryStr == null ? null : queryStr.trim()));
}
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index b24351c..187a658 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -72,7 +72,7 @@ public interface HiveTxnManager {
/**
* Acquire all of the locks needed by a query. If used with a query that
- * requires transactions, this should be called after {@link #openTxn(Context, String)}.
+ * requires transactions, this should be called after {@link #openTxn(String)}.
* A list of acquired locks will be stored in the
* {@link org.apache.hadoop.hive.ql.Context} object and can be retrieved
* via {@link org.apache.hadoop.hive.ql.Context#getHiveLocks}.
@@ -208,13 +208,17 @@ public interface HiveTxnManager {
boolean supportsAcid();
/**
- * For resources that support MVCC, the state of the DB must be recorded for the duration of the
- * operation/transaction. Returns {@code true} if current statment needs to do this.
+ * This behaves exactly as
+ * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean)
+ */
+ void setAutoCommit(boolean autoCommit) throws LockException;
+
+ /**
+ * This behaves exactly as
+ * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#getAutoCommit()
*/
- boolean recordSnapshot(QueryPlan queryPlan);
+ boolean getAutoCommit();
- boolean isImplicitTransactionOpen();
-
boolean isTxnOpen();
/**
* if {@code isTxnOpen()}, returns the currently active transaction ID
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
index 8dbbf87..a371a5a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
abstract class HiveTxnManagerImpl implements HiveTxnManager {
protected HiveConf conf;
+ private boolean isAutoCommit = true;//true by default; matches JDBC spec
void setHiveConf(HiveConf c) {
conf = c;
@@ -67,6 +68,16 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager {
destruct();
}
@Override
+ public void setAutoCommit(boolean autoCommit) throws LockException {
+ isAutoCommit = autoCommit;
+ }
+
+ @Override
+ public boolean getAutoCommit() {
+ return isAutoCommit;
+ }
+
+ @Override
public int lockTable(Hive db, LockTableDesc lockTbl) throws HiveException {
HiveLockManager lockMgr = getAndCheckLockManager();
@@ -82,8 +93,7 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager {
new HiveLockObjectData(lockTbl.getQueryId(),
String.valueOf(System.currentTimeMillis()),
"EXPLICIT",
- lockTbl.getQueryStr(),
- conf);
+ lockTbl.getQueryStr());
if (partSpec == null) {
HiveLock lck = lockMgr.lock(new HiveLockObject(tbl, lockData), mode, true);
@@ -141,7 +151,7 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager {
HiveLockObjectData lockData =
new HiveLockObjectData(lockDb.getQueryId(),
String.valueOf(System.currentTimeMillis()),
- "EXPLICIT", lockDb.getQueryStr(), conf);
+ "EXPLICIT", lockDb.getQueryStr());
HiveLock lck = lockMgr.lock(new HiveLockObject(dbObj.getName(), lockData), mode, true);
if (lck == null) {
@@ -192,13 +202,4 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager {
return lockMgr;
}
- @Override
- public boolean recordSnapshot(QueryPlan queryPlan) {
- return false;
- }
- @Override
- public boolean isImplicitTransactionOpen() {
- return true;
- }
-
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
index 9b46ae7..c2a4806 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
@@ -285,10 +285,8 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
int tryNum = 0;
ZooKeeperHiveLock ret = null;
Set<String> conflictingLocks = new HashSet<String>();
- Exception lastException = null;
do {
- lastException = null;
tryNum++;
try {
if (tryNum > 1) {
@@ -300,22 +298,26 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
break;
}
} catch (Exception e1) {
- lastException = e1;
if (e1 instanceof KeeperException) {
KeeperException e = (KeeperException) e1;
switch (e.code()) {
case CONNECTIONLOSS:
case OPERATIONTIMEOUT:
- case NONODE:
- case NODEEXISTS:
LOG.debug("Possibly transient ZooKeeper exception: ", e);
- break;
+ continue;
default:
LOG.error("Serious Zookeeper exception: ", e);
break;
}
- } else {
- LOG.error("Other unexpected exception: ", e1);
+ }
+ if (tryNum >= numRetriesForLock) {
+ console.printError("Unable to acquire " + key.getData().getLockMode()
+ + ", " + mode + " lock " + key.getDisplayName() + " after "
+ + tryNum + " attempts.");
+ LOG.error("Exceeds maximum retries with errors: ", e1);
+ printConflictingLocks(key,mode,conflictingLocks);
+ conflictingLocks.clear();
+ throw new LockException(e1);
}
}
} while (tryNum < numRetriesForLock);
@@ -325,11 +327,8 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
+ ", " + mode + " lock " + key.getDisplayName() + " after "
+ tryNum + " attempts.");
printConflictingLocks(key,mode,conflictingLocks);
- if (lastException != null) {
- LOG.error("Exceeds maximum retries with errors: ", lastException);
- throw new LockException(lastException);
- }
}
+ conflictingLocks.clear();
return ret;
}
@@ -351,19 +350,6 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
}
}
- /**
- * Creates a primitive lock object on ZooKeeper.
- * @param key The lock data
- * @param mode The lock mode (HiveLockMode - EXCLUSIVE/SHARED/SEMI_SHARED)
- * @param keepAlive If true creating PERSISTENT ZooKeeper locks, otherwise EPHEMERAL ZooKeeper
- * locks
- * @param parentCreated If we expect, that the parent is already created then true, otherwise
- * we will try to create the parents as well
- * @param conflictingLocks The set where we should collect the conflicting locks when
- * the logging level is set to DEBUG
- * @return The created ZooKeeperHiveLock object, null if there was a conflicting lock
- * @throws Exception If there was an unexpected Exception
- */
private ZooKeeperHiveLock lockPrimitive(HiveLockObject key,
HiveLockMode mode, boolean keepAlive, boolean parentCreated,
Set<String> conflictingLocks)
@@ -404,7 +390,7 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
int seqNo = getSequenceNumber(res, getLockName(lastName, mode));
if (seqNo == -1) {
curatorFramework.delete().forPath(res);
- throw new LockException("The created node does not contain a sequence number: " + res);
+ return null;
}
List<String> children = curatorFramework.getChildren().forPath(lastName);
@@ -598,6 +584,7 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
/**
* @param conf Hive configuration
+ * @param zkpClient The ZooKeeper client
* @param key The object to be compared against - if key is null, then get all locks
**/
private static List<HiveLock> getLocks(HiveConf conf,
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java b/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
deleted file mode 100644
index 64ce100..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.log;
-
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.hive.common.LogUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.Driver;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.session.OperationLog;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.core.LogEvent;
-import org.apache.logging.log4j.core.LoggerContext;
-import org.apache.logging.log4j.core.appender.RandomAccessFileAppender;
-import org.apache.logging.log4j.core.appender.routing.Route;
-import org.apache.logging.log4j.core.appender.routing.Routes;
-import org.apache.logging.log4j.core.appender.routing.RoutingAppender;
-import org.apache.logging.log4j.core.config.Configuration;
-import org.apache.logging.log4j.core.config.LoggerConfig;
-import org.apache.logging.log4j.core.config.Node;
-import org.apache.logging.log4j.core.config.plugins.Plugin;
-import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
-import org.apache.logging.log4j.core.config.plugins.PluginFactory;
-import org.apache.logging.log4j.core.config.plugins.processor.PluginEntry;
-import org.apache.logging.log4j.core.config.plugins.util.PluginType;
-import org.apache.logging.log4j.core.filter.AbstractFilter;
-import org.apache.logging.log4j.core.layout.PatternLayout;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-
-/**
- * Divert appender to redirect operation logs to separate files.
- */
-public class LogDivertAppender {
- private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(LogDivertAppender.class.getName());
- public static final String verboseLayout = "%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n";
- public static final String nonVerboseLayout = "%-5p : %m%n";
-
- /**
- * A log filter that filters messages coming from the logger with the given names.
- * It be used as a white list filter or a black list filter.
- * We apply black list filter on the Loggers used by the log diversion stuff, so that
- * they don't generate more logs for themselves when they process logs.
- * White list filter is used for less verbose log collection
- */
- @Plugin(name = "NameFilter", category = "Core", elementType="filter", printObject = true)
- private static class NameFilter extends AbstractFilter {
- private Pattern namePattern;
- private OperationLog.LoggingLevel loggingMode;
-
- /* Patterns that are excluded in verbose logging level.
- * Filter out messages coming from log processing classes, or we'll run an infinite loop.
- */
- private static final Pattern verboseExcludeNamePattern = Pattern.compile(Joiner.on("|").
- join(new String[]{LOG.getName(), OperationLog.class.getName()}));
-
- /* Patterns that are included in execution logging level.
- * In execution mode, show only select logger messages.
- */
- private static final Pattern executionIncludeNamePattern = Pattern.compile(Joiner.on("|").
- join(new String[]{"org.apache.hadoop.mapreduce.JobSubmitter",
- "org.apache.hadoop.mapreduce.Job", "SessionState", Task.class.getName(),
- Driver.class.getName(), "org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor"}));
-
- /* Patterns that are included in performance logging level.
- * In performance mode, show execution and performance logger messages.
- */
- private static final Pattern performanceIncludeNamePattern = Pattern.compile(
- executionIncludeNamePattern.pattern() + "|" + PerfLogger.class.getName());
-
- private void setCurrentNamePattern(OperationLog.LoggingLevel mode) {
- if (mode == OperationLog.LoggingLevel.VERBOSE) {
- this.namePattern = verboseExcludeNamePattern;
- } else if (mode == OperationLog.LoggingLevel.EXECUTION) {
- this.namePattern = executionIncludeNamePattern;
- } else if (mode == OperationLog.LoggingLevel.PERFORMANCE) {
- this.namePattern = performanceIncludeNamePattern;
- }
- }
-
- public NameFilter(OperationLog.LoggingLevel loggingMode) {
- this.loggingMode = loggingMode;
- setCurrentNamePattern(loggingMode);
- }
-
- @Override
- public Result filter(LogEvent event) {
- boolean excludeMatches = (loggingMode == OperationLog.LoggingLevel.VERBOSE);
-
- String logLevel = event.getContextMap().get(LogUtils.OPERATIONLOG_LEVEL_KEY);
- logLevel = logLevel == null ? "" : logLevel;
- OperationLog.LoggingLevel currentLoggingMode = OperationLog.getLoggingLevel(logLevel);
- // If logging is disabled, deny everything.
- if (currentLoggingMode == OperationLog.LoggingLevel.NONE) {
- return Result.DENY;
- }
- // Look at the current session's setting
- // and set the pattern and excludeMatches accordingly.
- if (currentLoggingMode != loggingMode) {
- loggingMode = currentLoggingMode;
- excludeMatches = (loggingMode == OperationLog.LoggingLevel.VERBOSE);
- setCurrentNamePattern(loggingMode);
- }
-
- boolean isMatch = namePattern.matcher(event.getLoggerName()).matches();
-
- if (excludeMatches == isMatch) {
- // Deny if this is black-list filter (excludeMatches = true) and it
- // matched or if this is whitelist filter and it didn't match
- return Result.DENY;
- }
-
- return Result.NEUTRAL;
- }
-
- @PluginFactory
- public static NameFilter createFilter(
- @PluginAttribute("loggingLevel") final String loggingLevel) {
- // Name required for routing. Error out if it is not set.
- Preconditions.checkNotNull(loggingLevel,
- "loggingLevel must be specified for " + NameFilter.class.getName());
-
- return new NameFilter(OperationLog.getLoggingLevel(loggingLevel));
- }
- }
-
- /**
- * Programmatically register a routing appender to Log4J configuration, which
- * automatically writes the log of each query to an individual file.
- * The equivilent property configuration is as follows:
- * # queryId based routing file appender
- appender.query-routing.type = Routing
- appender.query-routing.name = query-routing
- appender.query-routing.routes.type = Routes
- appender.query-routing.routes.pattern = $${ctx:queryId}
- # default route
- appender.query-routing.routes.route-default.type = Route
- appender.query-routing.routes.route-default.key = $${ctx:queryId}
- appender.query-routing.routes.route-default.app.type = null
- appender.query-routing.routes.route-default.app.name = Null
- # queryId based route
- appender.query-routing.routes.route-mdc.type = Route
- appender.query-routing.routes.route-mdc.name = IrrelevantName-query-routing
- appender.query-routing.routes.route-mdc.app.type = RandomAccessFile
- appender.query-routing.routes.route-mdc.app.name = query-file-appender
- appender.query-routing.routes.route-mdc.app.fileName = ${sys:hive.log.dir}/${ctx:sessionId}/${ctx:queryId}
- appender.query-routing.routes.route-mdc.app.layout.type = PatternLayout
- appender.query-routing.routes.route-mdc.app.layout.pattern = %d{ISO8601} %5p %c{2}: %m%n
- * @param conf the configuration for HiveServer2 instance
- */
- public static void registerRoutingAppender(org.apache.hadoop.conf.Configuration conf) {
- String loggingLevel = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL);
- OperationLog.LoggingLevel loggingMode = OperationLog.getLoggingLevel(loggingLevel);
- String layout = loggingMode == OperationLog.LoggingLevel.VERBOSE ? verboseLayout : nonVerboseLayout;
- String logLocation = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION);
-
- // Create NullAppender
- PluginEntry nullEntry = new PluginEntry();
- nullEntry.setClassName(NullAppender.class.getName());
- nullEntry.setKey("null");
- nullEntry.setName("appender");
- PluginType<NullAppender> nullChildType = new PluginType<NullAppender>(nullEntry, NullAppender.class, "appender");
- Node nullChildNode = new Node(null, "Null", nullChildType);
-
- // Create default route
- PluginEntry defaultEntry = new PluginEntry();
- defaultEntry.setClassName(Route.class.getName());
- defaultEntry.setKey("route");
- defaultEntry.setName("Route");
- PluginType<Route> defaultType = new PluginType<Route>(defaultEntry, Route.class, "Route");
- Node nullNode = new Node(null, "Route", defaultType);
- nullNode.getChildren().add(nullChildNode);
- Route defaultRoute = Route.createRoute(null, "${ctx:queryId}", nullNode);
-
- // Create queryId based route
- PluginEntry entry = new PluginEntry();
- entry.setClassName(Route.class.getName());
- entry.setKey("route");
- entry.setName("Route");
- PluginType<Route> type = new PluginType<Route>(entry, Route.class, "Route");
- Node node = new Node(null, "Route", type);
-
- PluginEntry childEntry = new PluginEntry();
- childEntry.setClassName(RandomAccessFileAppender.class.getName());
- childEntry.setKey("randomaccessfile");
- childEntry.setName("appender");
- PluginType<RandomAccessFileAppender> childType = new PluginType<RandomAccessFileAppender>(childEntry, RandomAccessFileAppender.class, "appender");
- Node childNode = new Node(node, "RandomAccessFile", childType);
- childNode.getAttributes().put("name", "query-file-appender");
- childNode.getAttributes().put("fileName", logLocation + "/${ctx:sessionId}/${ctx:queryId}");
- node.getChildren().add(childNode);
-
- PluginEntry filterEntry = new PluginEntry();
- filterEntry.setClassName(NameFilter.class.getName());
- filterEntry.setKey("namefilter");
- filterEntry.setName("namefilter");
- PluginType<NameFilter> filterType = new PluginType<NameFilter>(filterEntry, NameFilter.class, "filter");
- Node filterNode = new Node(childNode, "NameFilter", filterType);
- filterNode.getAttributes().put("loggingLevel", loggingMode.name());
- childNode.getChildren().add(filterNode);
-
- PluginEntry layoutEntry = new PluginEntry();
- layoutEntry.setClassName(PatternLayout.class.getName());
- layoutEntry.setKey("patternlayout");
- layoutEntry.setName("layout");
- PluginType<PatternLayout> layoutType = new PluginType<PatternLayout>(layoutEntry, PatternLayout.class, "layout");
- Node layoutNode = new Node(childNode, "PatternLayout", layoutType);
- layoutNode.getAttributes().put("pattern", layout);
- childNode.getChildren().add(layoutNode);
-
- Route mdcRoute = Route.createRoute(null, null, node);
- Routes routes = Routes.createRoutes("${ctx:queryId}", defaultRoute, mdcRoute);
-
- LoggerContext context = (LoggerContext) LogManager.getContext(false);
- Configuration configuration = context.getConfiguration();
-
- RoutingAppender routingAppender = RoutingAppender.createAppender("query-routing",
- "true",
- routes,
- configuration,
- null,
- null,
- null);
-
- LoggerConfig loggerConfig = configuration.getRootLogger();
- loggerConfig.addAppender(routingAppender, null, null);
- context.updateLoggers();
- routingAppender.start();
- }
-}