Posted to commits@hive.apache.org by se...@apache.org on 2016/12/17 02:59:10 UTC
[2/4] hive git commit: HIVE-15147 : LLAP: use LLAP cache for non-columnar formats in a somewhat general way (Sergey Shelukhin)
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
new file mode 100644
index 0000000..a70545e
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -0,0 +1,1248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.encoded;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
+import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
+import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.ConsumerFeedback;
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
+import org.apache.hadoop.hive.llap.cache.LlapDataBuffer;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
+import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl;
+import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl.FileData;
+import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl.StripeData;
+import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.llap.io.decode.GenericColumnVectorProducer.TextStripeMetadata;
+import org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer;
+import org.apache.hadoop.hive.ql.io.HdfsUtils;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterOptions;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.WriterImpl;
+import org.apache.hadoop.hive.ql.io.orc.encoded.CacheChunk;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.LineRecordReader;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.common.util.FixedSizedObjectPool;
+import org.apache.hive.common.util.Ref;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.OrcFile.EncodingStrategy;
+import org.apache.orc.OrcFile.Version;
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcProto.ColumnEncoding;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.OutStream.OutputReceiver;
+import org.apache.orc.impl.PhysicalWriter;
+import org.apache.orc.impl.StreamName;
+import org.apache.tez.common.CallableWithNdc;
+import org.apache.tez.common.counters.TezCounters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class SerDeEncodedDataReader extends CallableWithNdc<Void>
+ implements ConsumerFeedback<OrcEncodedColumnBatch> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SerDeEncodedDataReader.class);
+ public static final FixedSizedObjectPool<ColumnStreamData> CSD_POOL =
+ new FixedSizedObjectPool<>(8192, new PoolObjectHelper<ColumnStreamData>() {
+ @Override
+ public ColumnStreamData create() {
+ return new ColumnStreamData();
+ }
+ @Override
+ public void resetBeforeOffer(ColumnStreamData t) {
+ t.reset();
+ }
+ });
+ public static final FixedSizedObjectPool<OrcEncodedColumnBatch> ECB_POOL =
+ new FixedSizedObjectPool<>(1024, new PoolObjectHelper<OrcEncodedColumnBatch>() {
+ @Override
+ public OrcEncodedColumnBatch create() {
+ return new OrcEncodedColumnBatch();
+ }
+ @Override
+ public void resetBeforeOffer(OrcEncodedColumnBatch t) {
+ t.reset();
+ }
+ });
+ public static final FixedSizedObjectPool<CacheChunk> TCC_POOL =
+ new FixedSizedObjectPool<>(1024, new PoolObjectHelper<CacheChunk>() {
+ @Override
+ public CacheChunk create() {
+ return new CacheChunk();
+ }
+ @Override
+ public void resetBeforeOffer(CacheChunk t) {
+ t.reset();
+ }
+ });
+ private final static DiskRangeListFactory CC_FACTORY = new DiskRangeListFactory() {
+ @Override
+ public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end) {
+ CacheChunk tcc = TCC_POOL.take();
+ tcc.init(buffer, offset, end);
+ return tcc;
+ }
+ };
+
+
+ private final SerDeLowLevelCacheImpl cache;
+ private final BufferUsageManager bufferManager;
+ private final Configuration conf;
+ private final FileSplit split;
+ private List<Integer> columnIds;
+ private final OrcEncodedDataConsumer consumer;
+ private final QueryFragmentCounters counters;
+ private final UserGroupInformation ugi;
+
+ private final Object fileKey;
+ private final FileSystem fs;
+
+ private volatile boolean isStopped = false;
+ private final Deserializer sourceSerDe;
+ private final InputFormat<?, ?> sourceInputFormat;
+ private final Reporter reporter;
+ private final JobConf jobConf;
+ private final int allocSize;
+ private final int targetSliceRowCount;
+
+ private final boolean[] writerIncludes;
+ private WriterImpl orcWriter = null;
+ private CacheWriter cacheWriter = null;
+ /**
+ * Data from cache currently being processed. We store it here so that we could decref
+ * it in case of failures. We remove each slice from the data after it has been sent to
+ * the consumer, at which point the consumer is responsible for it.
+ */
+ private FileData cachedData;
+
+ public SerDeEncodedDataReader(SerDeLowLevelCacheImpl cache,
+ BufferUsageManager bufferManager, Configuration daemonConf, FileSplit split,
+ List<Integer> columnIds, OrcEncodedDataConsumer consumer,
+ JobConf jobConf, Reporter reporter, InputFormat<?, ?> sourceInputFormat,
+ Deserializer sourceSerDe, QueryFragmentCounters counters, TypeDescription schema)
+ throws IOException {
+ this.cache = cache;
+ this.bufferManager = bufferManager;
+ this.conf = daemonConf;
+ this.split = split;
+ this.columnIds = columnIds;
+ this.allocSize = determineAllocSize(bufferManager, daemonConf);
+ boolean isInTest = HiveConf.getBoolVar(daemonConf, ConfVars.HIVE_IN_TEST);
+ this.targetSliceRowCount = HiveConf.getIntVar(
+ isInTest ? jobConf : daemonConf, ConfVars.LLAP_IO_ENCODE_SLICE_ROW_COUNT);
+ LOG.info("TODO# targetSliceRowCount = " + targetSliceRowCount);
+ if (this.columnIds != null) {
+ Collections.sort(this.columnIds);
+ }
+ this.consumer = consumer;
+ this.counters = counters;
+ try {
+ this.ugi = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ fs = split.getPath().getFileSystem(daemonConf);
+ fileKey = determineFileId(fs, split,
+ HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID));
+ this.sourceInputFormat = sourceInputFormat;
+ this.sourceSerDe = sourceSerDe;
+ this.reporter = reporter;
+ this.jobConf = jobConf;
+ this.writerIncludes = OrcInputFormat.genIncludedColumns(schema, columnIds);
+ }
+
+ private static int determineAllocSize(BufferUsageManager bufferManager, Configuration conf) {
+ long allocSize = HiveConf.getSizeVar(conf, ConfVars.LLAP_IO_ENCODE_ALLOC_SIZE);
+ int maxAllocSize = bufferManager.getAllocator().getMaxAllocation();
+ if (allocSize > maxAllocSize) {
+ LOG.error("Encode allocation size " + allocSize + " is being capped to the maximum "
+ + "allocation size " + bufferManager.getAllocator().getMaxAllocation());
+ allocSize = maxAllocSize;
+ }
+ return (int)allocSize;
+ }
+
+ @Override
+ public void stop() {
+ LOG.debug("Encoded reader is being stopped");
+ isStopped = true;
+ }
+
+ @Override
+ public void pause() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void unpause() {
+ throw new UnsupportedOperationException();
+ }
+
+ // TODO: move to base class?
+ @Override
+ protected Void callInternal() throws IOException, InterruptedException {
+ return ugi.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ return performDataRead();
+ }
+ });
+ }
+
+ private static final class LineRrOffsetReader extends PassThruOffsetReader {
+ private static final Method isCompressedMethod;
+ private final LineRecordReader lrReader;
+ private final LongWritable posKey;
+
+ static {
+ Method isCompressedMethodTmp;
+ try {
+ isCompressedMethodTmp = LineRecordReader.class.getDeclaredMethod("isCompressedInput");
+ isCompressedMethodTmp.setAccessible(true);
+ } catch (Throwable t) {
+ isCompressedMethodTmp = null;
+ LOG.info("TODO# cannot get LineRecordReader isCompressedInput method", t);
+ }
+ isCompressedMethod = isCompressedMethodTmp;
+ }
+
+ static ReaderWithOffsets create(LineRecordReader sourceReader) {
+ if (isCompressedMethod == null) return null;
+ Boolean isCompressed = null;
+ try {
+ isCompressed = (Boolean)isCompressedMethod.invoke(sourceReader);
+ } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+ LOG.info("TODO# cannot check the reader for compression", e);
+ return new PassThruOffsetReader(sourceReader);
+ }
+ if (isCompressed) return null; // Cannot slice compressed files.
+ return new LineRrOffsetReader(sourceReader);
+ }
+
+ private LineRrOffsetReader(LineRecordReader sourceReader) {
+ super(sourceReader);
+ this.lrReader = sourceReader;
+ this.posKey = (LongWritable)key;
+ }
+
+ @Override
+ public long getCurrentRowStartOffset() {
+ return posKey.get();
+ }
+
+ @Override
+ public long getCurrentFileOffset() {
+ try {
+ return lrReader.getPos();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+
+ @SuppressWarnings("rawtypes")
+ private static class PassThruOffsetReader implements ReaderWithOffsets {
+ protected final RecordReader sourceReader;
+ protected final Object key;
+ protected final Writable value;
+
+ private PassThruOffsetReader(RecordReader sourceReader) {
+ this.sourceReader = sourceReader;
+ key = sourceReader.createKey();
+ value = (Writable)sourceReader.createValue();
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ return sourceReader.next(key, value);
+ }
+
+ @Override
+ public Writable getCurrentValue() {
+ return value;
+ }
+
+ @Override
+ public void close() throws IOException {
+ sourceReader.close();
+ }
+
+ @Override
+ public long getCurrentRowStartOffset() {
+ return -1;
+ }
+
+ @Override
+ public long getCurrentFileOffset() {
+ return -1;
+ }
+ }
+
+ public static class CacheOutStream extends OutStream {
+ private final CacheOutputReceiver receiver;
+ public CacheOutStream(String name, int bufferSize, CompressionCodec codec,
+ CacheOutputReceiver receiver) throws IOException {
+ super(name, bufferSize, codec, receiver);
+ this.receiver = receiver;
+ }
+
+ @Override
+ public void clear() throws IOException {
+ super.clear();
+ receiver.clear();
+ }
+ }
+
+ private interface ReaderWithOffsets {
+ boolean next() throws IOException;
+ Writable getCurrentValue();
+ long getCurrentRowStartOffset();
+ void close() throws IOException;
+ long getCurrentFileOffset();
+ }
+
+ public static class CacheWriter implements PhysicalWriter {
+ // Struct.
+ private static class CacheStreamData {
+ private final List<MemoryBuffer> data;
+ private final boolean isSuppressed;
+ private final StreamName name;
+ public CacheStreamData(boolean isSuppressed, StreamName name, List<MemoryBuffer> data) {
+ this.isSuppressed = isSuppressed;
+ this.name = name;
+ this.data = data;
+ }
+ @Override
+ public String toString() {
+ return "CacheStreamData [name=" + name + ", isSuppressed="
+ + isSuppressed + ", data=" + toString(data) + "]";
+ }
+ private static String toString(List<MemoryBuffer> data) {
+ String s = "";
+ for (MemoryBuffer buffer : data) {
+ s += LlapDataBuffer.toDataString(buffer) + ", ";
+ }
+ return s;
+ }
+ }
+
+ private static class CacheStripeData {
+ private List<ColumnEncoding> encodings;
+ private long rowCount = -1;
+ private long knownTornStart, firstRowStart, lastRowStart, lastRowEnd;
+ private Map<Integer, List<CacheStreamData>> colStreams = new HashMap<>();
+ @Override
+ public String toString() {
+ return ("{disk data knownTornStart=" + knownTornStart
+ + ", firstRowStart=" + firstRowStart + ", lastRowStart="
+ + lastRowStart + ", lastRowEnd=" + lastRowEnd + ", rowCount=" + rowCount
+ + ", encodings=" + encodings + ", streams=" + colStreams + "}").replace('\n', ' ');
+ }
+
+ public String toCoordinateString() {
+ return "knownTornStart=" + knownTornStart + ", firstRowStart=" + firstRowStart
+ + ", lastRowStart=" + lastRowStart + ", lastRowEnd=" + lastRowEnd;
+ }
+ }
+
+ private CacheStripeData currentStripe;
+ private final List<CacheStripeData> stripes = new ArrayList<>();
+ private final BufferUsageManager bufferManager;
+ private final int bufferSize;
+ private final List<Integer> columnIds;
+ private final boolean[] writerIncludes;
+ // These are global since ORC reuses objects between stripes.
+ private final Map<StreamName, OutStream> streams = new HashMap<>();
+ private final Map<Integer, List<CacheOutStream>> colStreams = new HashMap<>();
+
+ public CacheWriter(BufferUsageManager bufferManager, int bufferSize, List<Integer> columnIds,
+ boolean[] writerIncludes) {
+ this.bufferManager = bufferManager;
+ this.bufferSize = bufferSize;
+ this.columnIds = columnIds;
+ this.writerIncludes = writerIncludes;
+ startStripe();
+ }
+
+ private void startStripe() {
+ if (currentStripe != null) {
+ stripes.add(currentStripe);
+ }
+ currentStripe = new CacheStripeData();
+ }
+
+ @Override
+ public void initialize() throws IOException {
+ }
+
+ @Override
+ public void writeFileMetadata(OrcProto.Metadata.Builder builder) throws IOException {
+ OrcProto.Metadata metadata = builder.build();
+ // LOG.info("TODO# Processing file metadata " + metadata);
+ }
+
+ @Override
+ public void writeFileFooter(OrcProto.Footer.Builder builder) throws IOException {
+ OrcProto.Footer footer = builder.build();
+ // LOG.info("TODO# Processing file footer " + footer);
+ validateIncludes(footer);
+ }
+
+ public void validateIncludes(OrcProto.Footer footer) throws IOException {
+ boolean[] translatedIncludes = columnIds == null ? null : OrcInputFormat.genIncludedColumns(
+ OrcUtils.convertTypeFromProtobuf(footer.getTypesList(), 0), columnIds);
+ if (translatedIncludes == null && writerIncludes == null) return;
+ if (translatedIncludes == null || writerIncludes == null) {
+ throwIncludesMismatchError(translatedIncludes);
+ }
+ int len = Math.min(translatedIncludes.length, writerIncludes.length);
+ for (int i = 0; i < len; ++i) {
+ // Translated includes may be a superset of writer includes due to cache.
+ if (!translatedIncludes[i] && writerIncludes[i]) {
+ throwIncludesMismatchError(translatedIncludes);
+ }
+ }
+ if (translatedIncludes.length < writerIncludes.length) {
+ for (int i = len; i < writerIncludes.length; ++i) {
+ if (writerIncludes[i]) {
+ throwIncludesMismatchError(translatedIncludes);
+ }
+ }
+ }
+
+ }
+
+ private String throwIncludesMismatchError(boolean[] translated) throws IOException {
+ String s = "Includes derived from the original table: " + DebugUtils.toString(writerIncludes)
+ + " but the ones derived from writer types are: " + DebugUtils.toString(translated);
+ LOG.error(s);
+ throw new IOException(s);
+ }
+
+ @Override
+ public void writePostScript(OrcProto.PostScript.Builder builder) throws IOException {
+ // LOG.info("TODO# Ignoring post script " + builder.build());
+ }
+
+ @Override
+ public void close() throws IOException {
+ // Closed by the ORC writer; we still need the data. Do not discard anything.
+ }
+
+ public void discardData() {
+ LOG.info("TODO# discarding disk data (if any wasn't cached)");
+ for (CacheStripeData stripe : stripes) {
+ if (stripe.colStreams == null || stripe.colStreams.isEmpty()) continue;
+ for (List<CacheStreamData> streams : stripe.colStreams.values()) {
+ for (CacheStreamData cos : streams) {
+ for (MemoryBuffer buffer : cos.data) {
+ bufferManager.getAllocator().deallocate(buffer);
+ }
+ }
+ }
+ stripe.colStreams.clear();
+ }
+ }
+
+ @Override
+ public long getPhysicalStripeSize() {
+ return 0; // Always 0, no memory checks.
+ }
+
+ @Override
+ public boolean isCompressed() {
+ return false;
+ }
+
+ @Override
+ public OutStream getOrCreatePhysicalStream(StreamName name) throws IOException {
+ OutStream os = streams.get(name);
+ if (os != null) return os;
+ if (isNeeded(name)) {
+ LOG.info("Creating cache receiver for " + name);
+ CacheOutputReceiver or = new CacheOutputReceiver(bufferManager, name);
+ CacheOutStream cos = new CacheOutStream(name.toString(), bufferSize, null, or);
+ os = cos;
+ List<CacheOutStream> list = colStreams.get(name.getColumn());
+ if (list == null) {
+ list = new ArrayList<>();
+ colStreams.put(name.getColumn(), list);
+ }
+ list.add(cos);
+ } else {
+ LOG.info("Creating null receiver for " + name);
+ OutputReceiver or = new NullOutputReceiver(name);
+ os = new OutStream(name.toString(), bufferSize, null, or);
+ }
+ streams.put(name, os);
+ return os;
+ }
+
+ @Override
+ public void finalizeStripe(
+ OrcProto.StripeFooter.Builder footer,
+ OrcProto.StripeInformation.Builder dirEntry)
+ throws IOException {
+ List<ColumnEncoding> allEnc = footer.getColumnsList();
+ OrcProto.StripeInformation si = dirEntry.build();
+ LOG.info(("TODO# Finalizing stripe " + footer.build() + " => " + si).replace('\n', ' '));
+ currentStripe.encodings = new ArrayList<>(allEnc);
+ for (int i = 0; i < currentStripe.encodings.size(); ++i) {
+ // Don't record encodings for unneeded columns.
+ if (writerIncludes == null || writerIncludes[i]) continue;
+ currentStripe.encodings.set(i, null);
+ }
+ currentStripe.rowCount = si.getNumberOfRows();
+ // ORC writer reuses streams, so we need to clean them here and extract data.
+ for (Map.Entry<Integer, List<CacheOutStream>> e : colStreams.entrySet()) {
+ int colIx = e.getKey();
+ List<CacheOutStream> streams = e.getValue();
+ List<CacheStreamData> data = new ArrayList<>(streams.size());
+ for (CacheOutStream stream : streams) {
+ stream.flush();
+ List<MemoryBuffer> buffers = stream.receiver.buffers;
+ if (buffers == null) {
+ LOG.info("TODO# buffers are null for " + stream.receiver.name);
+ }
+ data.add(new CacheStreamData(stream.isSuppressed(), stream.receiver.name,
+ buffers == null ? new ArrayList<MemoryBuffer>() : new ArrayList<>(buffers)));
+ stream.clear();
+ }
+ currentStripe.colStreams.put(colIx, data);
+ }
+ startStripe();
+ }
+
+ @Override
+ public long estimateMemory() {
+ return 0; // We never ever use any memory.
+ }
+
+ @Override
+ public void writeIndexStream(StreamName name,
+ OrcProto.RowIndex.Builder rowIndex) throws IOException {
+ if (isNeeded(name)) {
+ // LOG.info("TODO# Saving index " + name);
+ // currentStripe.indexes.put(name.getColumn(), rowIndex.build());
+ } else {
+ // LOG.info("TODO# Ignoring index " + name + " => " + rowIndex);
+ }
+ }
+
+ private boolean isNeeded(StreamName name) {
+ return writerIncludes == null || writerIncludes[name.getColumn()];
+ }
+
+ @Override
+ public void writeBloomFilterStream(StreamName streamName,
+ OrcProto.BloomFilterIndex.Builder bloomFilterIndex) throws IOException {
+ // LOG.info("TODO# Ignoring bloom filter " + streamName + " => " + bloomFilterIndex);
+ }
+
+
+ @Override
+ public void flush() throws IOException {
+ }
+
+ @Override
+ public long getRawWriterPosition() throws IOException {
+ return -1; // Meaningless for this writer.
+ }
+
+ @Override
+ public void appendRawStripe(byte[] stripe, int offset, int length,
+ OrcProto.StripeInformation.Builder dirEntry) throws IOException {
+ throw new UnsupportedOperationException(); // Only used in ACID writer.
+ }
+
+ public void setCurrentStripeOffsets(long currentKnownTornStart,
+ long firstStartOffset, long lastStartOffset, long currentFileOffset) {
+ currentStripe.knownTornStart = currentKnownTornStart;
+ currentStripe.firstRowStart = firstStartOffset;
+ currentStripe.lastRowStart = lastStartOffset;
+ currentStripe.lastRowEnd = currentFileOffset;
+ }
+ }
+
+ private interface CacheOutput {
+ List<MemoryBuffer> getData();
+ StreamName getName();
+ }
+
+ private static final class CacheOutputReceiver implements CacheOutput, OutputReceiver {
+ private final BufferUsageManager bufferManager;
+ private final StreamName name;
+ private List<MemoryBuffer> buffers = null;
+ private int lastBufferPos = -1;
+
+ public CacheOutputReceiver(BufferUsageManager bufferManager, StreamName name) {
+ this.bufferManager = bufferManager;
+ this.name = name;
+ }
+
+ public void clear() {
+ buffers = null;
+ lastBufferPos = -1;
+ }
+
+ @Override
+ public void output(ByteBuffer buffer) throws IOException {
+ // TODO: avoid put() by working directly in OutStream?
+ LOG.info(name + " receiving a buffer of size " + buffer.remaining());
+ int size = buffer.remaining();
+ ByteBuffer bb = null;
+ if (buffers == null) {
+ buffers = new ArrayList<>();
+ }
+ if (!buffers.isEmpty()) {
+ MemoryBuffer lastBuffer = buffers.get(buffers.size() - 1);
+ bb = lastBuffer.getByteBufferRaw();
+ int written = lastBufferPos - bb.position();
+ if (bb.remaining() - written < size) {
+ lastBufferPos = -1;
+ bb = null;
+ }
+ }
+ boolean isNewBuffer = (lastBufferPos == -1);
+ if (isNewBuffer) {
+ MemoryBuffer[] dest = new MemoryBuffer[1];
+ bufferManager.getAllocator().allocateMultiple(dest, size);
+ LlapDataBuffer newBuffer = (LlapDataBuffer)dest[0];
+ bb = newBuffer.getByteBufferRaw();
+ lastBufferPos = bb.position();
+ buffers.add(newBuffer);
+ }
+ // Since there's no close() here, maintain the initial read position between writes.
+ int pos = bb.position();
+ bb.position(lastBufferPos);
+ bb.put(buffer);
+ lastBufferPos = bb.position();
+ bb.position(pos);
+ }
+
+ @Override
+ public List<MemoryBuffer> getData() {
+ return buffers;
+ }
+
+ @Override
+ public StreamName getName() {
+ return name;
+ }
+ }
+
+ private static class NullOutputReceiver implements OutputReceiver {
+ private final StreamName name;
+
+ public NullOutputReceiver(StreamName name) {
+ this.name = name;
+ }
+
+ @Override
+ public void output(ByteBuffer buffer) throws IOException {
+ LOG.info(name + " discarding a buffer of size " + buffer.remaining());
+ }
+ }
+
+ protected Void performDataRead() throws IOException {
+ try {
+ long startTime = counters.startTimeCounter();
+ LlapIoImpl.LOG.info("Processing data for {}", split.getPath());
+ if (processStop()) {
+ recordReaderTime(startTime);
+ return null;
+ }
+
+ Boolean isFromCache = null;
+ try {
+ isFromCache = readFileWithCache(startTime);
+ } finally {
+ if (cachedData != null && cachedData.getData() != null) {
+ for (StripeData sd : cachedData.getData()) {
+ unlockAllBuffers(sd);
+ }
+ }
+ }
+ if (isFromCache == null) return null; // Stop requested, and handled inside.
+ if (!isFromCache) {
+ if (!processOneFileSplit(split, startTime, Ref.from(0), null)) return null;
+ }
+
+ // Done with all the things.
+ recordReaderTime(startTime);
+ LOG.info("TODO# calling setDone");
+ consumer.setDone();
+
+ LlapIoImpl.LOG.trace("done processing {}", split);
+ return null;
+ } catch (Throwable e) {
+ LOG.error("TODO# threw", e);
+ consumer.setError(e);
+ throw e;
+ } finally {
+ cleanupReaders();
+ }
+ }
+
+ private void unlockAllBuffers(StripeData si) {
+ for (int i = 0; i < si.getData().length; ++i) {
+ LlapDataBuffer[][] colData = si.getData()[i];
+ if (colData == null) continue;
+ for (int j = 0; j < colData.length; ++j) {
+ LlapDataBuffer[] streamData = colData[j];
+ if (streamData == null) continue;
+ for (int k = 0; k < streamData.length; ++k) {
+ bufferManager.decRefBuffer(streamData[k]);
+ }
+ }
+ }
+ }
+
+ public void cacheFileData(StripeData sd) {
+ if (sd == null || sd.getEncodings() == null) return;
+ if (fileKey != null) {
+ // Note that we cache each slice separately. We could cache them together at the end, but
+ // then we won't be able to pass them to users without inc-refing explicitly.
+ ColumnEncoding[] encodings = sd.getEncodings();
+ for (int i = 0; i < encodings.length; ++i) {
+ // Make data consistent with encodings, don't store useless information.
+ if (sd.getData()[i] == null) {
+ encodings[i] = null;
+ }
+ }
+ FileData fd = new FileData(fileKey, encodings.length);
+ fd.addStripe(sd);
+ cache.putFileData(fd, Priority.NORMAL, counters);
+ } else {
+ lockAllBuffers(sd);
+ }
+ // We assume that if put/lock throws in the middle, it's ok to treat buffers as not being
+ // locked and to blindly deallocate them, since they are not going to be used. Therefore
+ // we don't remove them from the cleanup list - we will do it after sending to consumer.
+ // This relies on sequence of calls to cacheFileData and sendEcb..
+ }
+
+
+ private void lockAllBuffers(StripeData sd) {
+ for (int i = 0; i < sd.getData().length; ++i) {
+ LlapDataBuffer[][] colData = sd.getData()[i];
+ if (colData == null) continue;
+ for (int j = 0; j < colData.length; ++j) {
+ LlapDataBuffer[] streamData = colData[j];
+ if (streamData == null) continue;
+ for (int k = 0; k < streamData.length; ++k) {
+ boolean canLock = bufferManager.incRefBuffer(streamData[k]);
+ assert canLock;
+ }
+ }
+ }
+ }
+
+ public Boolean readFileWithCache(long startTime) throws IOException {
+ if (fileKey == null) return false;
+ BooleanRef gotAllData = new BooleanRef();
+ long endOfSplit = split.getStart() + split.getLength();
+ this.cachedData = cache.getFileData(fileKey, split.getStart(),
+ endOfSplit, writerIncludes, CC_FACTORY, counters, gotAllData);
+ if (cachedData == null) {
+ LOG.info("TODO# no data for the split found in cache");
+ return false;
+ }
+ String[] hosts = extractHosts(split, false), inMemoryHosts = extractHosts(split, true);
+ List<StripeData> slices = cachedData.getData();
+ if (slices.isEmpty()) return false;
+ long uncachedPrefixEnd = slices.get(0).getKnownTornStart(),
+ uncachedSuffixStart = slices.get(slices.size() - 1).getLastEnd();
+ Ref<Integer> stripeIx = Ref.from(0);
+ if (uncachedPrefixEnd > split.getStart()) {
+ // TODO: can we merge neighboring splits? So we don't init so many readers.
+ FileSplit sliceSplit = new FileSplit(split.getPath(), split.getStart(),
+ uncachedPrefixEnd - split.getStart(), hosts, inMemoryHosts);
+ if (!processOneFileSplit(sliceSplit, startTime, stripeIx, null)) return null;
+ }
+ while (!slices.isEmpty()) {
+ StripeData slice = slices.get(0);
+ long start = slice.getKnownTornStart();
+ long len = slice.getLastStart() - start; // Will also read the last row.
+ FileSplit sliceSplit = new FileSplit(split.getPath(), start, len, hosts, inMemoryHosts);
+ if (!processOneFileSplit(sliceSplit, startTime, stripeIx, slice)) return null;
+ }
+ boolean isUnfortunate = false;
+ if (uncachedSuffixStart == endOfSplit) {
+ // This is rather obscure. The end of last row cached is precisely at the split end offset.
+ // If the split is in the middle of the file, LRR would read one more row after that,
+ // therefore as unfortunate as it is, we have to do a one-row read. However, for that to
+ // have happened, someone should have supplied a split that ends inside the last row, i.e.
+ // a few bytes earlier than the current split, which is pretty unlikely. What is more likely
+ // is that the split, and the last row, both end at the end of file. Check for this.
+ long size = split.getPath().getFileSystem(conf).getFileStatus(split.getPath()).getLen();
+ isUnfortunate = size > endOfSplit;
+ if (isUnfortunate) {
+ LOG.info("TODO# one-row mismatch at the end of split " + split.getPath() + " at "
+ + endOfSplit + "; file size is " + size);
+ }
+ }
+
+ if (uncachedSuffixStart < endOfSplit || isUnfortunate) {
+ // TODO: will 0-length split work? should we assume 1+ chars and add 1 for isUnfortunate?
+ FileSplit splitPart = new FileSplit(split.getPath(), uncachedSuffixStart,
+ endOfSplit - uncachedSuffixStart, hosts, inMemoryHosts);
+ if (!processOneFileSplit(splitPart, startTime, stripeIx, null)) return null;
+ }
+ return true;
+ }
+
+ public boolean processOneFileSplit(FileSplit split, long startTime,
+ Ref<Integer> stripeIxRef, StripeData slice) throws IOException {
+ ColumnEncoding[] cacheEncodings = slice == null ? null : slice.getEncodings();
+ LlapIoImpl.LOG.info("TODO# Processing one split {" + split.getPath() + ", "
+ + split.getStart() + ", " + split.getLength() + "}; cache data " + slice);
+ boolean[] splitIncludes = writerIncludes;
+ boolean hasAllData = false;
+ if (cacheEncodings != null) {
+ hasAllData = true;
+ splitIncludes = Arrays.copyOf(writerIncludes, writerIncludes.length);
+ for (int colIx = 0; colIx < cacheEncodings.length; ++colIx) {
+ if (!splitIncludes[colIx]) continue;
+ assert (cacheEncodings[colIx] != null) == (slice.getData()[colIx] != null);
+ if (cacheEncodings[colIx] != null) {
+ splitIncludes[colIx] = false;
+ } else {
+ hasAllData = false;
+ }
+ }
+ }
+ LOG.info("TODO# includes accounting for cached data: before " + DebugUtils.toString(
+ writerIncludes) + ", after " + DebugUtils.toString(splitIncludes));
+
+ // We have 3 cases here:
+ // 1) All the data is in the cache. Always a single slice, no disk read, no cache puts.
+ // 2) Some data is in the cache. Always a single slice, disk read and a single cache put.
+ // 3) No data is in the cache. Multiple slices, disk read and multiple cache puts.
+ if (!hasAllData) {
+ // This initializes cacheWriter with data.
+ readSplitFromFile(split, splitIncludes, slice);
+ assert cacheWriter != null;
+ }
+ if (slice != null) {
+ // If we had a cache range already, it should not have been split.
+ assert cacheWriter == null || cacheWriter.stripes.size() == 1;
+ CacheWriter.CacheStripeData csd = hasAllData ? null : cacheWriter.stripes.get(0);
+ boolean result = processOneSlice(csd, splitIncludes, stripeIxRef.value, slice, startTime);
+ ++stripeIxRef.value;
+ return result;
+ } else {
+ for (CacheWriter.CacheStripeData csd : cacheWriter.stripes) {
+ if (!processOneSlice(csd, splitIncludes, stripeIxRef.value, null, startTime)) {
+ return false;
+ }
+ ++stripeIxRef.value;
+ }
+ return true;
+ }
+ }
+
+ private boolean processOneSlice(CacheWriter.CacheStripeData csd, boolean[] splitIncludes,
+ int stripeIx, StripeData slice, long startTime) throws IOException {
+ String sliceStr = slice == null ? "null" : slice.toCoordinateString();
+ LOG.info("TODO# processing slice #" + stripeIx + " " + sliceStr + "; has"
+ + ((slice == null) ? " no" : "") + " cache data; has" + ((csd == null) ? " no" : "")
+ + " disk data");
+
+ ColumnEncoding[] cacheEncodings = slice == null ? null : slice.getEncodings();
+ LlapDataBuffer[][][] cacheData = slice == null ? null : slice.getData();
+ long cacheRowCount = slice == null ? -1L : slice.getRowCount();
+ TextStripeMetadata metadata = new TextStripeMetadata(stripeIx);
+ StripeData sliceToCache = null;
+ boolean hasAllData = csd == null;
+ if (!hasAllData) {
+ if (slice == null) {
+ sliceToCache = new StripeData(
+ csd.knownTornStart, csd.firstRowStart, csd.lastRowStart, csd.lastRowEnd,
+ csd.rowCount, csd.encodings.toArray(new ColumnEncoding[csd.encodings.size()]));
+ } else {
+ if (csd.rowCount != slice.getRowCount()) {
+ throw new IOException("Row count mismatch; disk " + csd.rowCount + ", cache "
+ + slice.getRowCount() + " from " + csd + " and " + slice);
+ }
+ if (csd.encodings.size() != slice.getEncodings().length) {
+ throw new IOException("Column count mismatch; disk " + csd.encodings.size()
+ + ", cache " + slice.getEncodings().length + " from " + csd + " and " + slice);
+ }
+ LOG.info("TODO# creating slice to cache in addition to an existing slice "
+ + slice.toCoordinateString() + "; disk offsets were " + csd.toCoordinateString());
+ sliceToCache = StripeData.duplicateForResults(slice);
+ for (int i = 0; i < csd.encodings.size(); ++i) {
+ sliceToCache.getEncodings()[i] = csd.encodings.get(i);
+ }
+ sliceToCache.setKnownTornStart(Math.min(csd.knownTornStart, slice.getKnownTornStart()));
+ }
+ metadata.setEncodings(combineCacheAndWriterEncodings(cacheEncodings, csd.encodings));
+ metadata.setRowCount(csd.rowCount);
+ } else {
+ assert cacheWriter == null;
+ metadata.setEncodings(Lists.newArrayList(cacheEncodings));
+ metadata.setRowCount(cacheRowCount);
+ }
+ LOG.info("TODO# derived stripe metadata for this split is " + metadata);
+ consumer.setStripeMetadata(metadata);
+
+ OrcEncodedColumnBatch ecb = ECB_POOL.take();
+ ecb.init(fileKey, metadata.getStripeIx(), OrcEncodedColumnBatch.ALL_RGS, writerIncludes.length);
+ for (int colIx = 0; colIx < writerIncludes.length; ++colIx) {
+ if (!writerIncludes[colIx]) continue;
+ ecb.initColumn(colIx, OrcEncodedColumnBatch.MAX_DATA_STREAMS);
+ if (!hasAllData && splitIncludes[colIx]) {
+ // The column has been read from disk.
+ List<CacheWriter.CacheStreamData> streams = csd.colStreams.get(colIx);
+ LOG.info("TODO# processing streams for column " + colIx + ": " + streams);
+ LlapDataBuffer[][] newCacheDataForCol = sliceToCache.getData()[colIx]
+ = new LlapDataBuffer[OrcEncodedColumnBatch.MAX_DATA_STREAMS][];
+ if (streams == null) continue; // Struct column, such as root?
+ Iterator<CacheWriter.CacheStreamData> iter = streams.iterator();
+ while (iter.hasNext()) {
+ CacheWriter.CacheStreamData stream = iter.next();
+ if (stream.isSuppressed) {
+ LOG.info("TODO# removing a suppressed stream " + stream.name);
+ iter.remove();
+ discardUncachedBuffers(stream.data);
+ continue;
+ }
+ // TODO: We write each slice using a separate writer, so we don't share dictionaries. Fix?
+ ColumnStreamData cb = CSD_POOL.take();
+ cb.incRef();
+ int streamIx = stream.name.getKind().getNumber();
+ cb.setCacheBuffers(stream.data);
+ // This is kinda hacky - we "know" these are LlapDataBuffer-s.
+ newCacheDataForCol[streamIx] = stream.data.toArray(
+ new LlapDataBuffer[stream.data.size()]);
+ ecb.setStreamData(colIx, streamIx, cb);
+ }
+ } else {
+ // The column has been obtained from cache.
+ LlapDataBuffer[][] colData = cacheData[colIx];
+ LOG.info("TODO# processing cache data for column " + colIx + ": " + SerDeLowLevelCacheImpl.toString(colData));
+ for (int streamIx = 0; streamIx < colData.length; ++streamIx) {
+ if (colData[streamIx] == null) continue;
+ ColumnStreamData cb = CSD_POOL.take();
+ cb.incRef();
+ cb.setCacheBuffers(Lists.<MemoryBuffer>newArrayList(colData[streamIx]));
+ ecb.setStreamData(colIx, streamIx, cb);
+ }
+ }
+ }
+ if (processStop()) {
+ recordReaderTime(startTime);
+ return false;
+ }
+ // Note: we cache slices one by one since we need to lock them before sending to consumer.
+ // We could lock here, then cache them together, then unlock here and in return,
+ // but for now just rely on the cache put to lock them before we send them over.
+ LOG.info("TODO# Data to cache from the read " + sliceToCache);
+ cacheFileData(sliceToCache);
+ return sendEcbToConsumer(ecb, slice != null, csd);
+ }
+
+ private void discardUncachedBuffers(List<MemoryBuffer> list) {
+ for (MemoryBuffer buffer : list) {
+ boolean isInvalidated = ((LlapDataBuffer)buffer).invalidate();
+ assert isInvalidated;
+ bufferManager.getAllocator().deallocate(buffer);
+ }
+ }
+
+ private static List<ColumnEncoding> combineCacheAndWriterEncodings(
+ ColumnEncoding[] cacheEncodings, List<ColumnEncoding> writerEncodings) throws IOException {
+ // TODO: refactor with cache impl? it has the same merge logic
+ if (cacheEncodings == null) {
+ return new ArrayList<>(writerEncodings);
+ }
+ if (cacheEncodings.length != writerEncodings.size()) {
+ throw new IOException("Incompatible encoding lengths: "
+ + Arrays.toString(cacheEncodings) + " vs " + writerEncodings);
+ }
+ ColumnEncoding[] combinedEncodings = Arrays.copyOf(cacheEncodings, cacheEncodings.length);
+ for (int colIx = 0; colIx < cacheEncodings.length; ++colIx) {
+ ColumnEncoding newEncoding = writerEncodings.get(colIx);
+ if (newEncoding == null) continue;
+ if (combinedEncodings[colIx] != null && !newEncoding.equals(combinedEncodings[colIx])) {
+ throw new IOException("Incompatible encodings at " + colIx + ": "
+ + Arrays.toString(cacheEncodings) + " vs " + writerEncodings);
+ }
+ combinedEncodings[colIx] = newEncoding;
+ }
+ return Lists.newArrayList(combinedEncodings);
+ }
+
+
+ public void readSplitFromFile(FileSplit split, boolean[] splitIncludes, StripeData slice)
+ throws IOException {
+ boolean maySplitTheSplit = slice == null;
+ ReaderWithOffsets offsetReader = null;
+ @SuppressWarnings("rawtypes")
+ RecordReader sourceReader = sourceInputFormat.getRecordReader(split, jobConf, reporter);
+ try {
+ LOG.info("TODO# using " + sourceReader.getClass().getSimpleName() + " to read data");
+ // TODO# need a more general approach to this. At least, need to factor this out and add configs.
+ if (sourceReader instanceof LineRecordReader) {
+ offsetReader = LineRrOffsetReader.create((LineRecordReader)sourceReader);
+ maySplitTheSplit = maySplitTheSplit && offsetReader != null;
+ if (offsetReader == null) {
+ // Cannot get offsets (e.g. compressed text input); fall back to the pass-through reader.
+ offsetReader = new PassThruOffsetReader(sourceReader);
+ }
+ sourceReader = null;
+ } else {
+ offsetReader = new PassThruOffsetReader(sourceReader);
+ sourceReader = null;
+ }
+ ObjectInspector sourceOi;
+ try {
+ sourceOi = sourceSerDe.getObjectInspector();
+ } catch (SerDeException e) {
+ throw new IOException(e);
+ }
+
+ // TODO: ideally, we want to transform the rows to only have the included columns, and
+ // only write those to the writer, with modified schema; then map back to full set later.
+ WriterOptions opts = OrcFile.writerOptions(conf)
+ .stripeSize(Long.MAX_VALUE).blockSize(Long.MAX_VALUE)
+ .rowIndexStride(Integer.MAX_VALUE) // For now, do not limit this - one RG per split
+ .blockPadding(false).compress(CompressionKind.NONE).version(Version.CURRENT)
+ .encodingStrategy(EncodingStrategy.SPEED).bloomFilterColumns(null).inspector(sourceOi);
+
+
+ // Column IDs are only used to verify eventual writer includes.
+ cacheWriter = new CacheWriter(bufferManager, allocSize, columnIds, splitIncludes);
+ orcWriter = new WriterImpl(cacheWriter, null, opts);
+ int rowsPerSlice = 0;
+ long currentKnownTornStart = split.getStart();
+ long lastStartOffset = Long.MIN_VALUE, firstStartOffset = Long.MIN_VALUE;
+ while (offsetReader.next()) {
+ Writable value = offsetReader.getCurrentValue();
+ lastStartOffset = offsetReader.getCurrentRowStartOffset();
+ if (firstStartOffset == Long.MIN_VALUE) {
+ firstStartOffset = lastStartOffset;
+ }
+ Object row = null;
+ try {
+ row = sourceSerDe.deserialize(value);
+ } catch (SerDeException e) {
+ throw new IOException(e);
+ }
+ orcWriter.addRow(row);
+ if (maySplitTheSplit && ++rowsPerSlice == targetSliceRowCount) {
+ long fileOffset = offsetReader.getCurrentFileOffset();
+ // Must support offsets to be able to split.
+ if (firstStartOffset < 0 || lastStartOffset < 0 || fileOffset < 0) {
+ throw new AssertionError("Unable to get offsets from "
+ + offsetReader.getClass().getSimpleName());
+ }
+ cacheWriter.setCurrentStripeOffsets(
+ currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
+ // Split starting at row start will not read that row.
+ currentKnownTornStart = lastStartOffset;
+ // Row offsets will be determined from the reader (we could set the first from last).
+ lastStartOffset = Long.MIN_VALUE;
+ firstStartOffset = Long.MIN_VALUE;
+ rowsPerSlice = 0;
+ orcWriter.writeIntermediateFooter();
+ }
+ }
+ if (rowsPerSlice > 0) {
+ long fileOffset = offsetReader.getCurrentFileOffset();
+ if (firstStartOffset < 0 || lastStartOffset < 0 || fileOffset < 0) {
+ // The reader doesn't support offsets.
+ // TODO## Double-check if these shenanigans are correct wrt cache matching.
+ // We want to match the exact same splits, and not match anything else ever.
+ // Perhaps we should just add a flag that would allow us to match exactly.
+ // If cached split was starting at row start, that row would be skipped, so +1
+ firstStartOffset = split.getStart() + 1;
+ // Last row starting at the end of the split would be read.
+ lastStartOffset = split.getStart() + split.getLength();
+ // However, it must end after the split end, otherwise the next one would have been read.
+ fileOffset = lastStartOffset + 1;
+ LOG.info("TODO# setting fake cache offsets based on split offsets - 'first row' at "
+ + firstStartOffset + "; 'last row' at " + lastStartOffset + ", " + fileOffset);
+ }
+ cacheWriter.setCurrentStripeOffsets(
+ currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
+ }
+ // Close the writer to finalize the metadata. No catch since we cannot go on if this throws.
+ orcWriter.close();
+ orcWriter = null;
+ } finally {
+ // We don't need the source reader anymore.
+ if (offsetReader != null) {
+ try {
+ offsetReader.close();
+ } catch (Exception ex) {
+ LOG.error("Failed to close source reader", ex);
+ }
+ } else {
+ assert sourceReader != null;
+ try {
+ sourceReader.close();
+ } catch (Exception ex) {
+ LOG.error("Failed to close source reader", ex);
+ }
+ }
+ }
+ }
+
+ private static String[] extractHosts(FileSplit split, boolean isInMemory) throws IOException {
+ SplitLocationInfo[] locInfo = split.getLocationInfo();
+ if (locInfo == null) return new String[0];
+ List<String> hosts = null; // TODO: most of the time, there's no in-memory. Use an array?
+ for (int i = 0; i < locInfo.length; i++) {
+ if (locInfo[i].isInMemory() != isInMemory) continue;
+ if (hosts == null) {
+ hosts = new ArrayList<>();
+ }
+ hosts.add(locInfo[i].getLocation());
+ }
+ if (hosts == null) return new String[0];
+ return hosts.toArray(new String[hosts.size()]);
+ }
+
+ private boolean sendEcbToConsumer(OrcEncodedColumnBatch ecb,
+ boolean hasCachedSlice, CacheWriter.CacheStripeData writerData) {
+ if (ecb == null) { // This basically means stop has been called.
+ cleanupReaders();
+ return false;
+ }
+ LOG.info("TODO# Sending over the ecb");
+ try {
+ consumer.consumeData(ecb);
+ } catch (Throwable ex) {
+ LOG.info("TODO# threw", ex);
+ consumer.setError(ex); // TODO## this is wrong, it shouldn't throw
+ }
+ if (hasCachedSlice) {
+ cachedData.getData().remove(0); // See javadoc - no need to clean up the cache data anymore.
+ }
+ if (writerData != null) {
+ writerData.colStreams.clear();
+ }
+ return true;
+ }
+
+
+ private void cleanupReaders() {
+ if (orcWriter != null) {
+ try {
+ orcWriter.close();
+ orcWriter = null;
+ } catch (Exception ex) {
+ LOG.error("Failed to close ORC writer", ex);
+ }
+ }
+ if (cacheWriter != null) {
+ try {
+ cacheWriter.discardData();
+ cacheWriter = null;
+ } catch (Exception ex) {
+ LOG.error("Failed to close cache writer", ex);
+ }
+ }
+ }
+
+ private void recordReaderTime(long startTime) {
+ counters.incrTimeCounter(LlapIOCounters.TOTAL_IO_TIME_NS, startTime);
+ }
+
+ private boolean processStop() {
+ if (!isStopped) return false;
+ LOG.info("SerDe-based data reader is stopping");
+ cleanupReaders();
+ return true;
+ }
+
+ private static Object determineFileId(FileSystem fs, FileSplit split,
+ boolean allowSynthetic) throws IOException {
+ /* TODO: support this optionally? this is not OrcSplit, but we could add a custom split.
+ Object fileKey = ((OrcSplit)split).getFileKey();
+ if (fileKey != null) return fileKey; */
+ LOG.warn("Split for " + split.getPath() + " (" + split.getClass() + ") does not have file ID");
+ return HdfsUtils.getFileId(fs, split.getPath(), allowSynthetic);
+ }
+
+ // TODO: move to a superclass?
+ @Override
+ public void returnData(OrcEncodedColumnBatch ecb) {
+ for (int colIx = 0; colIx < ecb.getTotalColCount(); ++colIx) {
+ if (!ecb.hasData(colIx)) continue;
+ ColumnStreamData[] datas = ecb.getColumnData(colIx);
+ for (ColumnStreamData data : datas) {
+ if (data == null || data.decRef() != 0) continue;
+ if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+ for (MemoryBuffer buf : data.getCacheBuffers()) {
+ LlapIoImpl.LOCKING_LOGGER.trace("Unlocking {} at the end of processing", buf);
+ }
+ }
+ bufferManager.decRefBuffers(data.getCacheBuffers());
+ CSD_POOL.offer(data);
+ }
+ }
+ // We can offer ECB even with some streams not discarded; reset() will clear the arrays.
+ ECB_POOL.offer(ecb);
+ }
+
+ public TezCounters getTezCounters() {
+ return counters.getTezCounters();
+ }
+}
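
For reference, the core mechanism of the new reader is that readSplitFromFile() above drives the
regular ORC writer through a CacheWriter -- a PhysicalWriter that keeps the encoded streams in
LLAP cache buffers instead of writing a file -- and cuts one "stripe" per slice via
writeIntermediateFooter(). A condensed, hedged sketch of that flow (all names are reused from the
class above; error handling and the offset bookkeeping are omitted):

    WriterOptions opts = OrcFile.writerOptions(conf)
        .stripeSize(Long.MAX_VALUE).blockSize(Long.MAX_VALUE)
        .rowIndexStride(Integer.MAX_VALUE)             // one row group per slice for now
        .blockPadding(false).compress(CompressionKind.NONE).version(Version.CURRENT)
        .encodingStrategy(EncodingStrategy.SPEED).bloomFilterColumns(null)
        .inspector(sourceOi);                          // object inspector from the source SerDe
    cacheWriter = new CacheWriter(bufferManager, allocSize, columnIds, splitIncludes);
    orcWriter = new WriterImpl(cacheWriter, null, opts); // PhysicalWriter-based ctor added later in this patch
    int rowsInSlice = 0;
    while (offsetReader.next()) {
      orcWriter.addRow(sourceSerDe.deserialize(offsetReader.getCurrentValue()));
      if (maySplitTheSplit && ++rowsInSlice == targetSliceRowCount) {
        // Each intermediate footer finalizes a stripe; CacheWriter.finalizeStripe()
        // snapshots the encoded streams and encodings for that slice.
        orcWriter.writeIntermediateFooter();
        rowsInSlice = 0;
      }
    }
    orcWriter.close(); // finalizes the last slice's metadata in CacheWriter

The resulting CacheWriter stripes are then either sent to the consumer and put into the cache
(processOneSlice), or merged with slices already found in the cache for the same file offsets.
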
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerFileMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerFileMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerFileMetadata.java
new file mode 100644
index 0000000..040f1a7
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerFileMetadata.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.metadata;
+
+import java.util.List;
+
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcProto.Type;
+import org.apache.orc.TypeDescription;
+
+public interface ConsumerFileMetadata {
+ int getStripeCount();
+ CompressionKind getCompressionKind();
+ List<Type> getTypes();
+ TypeDescription getSchema();
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerStripeMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerStripeMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerStripeMetadata.java
new file mode 100644
index 0000000..1e28f5f
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerStripeMetadata.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.metadata;
+
+import java.util.List;
+
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcProto.ColumnEncoding;
+import org.apache.orc.OrcProto.RowIndex;
+import org.apache.orc.OrcProto.RowIndexEntry;
+
+public interface ConsumerStripeMetadata {
+ int getStripeIx();
+ long getRowCount();
+ List<ColumnEncoding> getEncodings();
+ String getWriterTimezone();
+ RowIndexEntry getRowIndexEntry(int colIx, int rgIx); // TODO: remove?
+ RowIndex[] getRowIndexes();
+ boolean supportsRowIndexes();
+}
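
ConsumerStripeMetadata is the minimal stripe-level view the encoded-data consumer needs: the ORC
path implements it on OrcStripeMetadata (see the next diff), while the SerDe path uses
GenericColumnVectorProducer.TextStripeMetadata, which is populated by processOneSlice() above. As a
rough illustration only -- not the actual TextStripeMetadata, whose source is not in this part of
the patch -- a slice-based implementation with no row indexes could look like this:

    // Hypothetical sketch; the class and field names are illustrative.
    class TextSliceMetadata implements ConsumerStripeMetadata {
      private final int stripeIx;
      private long rowCount;
      private List<ColumnEncoding> encodings;

      TextSliceMetadata(int stripeIx) { this.stripeIx = stripeIx; }
      void setRowCount(long rowCount) { this.rowCount = rowCount; }
      void setEncodings(List<ColumnEncoding> encodings) { this.encodings = encodings; }

      @Override public int getStripeIx() { return stripeIx; }
      @Override public long getRowCount() { return rowCount; }
      @Override public List<ColumnEncoding> getEncodings() { return encodings; }
      @Override public String getWriterTimezone() { return null; }    // no timezone for text slices
      @Override public boolean supportsRowIndexes() { return false; } // one row group per slice
      @Override public RowIndexEntry getRowIndexEntry(int colIx, int rgIx) {
        throw new UnsupportedOperationException("No row indexes for serde-encoded slices");
      }
      @Override public RowIndex[] getRowIndexes() {
        throw new UnsupportedOperationException("No row indexes for serde-encoded slices");
      }
    }
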
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
index 70cba05..2c7a234 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
@@ -41,7 +41,8 @@ import org.apache.orc.impl.ReaderImpl;
* of ORC use different info. Ideally we would get rid of protobuf structs in code beyond reading,
* or instead use protobuf structs everywhere instead of the mix of things like now.
*/
-public final class OrcFileMetadata extends LlapCacheableBuffer implements FileMetadata {
+public final class OrcFileMetadata extends LlapCacheableBuffer
+ implements FileMetadata, ConsumerFileMetadata {
private final List<StripeInformation> stripes;
private final List<Integer> versionList;
private final List<OrcProto.StripeStatistics> stripeStats;
@@ -225,6 +226,11 @@ public final class OrcFileMetadata extends LlapCacheableBuffer implements FileMe
return fileStats;
}
+ @Override
+ public int getStripeCount() {
+ return stripes.size();
+ }
+
public TypeDescription getSchema() {
return OrcUtils.convertTypeFromProtobuf(this.types, 0);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
index 3f4f43b..73a1721 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy;
import org.apache.hadoop.hive.llap.cache.MemoryManager;
import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
public class OrcMetadataCache {
private final ConcurrentHashMap<Object, OrcFileMetadata> metadata = new ConcurrentHashMap<>();
@@ -116,7 +117,6 @@ public class OrcMetadataCache {
return touchOnGet(metadata.get(fileKey));
}
-
private <T extends LlapCacheableBuffer> T touchOnGet(T result) {
if (result != null) {
policy.notifyLock(result);
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
index 6f0b9ff..5ef1678 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
@@ -32,10 +32,11 @@ import org.apache.hadoop.hive.ql.io.SyntheticFileId;
import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
import org.apache.orc.DataReader;
import org.apache.orc.OrcProto;
+import org.apache.orc.OrcProto.RowIndexEntry;
import org.apache.orc.StripeInformation;
import org.apache.orc.impl.OrcIndex;
-public class OrcStripeMetadata extends LlapCacheableBuffer {
+public class OrcStripeMetadata extends LlapCacheableBuffer implements ConsumerStripeMetadata {
private final OrcBatchKey stripeKey;
private final List<OrcProto.ColumnEncoding> encodings;
private final List<OrcProto.Stream> streams;
@@ -172,4 +173,14 @@ public class OrcStripeMetadata extends LlapCacheableBuffer {
public void resetRowIndex() {
rowIndex = null;
}
+
+ @Override
+ public RowIndexEntry getRowIndexEntry(int colIx, int rgIx) {
+ return rowIndex.getRowGroupIndex()[colIx].getEntry(rgIx);
+ }
+
+ @Override
+ public boolean supportsRowIndexes() {
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/orc/src/java/org/apache/orc/OrcUtils.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/OrcUtils.java b/orc/src/java/org/apache/orc/OrcUtils.java
index dc83b9c..4f02926 100644
--- a/orc/src/java/org/apache/orc/OrcUtils.java
+++ b/orc/src/java/org/apache/orc/OrcUtils.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import org.apache.orc.OrcProto.Type.Builder;
import org.apache.orc.impl.ReaderImpl;
import com.google.common.collect.Lists;
@@ -538,4 +539,86 @@ public class OrcUtils {
}
return result;
}
+
+ public static List<TypeDescription> setTypeBuilderFromSchema(
+ OrcProto.Type.Builder type, TypeDescription schema) {
+ List<TypeDescription> children = schema.getChildren();
+ switch (schema.getCategory()) {
+ case BOOLEAN:
+ type.setKind(OrcProto.Type.Kind.BOOLEAN);
+ break;
+ case BYTE:
+ type.setKind(OrcProto.Type.Kind.BYTE);
+ break;
+ case SHORT:
+ type.setKind(OrcProto.Type.Kind.SHORT);
+ break;
+ case INT:
+ type.setKind(OrcProto.Type.Kind.INT);
+ break;
+ case LONG:
+ type.setKind(OrcProto.Type.Kind.LONG);
+ break;
+ case FLOAT:
+ type.setKind(OrcProto.Type.Kind.FLOAT);
+ break;
+ case DOUBLE:
+ type.setKind(OrcProto.Type.Kind.DOUBLE);
+ break;
+ case STRING:
+ type.setKind(OrcProto.Type.Kind.STRING);
+ break;
+ case CHAR:
+ type.setKind(OrcProto.Type.Kind.CHAR);
+ type.setMaximumLength(schema.getMaxLength());
+ break;
+ case VARCHAR:
+ type.setKind(OrcProto.Type.Kind.VARCHAR);
+ type.setMaximumLength(schema.getMaxLength());
+ break;
+ case BINARY:
+ type.setKind(OrcProto.Type.Kind.BINARY);
+ break;
+ case TIMESTAMP:
+ type.setKind(OrcProto.Type.Kind.TIMESTAMP);
+ break;
+ case DATE:
+ type.setKind(OrcProto.Type.Kind.DATE);
+ break;
+ case DECIMAL:
+ type.setKind(OrcProto.Type.Kind.DECIMAL);
+ type.setPrecision(schema.getPrecision());
+ type.setScale(schema.getScale());
+ break;
+ case LIST:
+ type.setKind(OrcProto.Type.Kind.LIST);
+ type.addSubtypes(children.get(0).getId());
+ break;
+ case MAP:
+ type.setKind(OrcProto.Type.Kind.MAP);
+ for(TypeDescription t: children) {
+ type.addSubtypes(t.getId());
+ }
+ break;
+ case STRUCT:
+ type.setKind(OrcProto.Type.Kind.STRUCT);
+ for(TypeDescription t: children) {
+ type.addSubtypes(t.getId());
+ }
+ for(String field: schema.getFieldNames()) {
+ type.addFieldNames(field);
+ }
+ break;
+ case UNION:
+ type.setKind(OrcProto.Type.Kind.UNION);
+ for(TypeDescription t: children) {
+ type.addSubtypes(t.getId());
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown category: " +
+ schema.getCategory());
+ }
+ return children;
+ }
}
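
This switch is hoisted verbatim out of WriterImpl.writeTypes (see the removal further below) so that other writers can serialize a TypeDescription into the same OrcProto.Type encoding. A minimal sketch of the intended usage pattern, mirroring writeTypes (the helper name is hypothetical; org.apache.orc and java.util imports assumed):

static void appendTypes(OrcProto.Footer.Builder footer, TypeDescription schema) {
  OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
  // Fills in the kind and type-specific attributes; returns the children, if any.
  List<TypeDescription> children = OrcUtils.setTypeBuilderFromSchema(type, schema);
  footer.addTypes(type);
  if (children == null) {
    return;
  }
  for (TypeDescription child : children) {
    // Pre-order recursion keeps the subtype ids consistent with getId().
    appendTypes(footer, child);
  }
}
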
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/orc/src/java/org/apache/orc/impl/PhysicalWriter.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/PhysicalWriter.java b/orc/src/java/org/apache/orc/impl/PhysicalWriter.java
index 83742e4..5ba1b9b 100644
--- a/orc/src/java/org/apache/orc/impl/PhysicalWriter.java
+++ b/orc/src/java/org/apache/orc/impl/PhysicalWriter.java
@@ -19,7 +19,6 @@
package org.apache.orc.impl;
import java.io.IOException;
-import java.util.EnumSet;
import org.apache.orc.OrcProto.BloomFilterIndex;
import org.apache.orc.OrcProto.Footer;
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java b/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java
index 975804b..9433e54 100644
--- a/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -232,6 +232,11 @@ public class RecordReaderImpl implements RecordReader {
public long getNext() {
return entry.getPositions(index++);
}
+
+ @Override
+ public String toString() {
+ return "{" + entry.getPositionsList() + "; " + index + "}";
+ }
}
public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/orc/src/java/org/apache/orc/impl/TreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/TreeReaderFactory.java b/orc/src/java/org/apache/orc/impl/TreeReaderFactory.java
index 484209b..4bb51c3 100644
--- a/orc/src/java/org/apache/orc/impl/TreeReaderFactory.java
+++ b/orc/src/java/org/apache/orc/impl/TreeReaderFactory.java
@@ -1223,6 +1223,7 @@ public class TreeReaderFactory {
}
}
+ private static org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(TreeReaderFactory.class);
// This class collects together very similar methods for reading an ORC vector of byte arrays and
// creating the BytesColumnVector.
//
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/orc/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/WriterImpl.java b/orc/src/java/org/apache/orc/impl/WriterImpl.java
index b17fb41..e0fcae7 100644
--- a/orc/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/orc/src/java/org/apache/orc/impl/WriterImpl.java
@@ -125,10 +125,17 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
public WriterImpl(FileSystem fs,
Path path,
OrcFile.WriterOptions opts) throws IOException {
- this.path = path;
+ this(new PhysicalFsWriter(fs, path, opts.getSchema().getMaximumId() + 1, opts), path, opts);
+ }
+
+ public WriterImpl(PhysicalWriter writer,
+ Path pathForMem,
+ OrcFile.WriterOptions opts) throws IOException {
+ this.physWriter = writer;
+ this.path = pathForMem;
this.conf = opts.getConfiguration();
- this.callback = opts.getCallback();
this.schema = opts.getSchema();
+ this.callback = opts.getCallback();
if (callback != null) {
callbackContext = new OrcFile.WriterContext(){
@@ -153,8 +160,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
OrcUtils.includeColumns(opts.getBloomFilterColumns(), schema);
}
this.bloomFilterFpp = opts.getBloomFilterFpp();
- int numColumns = schema.getMaximumId() + 1;
- physWriter = new PhysicalFsWriter(fs, path, numColumns, opts);
treeWriter = createTreeWriter(schema, streamFactory, false);
if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
throw new IllegalArgumentException("Row stride must be at least " +
@@ -162,7 +167,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
// ensure that we are able to handle callbacks before we register ourselves
- memoryManager.addWriter(path, opts.getStripeSize(), this);
+ if (path != null) {
+ memoryManager.addWriter(path, opts.getStripeSize(), this);
+ }
}
@Override
@@ -2112,83 +2119,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
private static void writeTypes(OrcProto.Footer.Builder builder,
TypeDescription schema) {
OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
- List<TypeDescription> children = schema.getChildren();
- switch (schema.getCategory()) {
- case BOOLEAN:
- type.setKind(OrcProto.Type.Kind.BOOLEAN);
- break;
- case BYTE:
- type.setKind(OrcProto.Type.Kind.BYTE);
- break;
- case SHORT:
- type.setKind(OrcProto.Type.Kind.SHORT);
- break;
- case INT:
- type.setKind(OrcProto.Type.Kind.INT);
- break;
- case LONG:
- type.setKind(OrcProto.Type.Kind.LONG);
- break;
- case FLOAT:
- type.setKind(OrcProto.Type.Kind.FLOAT);
- break;
- case DOUBLE:
- type.setKind(OrcProto.Type.Kind.DOUBLE);
- break;
- case STRING:
- type.setKind(OrcProto.Type.Kind.STRING);
- break;
- case CHAR:
- type.setKind(OrcProto.Type.Kind.CHAR);
- type.setMaximumLength(schema.getMaxLength());
- break;
- case VARCHAR:
- type.setKind(OrcProto.Type.Kind.VARCHAR);
- type.setMaximumLength(schema.getMaxLength());
- break;
- case BINARY:
- type.setKind(OrcProto.Type.Kind.BINARY);
- break;
- case TIMESTAMP:
- type.setKind(OrcProto.Type.Kind.TIMESTAMP);
- break;
- case DATE:
- type.setKind(OrcProto.Type.Kind.DATE);
- break;
- case DECIMAL:
- type.setKind(OrcProto.Type.Kind.DECIMAL);
- type.setPrecision(schema.getPrecision());
- type.setScale(schema.getScale());
- break;
- case LIST:
- type.setKind(OrcProto.Type.Kind.LIST);
- type.addSubtypes(children.get(0).getId());
- break;
- case MAP:
- type.setKind(OrcProto.Type.Kind.MAP);
- for(TypeDescription t: children) {
- type.addSubtypes(t.getId());
- }
- break;
- case STRUCT:
- type.setKind(OrcProto.Type.Kind.STRUCT);
- for(TypeDescription t: children) {
- type.addSubtypes(t.getId());
- }
- for(String field: schema.getFieldNames()) {
- type.addFieldNames(field);
- }
- break;
- case UNION:
- type.setKind(OrcProto.Type.Kind.UNION);
- for(TypeDescription t: children) {
- type.addSubtypes(t.getId());
- }
- break;
- default:
- throw new IllegalArgumentException("Unknown category: " +
- schema.getCategory());
- }
+ List<TypeDescription> children = OrcUtils.setTypeBuilderFromSchema(type, schema);
builder.addTypes(type);
if (children != null) {
for(TypeDescription child: children) {
@@ -2380,7 +2311,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
callback.preFooterWrite(callbackContext);
}
// remove us from the memory manager so that we don't get any callbacks
- memoryManager.removeWriter(path);
+ if (path != null) {
+ memoryManager.removeWriter(path);
+ }
// actually close the file
flushStripe();
writeMetadata();
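
The refactoring above splits file creation from the writer itself: the file-based constructor now delegates to a new one that accepts any PhysicalWriter, and passing a null path simply opts the writer out of MemoryManager registration. A sketch of how a caller with its own PhysicalWriter implementation might use this (helper name and wiring are hypothetical, not part of the patch):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.orc.impl.PhysicalWriter;
import org.apache.orc.impl.WriterImpl;

static Writer createWriterOver(PhysicalWriter physical, Configuration conf,
    TypeDescription schema) throws IOException {
  OrcFile.WriterOptions opts = OrcFile.writerOptions(conf).setSchema(schema);
  // pathForMem == null: skip MemoryManager registration (see the guarded
  // addWriter/removeWriter calls above), so the caller controls flushing.
  return new WriterImpl(physical, null, opts);
}
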
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
index 3d81e43..b9c39de 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
@@ -24,6 +24,7 @@ package org.apache.hadoop.hive.llap;
*/
public class DebugUtils {
public static String toString(boolean[] a) {
+ if (a == null) return "null";
StringBuilder b = new StringBuilder();
b.append('[');
for (int i = 0; i < a.length; ++i) {
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index 601ad08..004bb2f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -225,7 +225,7 @@ public class FetchOperator implements Serializable {
+ inputFormatClass.getName() + " as specified in mapredWork!", e);
}
}
- return HiveInputFormat.wrapForLlap(format, conf);
+ return format;
}
private StructObjectInspector getPartitionKeyOI(TableDesc tableDesc) throws Exception {
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 37e4b9b..46270bb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -327,7 +327,7 @@ public final class Utilities {
return null;
}
- public static BaseWork getMergeWork(JobConf jconf) {
+ public static BaseWork getMergeWork(Configuration jconf) {
if ((jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX) == null)
|| (jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX).isEmpty())) {
return null;
@@ -335,7 +335,7 @@ public final class Utilities {
return getMergeWork(jconf, jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX));
}
- public static BaseWork getMergeWork(JobConf jconf, String prefix) {
+ public static BaseWork getMergeWork(Configuration jconf, String prefix) {
if (prefix == null || prefix.isEmpty()) {
return null;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
index ba25573..de36f2b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
@@ -19,11 +19,20 @@
package org.apache.hadoop.hive.ql.io;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.CombineHiveInputSplit;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileSplit;
@@ -42,6 +51,9 @@ import org.apache.hadoop.mapred.lib.CombineFileSplit;
*/
public class CombineHiveRecordReader<K extends WritableComparable, V extends Writable>
extends HiveContextAwareRecordReader<K, V> {
+ private org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(CombineHiveRecordReader.class);
+
+ private LinkedHashMap<Path, PartitionDesc> pathToPartInfo;
public CombineHiveRecordReader(InputSplit split, Configuration conf,
Reporter reporter, Integer partition, RecordReader preReader) throws IOException {
@@ -57,8 +69,27 @@ public class CombineHiveRecordReader<K extends WritableComparable, V extends Wri
throw new IOException("CombineHiveRecordReader: class not found "
+ inputFormatClassName);
}
- InputFormat inputFormat = HiveInputFormat.getInputFormatFromCache(
- inputFormatClass, jobConf);
+ InputFormat inputFormat = HiveInputFormat.getInputFormatFromCache(inputFormatClass, jobConf);
+ try {
+ // TODO: refactor this out
+ if (pathToPartInfo == null) {
+ MapWork mrwork;
+ if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+ mrwork = (MapWork) Utilities.getMergeWork(jobConf);
+ if (mrwork == null) {
+ mrwork = Utilities.getMapWork(jobConf);
+ }
+ } else {
+ mrwork = Utilities.getMapWork(jobConf);
+ }
+ pathToPartInfo = mrwork.getPathToPartitionInfo();
+ }
+
+ PartitionDesc part = extractSinglePartSpec(hsplit);
+ inputFormat = HiveInputFormat.wrapForLlap(inputFormat, jobConf, part);
+ } catch (HiveException e) {
+ throw new IOException(e);
+ }
// create a split for the given partition
FileSplit fsplit = new FileSplit(hsplit.getPaths()[partition], hsplit
@@ -79,6 +110,26 @@ public class CombineHiveRecordReader<K extends WritableComparable, V extends Wri
}
+ private PartitionDesc extractSinglePartSpec(CombineHiveInputSplit hsplit) throws IOException {
+ PartitionDesc part = null;
+ Map<Map<Path,PartitionDesc>, Map<Path,PartitionDesc>> cache = new HashMap<>();
+ for (Path path : hsplit.getPaths()) {
+ PartitionDesc otherPart = HiveFileFormatUtils.getPartitionDescFromPathRecursively(
+ pathToPartInfo, path, cache);
+ LOG.debug("Found spec for " + path + " " + otherPart + " from " + pathToPartInfo);
+ if (part == null) {
+ part = otherPart;
+ } else if (otherPart != part) { // Assume we should have the exact same object.
+ // TODO: we could also compare the schema and SerDe, and pass only those to the call
+ // instead; most of the time these would be the same and LLAP IO can handle that.
+ LOG.warn("Multiple partitions found; not going to pass a part spec to LLAP IO: {"
+ + part.getPartSpec() + "} and {" + otherPart.getPartSpec() + "}");
+ return null;
+ }
+ }
+ return part;
+ }
+
@Override
public void doClose() throws IOException {
recordReader.close();
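
extractSinglePartSpec above is deliberately conservative: it resolves every path in the combined split and only hands a PartitionDesc to LLAP IO when all of them map to the same object, falling back to the regular reader otherwise. The reduction boils down to this small pattern (standalone sketch, names hypothetical):

// Return the common element if every value is the same object, else null.
static <T> T singleOrNull(Iterable<T> resolved) {
  T common = null;
  for (T item : resolved) {
    if (common == null) {
      common = item;
    } else if (item != common) { // identity comparison, as in the patch
      return null; // mixed partitions: caller skips the LLAP IO wrapping
    }
  }
  return common;
}
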
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 94fcd60..f8391e0 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -57,6 +58,7 @@ import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
@@ -202,7 +204,8 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
}
public static InputFormat<WritableComparable, Writable> wrapForLlap(
- InputFormat<WritableComparable, Writable> inputFormat, Configuration conf) {
+ InputFormat<WritableComparable, Writable> inputFormat, Configuration conf,
+ PartitionDesc part) throws HiveException {
if (!HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon())) {
return inputFormat; // LLAP not enabled, no-op.
}
@@ -227,7 +230,20 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
LOG.info("Not using LLAP IO because it is not initialized");
return inputFormat;
}
- return castInputFormat(llapIo.getInputFormat(inputFormat));
+ LlapWrappableInputFormatInterface llapIf = (LlapWrappableInputFormatInterface)inputFormat;
+ Deserializer serde = null;
+ if (llapIf.isSerdeBased()) {
+ if (part == null) {
+ LOG.info("Not using LLAP IO because there's no partition spec for SerDe-based IF", new Exception());
+ return inputFormat;
+ }
+ try {
+ serde = part.getDeserializer(conf);
+ } catch (Exception e) {
+ throw new HiveException("Error creating SerDe for LLAP IO", e);
+ }
+ }
+ return castInputFormat(llapIo.getInputFormat(inputFormat, serde));
}
@@ -248,7 +264,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
return (InputFormat<T, U>)from;
}
-
+ /** NOTE: this no longer wraps the IF for LLAP. Call wrapForLlap manually if needed. */
public static InputFormat<WritableComparable, Writable> getInputFormatFromCache(
Class inputFormatClass, JobConf job) throws IOException {
InputFormat<WritableComparable, Writable> instance = inputFormats.get(inputFormatClass);
@@ -266,7 +282,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
+ inputFormatClass.getName() + " as specified in mapredWork!", e);
}
}
- return wrapForLlap(instance, job);
+ return instance;
}
public RecordReader getRecordReader(InputSplit split, JobConf job,
@@ -287,15 +303,24 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
}
boolean nonNative = false;
- PartitionDesc part = pathToPartitionInfo.get(hsplit.getPath());
+ PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(
+ pathToPartitionInfo, hsplit.getPath(), null);
+ LOG.debug("Found spec for " + hsplit.getPath() + " " + part + " from " + pathToPartitionInfo);
+
if ((part != null) && (part.getTableDesc() != null)) {
Utilities.copyTableJobPropertiesToConf(part.getTableDesc(), job);
nonNative = part.getTableDesc().isNonNative();
}
- pushProjectionsAndFilters(job, inputFormatClass, hsplit.getPath(), nonNative);
+ Path splitPath = hsplit.getPath();
+ pushProjectionsAndFilters(job, inputFormatClass, splitPath, nonNative);
InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
+ try {
+ inputFormat = HiveInputFormat.wrapForLlap(inputFormat, job, part);
+ } catch (HiveException e) {
+ throw new IOException(e);
+ }
RecordReader innerReader = null;
try {
innerReader = inputFormat.getRecordReader(inputSplit, job, reporter);
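
Note the contract change spelled out above: getInputFormatFromCache no longer wraps for LLAP, so a caller that wants LLAP IO must invoke wrapForLlap itself and supply the PartitionDesc (needed to build a SerDe for SerDe-based formats). A minimal caller sketch following that contract (the helper is hypothetical; the usual org.apache.hadoop.mapred and Hive imports are assumed):

static InputFormat<WritableComparable, Writable> resolveForLlap(
    Class<? extends InputFormat> ifClass, JobConf job, PartitionDesc part)
    throws IOException {
  InputFormat<WritableComparable, Writable> raw =
      HiveInputFormat.getInputFormatFromCache(ifClass, job);
  try {
    // 'part' may be null; wrapForLlap then declines SerDe-based formats.
    return HiveInputFormat.wrapForLlap(raw, job, part);
  } catch (HiveException e) {
    throw new IOException(e);
  }
}
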
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/LlapWrappableInputFormatInterface.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/LlapWrappableInputFormatInterface.java b/ql/src/java/org/apache/hadoop/hive/ql/io/LlapWrappableInputFormatInterface.java
index 66e1f90..f168f3c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/LlapWrappableInputFormatInterface.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/LlapWrappableInputFormatInterface.java
@@ -18,5 +18,7 @@
package org.apache.hadoop.hive.ql.io;
-/** Marker interface for LLAP; serves no other purpose. */
-public interface LlapWrappableInputFormatInterface {}
+/** Marker interface for LLAP IO. */
+public interface LlapWrappableInputFormatInterface {
+ boolean isSerdeBased();
+}
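
With isSerdeBased() on the marker, wrapForLlap can tell natively columnar formats (ORC, below) from ones whose rows must pass through a SerDe before being encoded into the cache. A hypothetical SerDe-based format opting in might look like this (example class, not part of the patch):

public class CacheFriendlyTextInputFormat extends org.apache.hadoop.mapred.TextInputFormat
    implements LlapWrappableInputFormatInterface {
  @Override
  public boolean isSerdeBased() {
    // Rows come from the table's SerDe, so wrapForLlap must be given a
    // PartitionDesc it can build that SerDe from (see HiveInputFormat above).
    return true;
  }
}
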
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 361901e..2a89c03 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -332,6 +332,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
List<Integer> included) {
boolean[] result = new boolean[readerSchema.getMaximumId() + 1];
+ if (included == null) {
+ Arrays.fill(result, true);
+ return result;
+ }
result[0] = true;
List<TypeDescription> children = readerSchema.getChildren();
for (int columnNumber = 0; columnNumber < children.size(); ++columnNumber) {
@@ -2482,4 +2486,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
VectorizedRowBatchCtx vrbCtx, List<Integer> includedCols) {
return new OrcOiBatchToRowReader(vrr, vrbCtx, includedCols);
}
+
+
+ @Override
+ public boolean isSerdeBased() {
+ return false;
+ }
}
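
Two small but relevant tweaks above: the column-projection helper now treats a null projection as "read every column", and ORC declares isSerdeBased() = false because its reader is natively columnar. A sketch of the null case, assuming the patched helper is the public static genIncludedColumns(TypeDescription, List<Integer>) overload:

TypeDescription schema = TypeDescription.fromString("struct<a:int,b:string>");
boolean[] included = OrcInputFormat.genIncludedColumns(schema, (List<Integer>) null);
// included.length == schema.getMaximumId() + 1 and every flag is true,
// i.e. the root struct plus both columns are selected.
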
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 075c3b4..3e4ec2e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -93,14 +93,28 @@ public class WriterImpl extends org.apache.orc.impl.WriterImpl implements Writer
OrcFile.WriterOptions opts) throws IOException {
super(fs, path, opts);
this.inspector = opts.getInspector();
- internalBatch = opts.getSchema().createRowBatch(opts.getBatchSize());
+ this.internalBatch = opts.getSchema().createRowBatch(opts.getBatchSize());
+ this.fields = initializeFieldsFromOi(inspector);
+ }
+
+ public WriterImpl(PhysicalWriter writer,
+ Path pathForMem,
+ OrcFile.WriterOptions opts) throws IOException {
+ super(writer, pathForMem, opts);
+ this.inspector = opts.getInspector();
+ this.internalBatch = opts.getSchema().createRowBatch(opts.getBatchSize());
+ this.fields = initializeFieldsFromOi(inspector);
+ }
+
+ private static StructField[] initializeFieldsFromOi(ObjectInspector inspector) {
if (inspector instanceof StructObjectInspector) {
List<? extends StructField> fieldList =
((StructObjectInspector) inspector).getAllStructFieldRefs();
- fields = new StructField[fieldList.size()];
+ StructField[] fields = new StructField[fieldList.size()];
fieldList.toArray(fields);
+ return fields;
} else {
- fields = null;
+ return null;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
index 2325140..8857d3e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
@@ -83,4 +83,8 @@ public class CacheChunk extends DiskRangeList {
public void reset() {
init(null, -1, -1);
}
+
+ public void adjustEnd(long l) {
+ this.end += l;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
index d5f5f9d..0dba1a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
@@ -1775,7 +1775,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
LOG.debug("columnIndex: {} columnType: {} streamBuffers.length: {} columnEncoding: {}" +
" present: {} data: {} dictionary: {} lengths: {} secondary: {} tz: {}",
columnIndex, schema, streamBuffers.length, columnEncoding, present != null,
- data != null, dictionary != null, lengths != null, secondary != null, tz);
+ data, dictionary != null, lengths != null, secondary != null, tz);
}
switch (schema.getCategory()) {
case BINARY: