Posted to commits@hive.apache.org by om...@apache.org on 2015/12/12 00:28:09 UTC
[12/16] hive git commit: HIVE-11890. Create ORC submodule. (omalley reviewed by prasanthj)
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/InStream.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/InStream.java b/orc/src/java/org/apache/orc/impl/InStream.java
new file mode 100644
index 0000000..b1c6de5
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/InStream.java
@@ -0,0 +1,496 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+import org.apache.orc.CompressionCodec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.common.io.DiskRange;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.CodedInputStream;
+
+public abstract class InStream extends InputStream {
+
+ private static final Logger LOG = LoggerFactory.getLogger(InStream.class);
+ private static final int PROTOBUF_MESSAGE_MAX_LIMIT = 1024 << 20; // 1GB
+
+ protected final String name;
+ protected long length;
+
+ public InStream(String name, long length) {
+ this.name = name;
+ this.length = length;
+ }
+
+ public String getStreamName() {
+ return name;
+ }
+
+ public long getStreamLength() {
+ return length;
+ }
+
+ public static class UncompressedStream extends InStream {
+ private List<DiskRange> bytes;
+ private long length;
+ protected long currentOffset;
+ private ByteBuffer range;
+ private int currentRange;
+
+ public UncompressedStream(String name, List<DiskRange> input, long length) {
+ super(name, length);
+ reset(input, length);
+ }
+
+ protected void reset(List<DiskRange> input, long length) {
+ this.bytes = input;
+ this.length = length;
+ currentRange = 0;
+ currentOffset = 0;
+ range = null;
+ }
+
+ @Override
+ public int read() {
+ if (range == null || range.remaining() == 0) {
+ if (currentOffset == length) {
+ return -1;
+ }
+ seek(currentOffset);
+ }
+ currentOffset += 1;
+ return 0xff & range.get();
+ }
+
+ @Override
+ public int read(byte[] data, int offset, int length) {
+ if (range == null || range.remaining() == 0) {
+ if (currentOffset == this.length) {
+ return -1;
+ }
+ seek(currentOffset);
+ }
+ int actualLength = Math.min(length, range.remaining());
+ range.get(data, offset, actualLength);
+ currentOffset += actualLength;
+ return actualLength;
+ }
+
+ @Override
+ public int available() {
+ if (range != null && range.remaining() > 0) {
+ return range.remaining();
+ }
+ return (int) (length - currentOffset);
+ }
+
+ @Override
+ public void close() {
+ currentRange = bytes.size();
+ currentOffset = length;
+ // explicit de-ref of bytes[]
+ bytes.clear();
+ }
+
+ @Override
+ public void seek(PositionProvider index) throws IOException {
+ seek(index.getNext());
+ }
+
+ public void seek(long desired) {
+ if (desired == 0 && bytes.isEmpty()) {
+ logEmptySeek(name);
+ return;
+ }
+ int i = 0;
+ for (DiskRange curRange : bytes) {
+ if (desired == 0 && curRange.getData().remaining() == 0) {
+ logEmptySeek(name);
+ return;
+ }
+ if (curRange.getOffset() <= desired &&
+ (desired - curRange.getOffset()) < curRange.getLength()) {
+ currentOffset = desired;
+ currentRange = i;
+ this.range = curRange.getData().duplicate();
+ int pos = range.position();
+ pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
+ this.range.position(pos);
+ return;
+ }
+ ++i;
+ }
+ // if they are seeking to the precise end, go ahead and let them go there
+ int segments = bytes.size();
+ if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
+ currentOffset = desired;
+ currentRange = segments - 1;
+ DiskRange curRange = bytes.get(currentRange);
+ this.range = curRange.getData().duplicate();
+ int pos = range.position();
+ pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
+ this.range.position(pos);
+ return;
+ }
+ throw new IllegalArgumentException("Seek in " + name + " to " +
+ desired + " is outside of the data");
+ }
+
+ @Override
+ public String toString() {
+ return "uncompressed stream " + name + " position: " + currentOffset +
+ " length: " + length + " range: " + currentRange +
+ " offset: " + (range == null ? 0 : range.position()) + " limit: " + (range == null ? 0 : range.limit());
+ }
+ }
+
+ private static ByteBuffer allocateBuffer(int size, boolean isDirect) {
+ // TODO: use the same pool as the ORC readers
+ if (isDirect) {
+ return ByteBuffer.allocateDirect(size);
+ } else {
+ return ByteBuffer.allocate(size);
+ }
+ }
+
+ private static class CompressedStream extends InStream {
+ private final List<DiskRange> bytes;
+ private final int bufferSize;
+ private ByteBuffer uncompressed;
+ private final CompressionCodec codec;
+ private ByteBuffer compressed;
+ private long currentOffset;
+ private int currentRange;
+ private boolean isUncompressedOriginal;
+
+ public CompressedStream(String name, List<DiskRange> input, long length,
+ CompressionCodec codec, int bufferSize) {
+ super(name, length);
+ this.bytes = input;
+ this.codec = codec;
+ this.bufferSize = bufferSize;
+ currentOffset = 0;
+ currentRange = 0;
+ }
+
+ private void allocateForUncompressed(int size, boolean isDirect) {
+ uncompressed = allocateBuffer(size, isDirect);
+ }
+
+ private void readHeader() throws IOException {
+ if (compressed == null || compressed.remaining() <= 0) {
+ seek(currentOffset);
+ }
+ if (compressed.remaining() > OutStream.HEADER_SIZE) {
+ int b0 = compressed.get() & 0xff;
+ int b1 = compressed.get() & 0xff;
+ int b2 = compressed.get() & 0xff;
+ boolean isOriginal = (b0 & 0x01) == 1;
+ int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);
+
+ if (chunkLength > bufferSize) {
+ throw new IllegalArgumentException("Buffer size too small. size = " +
+ bufferSize + " needed = " + chunkLength);
+ }
+ // read 3 bytes, which should be equal to OutStream.HEADER_SIZE always
+ assert OutStream.HEADER_SIZE == 3 : "The Orc HEADER_SIZE must be the same in OutStream and InStream";
+ currentOffset += OutStream.HEADER_SIZE;
+
+ ByteBuffer slice = this.slice(chunkLength);
+
+ if (isOriginal) {
+ uncompressed = slice;
+ isUncompressedOriginal = true;
+ } else {
+ if (isUncompressedOriginal) {
+ allocateForUncompressed(bufferSize, slice.isDirect());
+ isUncompressedOriginal = false;
+ } else if (uncompressed == null) {
+ allocateForUncompressed(bufferSize, slice.isDirect());
+ } else {
+ uncompressed.clear();
+ }
+ codec.decompress(slice, uncompressed);
+ }
+ } else {
+ throw new IllegalStateException("Can't read header at " + this);
+ }
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (uncompressed == null || uncompressed.remaining() == 0) {
+ if (currentOffset == length) {
+ return -1;
+ }
+ readHeader();
+ }
+ return 0xff & uncompressed.get();
+ }
+
+ @Override
+ public int read(byte[] data, int offset, int length) throws IOException {
+ if (uncompressed == null || uncompressed.remaining() == 0) {
+ if (currentOffset == this.length) {
+ return -1;
+ }
+ readHeader();
+ }
+ int actualLength = Math.min(length, uncompressed.remaining());
+ uncompressed.get(data, offset, actualLength);
+ return actualLength;
+ }
+
+ @Override
+ public int available() throws IOException {
+ if (uncompressed == null || uncompressed.remaining() == 0) {
+ if (currentOffset == length) {
+ return 0;
+ }
+ readHeader();
+ }
+ return uncompressed.remaining();
+ }
+
+ @Override
+ public void close() {
+ uncompressed = null;
+ compressed = null;
+ currentRange = bytes.size();
+ currentOffset = length;
+ bytes.clear();
+ }
+
+ @Override
+ public void seek(PositionProvider index) throws IOException {
+ seek(index.getNext());
+ long uncompressedBytes = index.getNext();
+ if (uncompressedBytes != 0) {
+ readHeader();
+ uncompressed.position(uncompressed.position() +
+ (int) uncompressedBytes);
+ } else if (uncompressed != null) {
+ // mark the uncompressed buffer as done
+ uncompressed.position(uncompressed.limit());
+ }
+ }
+
+ /* slices a read-only contiguous buffer of chunkLength bytes */
+ private ByteBuffer slice(int chunkLength) throws IOException {
+ int len = chunkLength;
+ final long oldOffset = currentOffset;
+ ByteBuffer slice;
+ if (compressed.remaining() >= len) {
+ slice = compressed.slice();
+ // simple case
+ slice.limit(len);
+ currentOffset += len;
+ compressed.position(compressed.position() + len);
+ return slice;
+ } else if (currentRange >= (bytes.size() - 1)) {
+ // nothing has been modified yet
+ throw new IOException("EOF in " + this + " while trying to read " +
+ chunkLength + " bytes");
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format(
+ "Crossing into next BufferChunk because compressed only has %d bytes (needs %d)",
+ compressed.remaining(), len));
+ }
+
+ // we need to consolidate 2 or more buffers into 1
+ // first copy out compressed buffers
+ ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect());
+ currentOffset += compressed.remaining();
+ len -= compressed.remaining();
+ copy.put(compressed);
+ ListIterator<DiskRange> iter = bytes.listIterator(currentRange);
+
+ while (len > 0 && iter.hasNext()) {
+ ++currentRange;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Read slow-path, >1 cross block reads with %s", this.toString()));
+ }
+ DiskRange range = iter.next();
+ compressed = range.getData().duplicate();
+ if (compressed.remaining() >= len) {
+ slice = compressed.slice();
+ slice.limit(len);
+ copy.put(slice);
+ currentOffset += len;
+ compressed.position(compressed.position() + len);
+ return copy;
+ }
+ currentOffset += compressed.remaining();
+ len -= compressed.remaining();
+ copy.put(compressed);
+ }
+
+ // restore offsets for exception clarity
+ seek(oldOffset);
+ throw new IOException("EOF in " + this + " while trying to read " +
+ chunkLength + " bytes");
+ }
+
+ private void seek(long desired) throws IOException {
+ if (desired == 0 && bytes.isEmpty()) {
+ logEmptySeek(name);
+ return;
+ }
+ int i = 0;
+ for (DiskRange range : bytes) {
+ if (range.getOffset() <= desired && desired < range.getEnd()) {
+ currentRange = i;
+ compressed = range.getData().duplicate();
+ int pos = compressed.position();
+ pos += (int)(desired - range.getOffset());
+ compressed.position(pos);
+ currentOffset = desired;
+ return;
+ }
+ ++i;
+ }
+ // if they are seeking to the precise end, go ahead and let them go there
+ int segments = bytes.size();
+ if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
+ DiskRange range = bytes.get(segments - 1);
+ currentRange = segments - 1;
+ compressed = range.getData().duplicate();
+ compressed.position(compressed.limit());
+ currentOffset = desired;
+ return;
+ }
+ throw new IOException("Seek outside of data in " + this + " to " + desired);
+ }
+
+ private String rangeString() {
+ StringBuilder builder = new StringBuilder();
+ int i = 0;
+ for (DiskRange range : bytes) {
+ if (i != 0) {
+ builder.append("; ");
+ }
+ builder.append(" range " + i + " = " + range.getOffset()
+ + " to " + (range.getEnd() - range.getOffset()));
+ ++i;
+ }
+ return builder.toString();
+ }
+
+ @Override
+ public String toString() {
+ return "compressed stream " + name + " position: " + currentOffset +
+ " length: " + length + " range: " + currentRange +
+ " offset: " + (compressed == null ? 0 : compressed.position()) + " limit: " + (compressed == null ? 0 : compressed.limit()) +
+ rangeString() +
+ (uncompressed == null ? "" :
+ " uncompressed: " + uncompressed.position() + " to " +
+ uncompressed.limit());
+ }
+ }
+
+ public abstract void seek(PositionProvider index) throws IOException;
+
+ private static void logEmptySeek(String name) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Attempting seek into empty stream (" + name + ") Skipping stream.");
+ }
+ }
+
+ /**
+ * Create an input stream from a list of buffers.
+ * @param streamName the name of the stream
+ * @param buffers the list of ranges of bytes for the stream
+ * @param offsets a list of offsets (the same length as buffers) that must
+ * contain the first offset of each set of bytes in buffers
+ * @param length the length in bytes of the stream
+ * @param codec the compression codec
+ * @param bufferSize the compression buffer size
+ * @return an input stream
+ * @throws IOException
+ */
+ @VisibleForTesting
+ @Deprecated
+ public static InStream create(String streamName,
+ ByteBuffer[] buffers,
+ long[] offsets,
+ long length,
+ CompressionCodec codec,
+ int bufferSize) throws IOException {
+ List<DiskRange> input = new ArrayList<DiskRange>(buffers.length);
+ for (int i = 0; i < buffers.length; ++i) {
+ input.add(new BufferChunk(buffers[i], offsets[i]));
+ }
+ return create(streamName, input, length, codec, bufferSize);
+ }
+
+ /**
+ * Create an input stream from a list of disk ranges with data.
+ * @param name the name of the stream
+ * @param input the list of ranges of bytes for the stream; from disk or cache
+ * @param length the length in bytes of the stream
+ * @param codec the compression codec
+ * @param bufferSize the compression buffer size
+ * @return an input stream
+ * @throws IOException
+ */
+ public static InStream create(String name,
+ List<DiskRange> input,
+ long length,
+ CompressionCodec codec,
+ int bufferSize) throws IOException {
+ if (codec == null) {
+ return new UncompressedStream(name, input, length);
+ } else {
+ return new CompressedStream(name, input, length, codec, bufferSize);
+ }
+ }
+
+ /**
+ * Creates a coded input stream (used for protobuf message parsing) with a higher message size limit.
+ *
+ * @param name the name of the stream
+ * @param input the list of ranges of bytes for the stream; from disk or cache
+ * @param length the length in bytes of the stream
+ * @param codec the compression codec
+ * @param bufferSize the compression buffer size
+ * @return coded input stream
+ * @throws IOException
+ */
+ public static CodedInputStream createCodedInputStream(
+ String name,
+ List<DiskRange> input,
+ long length,
+ CompressionCodec codec,
+ int bufferSize) throws IOException {
+ InStream inStream = create(name, input, length, codec, bufferSize);
+ CodedInputStream codedInputStream = CodedInputStream.newInstance(inStream);
+ codedInputStream.setSizeLimit(PROTOBUF_MESSAGE_MAX_LIMIT);
+ return codedInputStream;
+ }
+}
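
For reference, the 3-byte chunk header that CompressedStream.readHeader() decodes above packs the
chunk length into 23 bits and keeps the "original" (uncompressed) flag in the low bit of the first
byte. A minimal standalone sketch of the encode/decode round trip, not part of the patch and using
a made-up chunk size:

    import java.nio.ByteBuffer;

    public class ChunkHeaderDemo {
      public static void main(String[] args) {
        int chunkLength = 100000;          // hypothetical chunk size
        boolean original = true;
        ByteBuffer header = ByteBuffer.allocate(3);
        // same layout that OutStream.writeHeader() produces
        header.put(0, (byte) ((chunkLength << 1) + (original ? 1 : 0)));
        header.put(1, (byte) (chunkLength >> 7));
        header.put(2, (byte) (chunkLength >> 15));
        // same decode that CompressedStream.readHeader() performs
        int b0 = header.get(0) & 0xff;
        int b1 = header.get(1) & 0xff;
        int b2 = header.get(2) & 0xff;
        boolean isOriginal = (b0 & 0x01) == 1;
        int decoded = (b2 << 15) | (b1 << 7) | (b0 >> 1);
        System.out.println(isOriginal + " " + decoded);   // prints: true 100000
      }
    }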
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/IntegerReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/IntegerReader.java b/orc/src/java/org/apache/orc/impl/IntegerReader.java
new file mode 100644
index 0000000..b928559
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/IntegerReader.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+
+/**
+ * Interface for reading integers.
+ */
+public interface IntegerReader {
+
+ /**
+ * Seek to the position provided by index.
+ * @param index
+ * @throws IOException
+ */
+ void seek(PositionProvider index) throws IOException;
+
+ /**
+ * Skip the specified number of values.
+ * @param numValues
+ * @throws IOException
+ */
+ void skip(long numValues) throws IOException;
+
+ /**
+ * Check if there are any more values left.
+ * @return
+ * @throws IOException
+ */
+ boolean hasNext() throws IOException;
+
+ /**
+ * Return the next available value.
+ * @return
+ * @throws IOException
+ */
+ long next() throws IOException;
+
+ /**
+ * Return the next available vector for values.
+ * @return
+ * @throws IOException
+ */
+ void nextVector(LongColumnVector previous, long previousLen)
+ throws IOException;
+
+ void setInStream(InStream data);
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/IntegerWriter.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/IntegerWriter.java b/orc/src/java/org/apache/orc/impl/IntegerWriter.java
new file mode 100644
index 0000000..419054f
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/IntegerWriter.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import java.io.IOException;
+
+/**
+ * Interface for writing integers.
+ */
+public interface IntegerWriter {
+
+ /**
+ * Get position from the stream.
+ * @param recorder
+ * @throws IOException
+ */
+ void getPosition(PositionRecorder recorder) throws IOException;
+
+ /**
+ * Write the integer value
+ * @param value
+ * @throws IOException
+ */
+ void write(long value) throws IOException;
+
+ /**
+ * Flush the buffer
+ * @throws IOException
+ */
+ void flush() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/MemoryManager.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/MemoryManager.java b/orc/src/java/org/apache/orc/impl/MemoryManager.java
new file mode 100644
index 0000000..2dbfba7
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/MemoryManager.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.orc.OrcConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Implements a memory manager that keeps a global context of how many ORC
+ * writers there are and manages the memory between them. For use cases with
+ * dynamic partitions, it is easy to end up with many writers in the same task.
+ * By scaling down the size of each allocation, we try to keep the task from
+ * running out of memory.
+ *
+ * This class is not thread safe, but is re-entrant - ensure creation and all
+ * invocations are triggered from the same thread.
+ */
+public class MemoryManager {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MemoryManager.class);
+
+ /**
+ * How often should we check the memory sizes? Measured in rows added
+ * to all of the writers.
+ */
+ private static final int ROWS_BETWEEN_CHECKS = 5000;
+ private final long totalMemoryPool;
+ private final Map<Path, WriterInfo> writerList =
+ new HashMap<Path, WriterInfo>();
+ private long totalAllocation = 0;
+ private double currentScale = 1;
+ private int rowsAddedSinceCheck = 0;
+ private final OwnedLock ownerLock = new OwnedLock();
+
+ @SuppressWarnings("serial")
+ private static class OwnedLock extends ReentrantLock {
+ public Thread getOwner() {
+ return super.getOwner();
+ }
+ }
+
+ private static class WriterInfo {
+ long allocation;
+ Callback callback;
+ WriterInfo(long allocation, Callback callback) {
+ this.allocation = allocation;
+ this.callback = callback;
+ }
+ }
+
+ public interface Callback {
+ /**
+ * The writer needs to check its memory usage
+ * @param newScale the current scale factor for memory allocations
+ * @return true if the writer was over the limit
+ * @throws IOException
+ */
+ boolean checkMemory(double newScale) throws IOException;
+ }
+
+ /**
+ * Create the memory manager.
+ * @param conf use the configuration to find the maximum size of the memory
+ * pool.
+ */
+ public MemoryManager(Configuration conf) {
+ double maxLoad = OrcConf.MEMORY_POOL.getDouble(conf);
+ totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean().
+ getHeapMemoryUsage().getMax() * maxLoad);
+ ownerLock.lock();
+ }
+
+ /**
+ * Lightweight thread-safety check for multi-threaded access patterns
+ */
+ private void checkOwner() {
+ Preconditions.checkArgument(ownerLock.isHeldByCurrentThread(),
+ "Owner thread expected %s, got %s",
+ ownerLock.getOwner(),
+ Thread.currentThread());
+ }
+
+ /**
+ * Add a new writer's memory allocation to the pool. We use the path
+ * as a unique key to ensure that we don't get duplicates.
+ * @param path the file that is being written
+ * @param requestedAllocation the requested buffer size
+ */
+ public void addWriter(Path path, long requestedAllocation,
+ Callback callback) throws IOException {
+ checkOwner();
+ WriterInfo oldVal = writerList.get(path);
+ // this should always be null, but we handle the case where the memory
+ // manager wasn't told that the old writer is no longer in use and the task
+ // starts writing to the same path.
+ if (oldVal == null) {
+ oldVal = new WriterInfo(requestedAllocation, callback);
+ writerList.put(path, oldVal);
+ totalAllocation += requestedAllocation;
+ } else {
+ // handle a new writer that is writing to the same path
+ totalAllocation += requestedAllocation - oldVal.allocation;
+ oldVal.allocation = requestedAllocation;
+ oldVal.callback = callback;
+ }
+ updateScale(true);
+ }
+
+ /**
+ * Remove the given writer from the pool.
+ * @param path the file that has been closed
+ */
+ public void removeWriter(Path path) throws IOException {
+ checkOwner();
+ WriterInfo val = writerList.get(path);
+ if (val != null) {
+ writerList.remove(path);
+ totalAllocation -= val.allocation;
+ if (writerList.isEmpty()) {
+ rowsAddedSinceCheck = 0;
+ }
+ updateScale(false);
+ }
+ if(writerList.isEmpty()) {
+ rowsAddedSinceCheck = 0;
+ }
+ }
+
+ /**
+ * Get the total pool size that is available for ORC writers.
+ * @return the number of bytes in the pool
+ */
+ public long getTotalMemoryPool() {
+ return totalMemoryPool;
+ }
+
+ /**
+ * The scaling factor for each allocation to ensure that the pool isn't
+ * oversubscribed.
+ * @return a fraction between 0.0 and 1.0 of the requested size that is
+ * available for each writer.
+ */
+ public double getAllocationScale() {
+ return currentScale;
+ }
+
+ /**
+ * Give the memory manager an opportunity for doing a memory check.
+ * @param rows number of rows added
+ * @throws IOException
+ */
+ public void addedRow(int rows) throws IOException {
+ rowsAddedSinceCheck += rows;
+ if (rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) {
+ notifyWriters();
+ }
+ }
+
+ /**
+ * Notify all of the writers that they should check their memory usage.
+ * @throws IOException
+ */
+ public void notifyWriters() throws IOException {
+ checkOwner();
+ LOG.debug("Notifying writers after " + rowsAddedSinceCheck);
+ for(WriterInfo writer: writerList.values()) {
+ boolean flushed = writer.callback.checkMemory(currentScale);
+ if (LOG.isDebugEnabled() && flushed) {
+ LOG.debug("flushed " + writer.toString());
+ }
+ }
+ rowsAddedSinceCheck = 0;
+ }
+
+ /**
+ * Update the currentScale based on the current allocation and pool size.
+ * @param isAllocate is this an allocation?
+ */
+ private void updateScale(boolean isAllocate) throws IOException {
+ if (totalAllocation <= totalMemoryPool) {
+ currentScale = 1;
+ } else {
+ currentScale = (double) totalMemoryPool / totalAllocation;
+ }
+ }
+}
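
The scaling rule in updateScale() above is the whole policy: while the writers' combined requests
fit in the pool the scale stays at 1, and once they oversubscribe it each writer is expected to
shrink to its request times totalMemoryPool / totalAllocation. A small worked sketch with made-up
numbers, not part of the patch:

    public class ScaleDemo {
      public static void main(String[] args) {
        long totalMemoryPool = 500L << 20;                          // e.g. a 500 MB pool
        long[] requested = {256L << 20, 256L << 20, 256L << 20};    // three writers
        long totalAllocation = 0;
        for (long r : requested) {
          totalAllocation += r;
        }
        double currentScale = (totalAllocation <= totalMemoryPool)
            ? 1.0 : (double) totalMemoryPool / totalAllocation;
        for (long r : requested) {
          // each writer limits itself to roughly its request times the scale
          System.out.println((r >> 20) + " MB -> " + (((long) (r * currentScale)) >> 20) + " MB");
        }
      }
    }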
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/MetadataReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/MetadataReader.java b/orc/src/java/org/apache/orc/impl/MetadataReader.java
new file mode 100644
index 0000000..670a81a
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/MetadataReader.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.IOException;
+
+import org.apache.orc.OrcProto;
+import org.apache.orc.StripeInformation;
+
+public interface MetadataReader {
+ OrcIndex readRowIndex(StripeInformation stripe,
+ OrcProto.StripeFooter footer,
+ boolean[] included, OrcProto.RowIndex[] indexes, boolean[] sargColumns,
+ OrcProto.BloomFilterIndex[] bloomFilterIndices) throws IOException;
+
+ OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException;
+
+ void close() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java b/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java
new file mode 100644
index 0000000..d0ded52
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.OrcProto;
+import org.apache.orc.StripeInformation;
+
+import com.google.common.collect.Lists;
+
+public class MetadataReaderImpl implements MetadataReader {
+ private final FSDataInputStream file;
+ private final CompressionCodec codec;
+ private final int bufferSize;
+ private final int typeCount;
+
+ public MetadataReaderImpl(FileSystem fileSystem, Path path,
+ CompressionCodec codec, int bufferSize, int typeCount) throws IOException {
+ this(fileSystem.open(path), codec, bufferSize, typeCount);
+ }
+
+ public MetadataReaderImpl(FSDataInputStream file,
+ CompressionCodec codec, int bufferSize, int typeCount) {
+ this.file = file;
+ this.codec = codec;
+ this.bufferSize = bufferSize;
+ this.typeCount = typeCount;
+ }
+
+ @Override
+ public OrcIndex readRowIndex(StripeInformation stripe,
+ OrcProto.StripeFooter footer, boolean[] included, OrcProto.RowIndex[] indexes,
+ boolean[] sargColumns, OrcProto.BloomFilterIndex[] bloomFilterIndices) throws IOException {
+ if (footer == null) {
+ footer = readStripeFooter(stripe);
+ }
+ if (indexes == null) {
+ indexes = new OrcProto.RowIndex[typeCount];
+ }
+ if (bloomFilterIndices == null) {
+ bloomFilterIndices = new OrcProto.BloomFilterIndex[typeCount];
+ }
+ long offset = stripe.getOffset();
+ List<OrcProto.Stream> streams = footer.getStreamsList();
+ for (int i = 0; i < streams.size(); i++) {
+ OrcProto.Stream stream = streams.get(i);
+ OrcProto.Stream nextStream = null;
+ if (i < streams.size() - 1) {
+ nextStream = streams.get(i+1);
+ }
+ int col = stream.getColumn();
+ int len = (int) stream.getLength();
+ // the row index and bloom filter streams are interlaced; if the sarg column has a bloom
+ // filter, combine the I/O to read the row index and bloom filter for that column together
+ if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX)) {
+ boolean readBloomFilter = false;
+ if (sargColumns != null && sargColumns[col] &&
+ nextStream.getKind() == OrcProto.Stream.Kind.BLOOM_FILTER) {
+ len += nextStream.getLength();
+ i += 1;
+ readBloomFilter = true;
+ }
+ if ((included == null || included[col]) && indexes[col] == null) {
+ byte[] buffer = new byte[len];
+ file.readFully(offset, buffer, 0, buffer.length);
+ ByteBuffer bb = ByteBuffer.wrap(buffer);
+ indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index",
+ Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), stream.getLength(),
+ codec, bufferSize));
+ if (readBloomFilter) {
+ bb.position((int) stream.getLength());
+ bloomFilterIndices[col] = OrcProto.BloomFilterIndex.parseFrom(InStream.create(
+ "bloom_filter", Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)),
+ nextStream.getLength(), codec, bufferSize));
+ }
+ }
+ }
+ offset += len;
+ }
+
+ OrcIndex index = new OrcIndex(indexes, bloomFilterIndices);
+ return index;
+ }
+
+ @Override
+ public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
+ long offset = stripe.getOffset() + stripe.getIndexLength() + stripe.getDataLength();
+ int tailLength = (int) stripe.getFooterLength();
+
+ // read the footer
+ ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
+ file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength);
+ return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream("footer",
+ Lists.<DiskRange>newArrayList(new BufferChunk(tailBuf, 0)),
+ tailLength, codec, bufferSize));
+ }
+
+ @Override
+ public void close() throws IOException {
+ file.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/OrcIndex.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/OrcIndex.java b/orc/src/java/org/apache/orc/impl/OrcIndex.java
new file mode 100644
index 0000000..50a15f2
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/OrcIndex.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.orc.OrcProto;
+
+public final class OrcIndex {
+ OrcProto.RowIndex[] rowGroupIndex;
+ OrcProto.BloomFilterIndex[] bloomFilterIndex;
+
+ public OrcIndex(OrcProto.RowIndex[] rgIndex, OrcProto.BloomFilterIndex[] bfIndex) {
+ this.rowGroupIndex = rgIndex;
+ this.bloomFilterIndex = bfIndex;
+ }
+
+ public OrcProto.RowIndex[] getRowGroupIndex() {
+ return rowGroupIndex;
+ }
+
+ public OrcProto.BloomFilterIndex[] getBloomFilterIndex() {
+ return bloomFilterIndex;
+ }
+
+ public void setRowGroupIndex(OrcProto.RowIndex[] rowGroupIndex) {
+ this.rowGroupIndex = rowGroupIndex;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/OutStream.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/OutStream.java b/orc/src/java/org/apache/orc/impl/OutStream.java
new file mode 100644
index 0000000..68ef7d4
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/OutStream.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import org.apache.orc.CompressionCodec;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class OutStream extends PositionedOutputStream {
+
+ public interface OutputReceiver {
+ /**
+ * Output the given buffer to the final destination
+ * @param buffer the buffer to output
+ * @throws IOException
+ */
+ void output(ByteBuffer buffer) throws IOException;
+ }
+
+ public static final int HEADER_SIZE = 3;
+ private final String name;
+ private final OutputReceiver receiver;
+ // if enabled, the stream will be suppressed when writing the stripe
+ private boolean suppress;
+
+ /**
+ * Stores the uncompressed bytes that have been serialized, but not
+ * compressed yet. When this fills, we compress the entire buffer.
+ */
+ private ByteBuffer current = null;
+
+ /**
+ * Stores the compressed bytes until we have a full buffer and then outputs
+ * them to the receiver. If no compression is being done, this (and overflow)
+ * will always be null and the current buffer will be sent directly to the
+ * receiver.
+ */
+ private ByteBuffer compressed = null;
+
+ /**
+ * Since the compressed buffer may start with contents from previous
+ * compression blocks, we allocate an overflow buffer so that the
+ * output of the codec can be split between the two buffers. After the
+ * compressed buffer is sent to the receiver, the overflow buffer becomes
+ * the new compressed buffer.
+ */
+ private ByteBuffer overflow = null;
+ private final int bufferSize;
+ private final CompressionCodec codec;
+ private long compressedBytes = 0;
+ private long uncompressedBytes = 0;
+
+ public OutStream(String name,
+ int bufferSize,
+ CompressionCodec codec,
+ OutputReceiver receiver) throws IOException {
+ this.name = name;
+ this.bufferSize = bufferSize;
+ this.codec = codec;
+ this.receiver = receiver;
+ this.suppress = false;
+ }
+
+ public void clear() throws IOException {
+ flush();
+ suppress = false;
+ }
+
+ /**
+ * Write the length of the compressed bytes. Life is much easier if the
+ * header is constant length, so just use 3 bytes. Considering most of the
+ * codecs want between 32k (snappy) and 256k (lzo, zlib), 3 bytes should
+ * be plenty. We also use the low bit for whether it is the original or
+ * compressed bytes.
+ * @param buffer the buffer to write the header to
+ * @param position the position in the buffer to write at
+ * @param val the size in the file
+ * @param original is it uncompressed
+ */
+ private static void writeHeader(ByteBuffer buffer,
+ int position,
+ int val,
+ boolean original) {
+ buffer.put(position, (byte) ((val << 1) + (original ? 1 : 0)));
+ buffer.put(position + 1, (byte) (val >> 7));
+ buffer.put(position + 2, (byte) (val >> 15));
+ }
+
+ private void getNewInputBuffer() throws IOException {
+ if (codec == null) {
+ current = ByteBuffer.allocate(bufferSize);
+ } else {
+ current = ByteBuffer.allocate(bufferSize + HEADER_SIZE);
+ writeHeader(current, 0, bufferSize, true);
+ current.position(HEADER_SIZE);
+ }
+ }
+
+ /**
+ * Allocate a new output buffer if we are compressing.
+ */
+ private ByteBuffer getNewOutputBuffer() throws IOException {
+ return ByteBuffer.allocate(bufferSize + HEADER_SIZE);
+ }
+
+ private void flip() throws IOException {
+ current.limit(current.position());
+ current.position(codec == null ? 0 : HEADER_SIZE);
+ }
+
+ @Override
+ public void write(int i) throws IOException {
+ if (current == null) {
+ getNewInputBuffer();
+ }
+ if (current.remaining() < 1) {
+ spill();
+ }
+ uncompressedBytes += 1;
+ current.put((byte) i);
+ }
+
+ @Override
+ public void write(byte[] bytes, int offset, int length) throws IOException {
+ if (current == null) {
+ getNewInputBuffer();
+ }
+ int remaining = Math.min(current.remaining(), length);
+ current.put(bytes, offset, remaining);
+ uncompressedBytes += remaining;
+ length -= remaining;
+ while (length != 0) {
+ spill();
+ offset += remaining;
+ remaining = Math.min(current.remaining(), length);
+ current.put(bytes, offset, remaining);
+ uncompressedBytes += remaining;
+ length -= remaining;
+ }
+ }
+
+ private void spill() throws java.io.IOException {
+ // if there isn't anything in the current buffer, don't spill
+ if (current == null ||
+ current.position() == (codec == null ? 0 : HEADER_SIZE)) {
+ return;
+ }
+ flip();
+ if (codec == null) {
+ receiver.output(current);
+ getNewInputBuffer();
+ } else {
+ if (compressed == null) {
+ compressed = getNewOutputBuffer();
+ } else if (overflow == null) {
+ overflow = getNewOutputBuffer();
+ }
+ int sizePosn = compressed.position();
+ compressed.position(compressed.position() + HEADER_SIZE);
+ if (codec.compress(current, compressed, overflow)) {
+ uncompressedBytes = 0;
+ // move position back to after the header
+ current.position(HEADER_SIZE);
+ current.limit(current.capacity());
+ // find the total bytes in the chunk
+ int totalBytes = compressed.position() - sizePosn - HEADER_SIZE;
+ if (overflow != null) {
+ totalBytes += overflow.position();
+ }
+ compressedBytes += totalBytes + HEADER_SIZE;
+ writeHeader(compressed, sizePosn, totalBytes, false);
+ // if we have less than the next header left, spill it.
+ if (compressed.remaining() < HEADER_SIZE) {
+ compressed.flip();
+ receiver.output(compressed);
+ compressed = overflow;
+ overflow = null;
+ }
+ } else {
+ compressedBytes += uncompressedBytes + HEADER_SIZE;
+ uncompressedBytes = 0;
+ // we are using the original, but need to spill the current
+ // compressed buffer first. So back up to where we started,
+ // flip it and add it to done.
+ if (sizePosn != 0) {
+ compressed.position(sizePosn);
+ compressed.flip();
+ receiver.output(compressed);
+ compressed = null;
+ // if we have an overflow, clear it and make it the new compress
+ // buffer
+ if (overflow != null) {
+ overflow.clear();
+ compressed = overflow;
+ overflow = null;
+ }
+ } else {
+ compressed.clear();
+ if (overflow != null) {
+ overflow.clear();
+ }
+ }
+
+ // now add the current buffer into the done list and get a new one.
+ current.position(0);
+ // update the header with the current length
+ writeHeader(current, 0, current.limit() - HEADER_SIZE, true);
+ receiver.output(current);
+ getNewInputBuffer();
+ }
+ }
+ }
+
+ @Override
+ public void getPosition(PositionRecorder recorder) throws IOException {
+ if (codec == null) {
+ recorder.addPosition(uncompressedBytes);
+ } else {
+ recorder.addPosition(compressedBytes);
+ recorder.addPosition(uncompressedBytes);
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ spill();
+ if (compressed != null && compressed.position() != 0) {
+ compressed.flip();
+ receiver.output(compressed);
+ compressed = null;
+ }
+ uncompressedBytes = 0;
+ compressedBytes = 0;
+ overflow = null;
+ current = null;
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+
+ @Override
+ public long getBufferSize() {
+ long result = 0;
+ if (current != null) {
+ result += current.capacity();
+ }
+ if (compressed != null) {
+ result += compressed.capacity();
+ }
+ if (overflow != null) {
+ result += overflow.capacity();
+ }
+ return result;
+ }
+
+ /**
+ * Set suppress flag
+ */
+ public void suppress() {
+ suppress = true;
+ }
+
+ /**
+ * Returns the state of suppress flag
+ * @return value of suppress flag
+ */
+ public boolean isSuppressed() {
+ return suppress;
+ }
+}
+
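
In the uncompressed case (codec == null) OutStream hands each filled buffer straight to the
OutputReceiver; with a codec it emits 3-byte-headed chunks instead. A hedged usage sketch, not
part of the patch, assuming org.apache.orc.impl is on the classpath:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.orc.impl.OutStream;

    public class OutStreamDemo {
      public static void main(String[] args) throws IOException {
        final List<ByteBuffer> chunks = new ArrayList<ByteBuffer>();
        OutStream.OutputReceiver receiver = new OutStream.OutputReceiver() {
          @Override
          public void output(ByteBuffer buffer) {
            chunks.add(buffer);          // the writer would send this to the stripe
          }
        };
        OutStream out = new OutStream("demo", 16, null, receiver);
        out.write(new byte[40], 0, 40);  // spills two full 16-byte buffers
        out.flush();                     // pushes the final partial buffer
        System.out.println(chunks.size() + " buffers received");  // prints: 3 buffers received
      }
    }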
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/PositionProvider.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/PositionProvider.java b/orc/src/java/org/apache/orc/impl/PositionProvider.java
new file mode 100644
index 0000000..47cf481
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/PositionProvider.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+/**
+ * An interface used for seeking to a row index.
+ */
+public interface PositionProvider {
+ long getNext();
+}
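
PositionProvider is just a cursor over previously recorded positions; for example,
CompressedStream.seek(PositionProvider) above consumes two values from it (the compressed chunk
offset, then the number of uncompressed bytes into that chunk). A minimal sketch of an
implementation, not part of the patch (the reader supplies its own provider built from the row
index):

    import org.apache.orc.impl.PositionProvider;

    public class ArrayPositionProvider implements PositionProvider {
      private final long[] positions;
      private int next = 0;

      public ArrayPositionProvider(long[] positions) {
        this.positions = positions;
      }

      @Override
      public long getNext() {
        return positions[next++];        // hand back the recorded positions in order
      }
    }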
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/PositionRecorder.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/PositionRecorder.java b/orc/src/java/org/apache/orc/impl/PositionRecorder.java
new file mode 100644
index 0000000..1fff760
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/PositionRecorder.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+/**
+ * An interface for recording positions in a stream.
+ */
+public interface PositionRecorder {
+ void addPosition(long offset);
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/PositionedOutputStream.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/PositionedOutputStream.java b/orc/src/java/org/apache/orc/impl/PositionedOutputStream.java
new file mode 100644
index 0000000..d412939
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/PositionedOutputStream.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public abstract class PositionedOutputStream extends OutputStream {
+
+ /**
+ * Record the current position to the recorder.
+ * @param recorder the object that receives the position
+ * @throws IOException
+ */
+ public abstract void getPosition(PositionRecorder recorder
+ ) throws IOException;
+
+ /**
+ * Get the memory size currently allocated as buffer associated with this
+ * stream.
+ * @return the number of bytes used by buffers.
+ */
+ public abstract long getBufferSize();
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/RedBlackTree.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RedBlackTree.java b/orc/src/java/org/apache/orc/impl/RedBlackTree.java
new file mode 100644
index 0000000..41aa4b9
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/RedBlackTree.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.orc.impl.DynamicIntArray;
+
+/**
+ * A memory-efficient red-black tree that does not allocate any objects per
+ * element. This class is abstract and assumes that the child class
+ * handles the key and comparisons with the key.
+ */
+abstract class RedBlackTree {
+ public static final int NULL = -1;
+
+ // Various values controlling the offset of the data within the array.
+ private static final int LEFT_OFFSET = 0;
+ private static final int RIGHT_OFFSET = 1;
+ private static final int ELEMENT_SIZE = 2;
+
+ protected int size = 0;
+ private final DynamicIntArray data;
+ protected int root = NULL;
+ protected int lastAdd = 0;
+ private boolean wasAdd = false;
+
+ /**
+ * Create a set with the given initial capacity.
+ */
+ public RedBlackTree(int initialCapacity) {
+ data = new DynamicIntArray(initialCapacity * ELEMENT_SIZE);
+ }
+
+ /**
+ * Insert a new node into the data array, growing the array as necessary.
+ *
+ * @return Returns the position of the new node.
+ */
+ private int insert(int left, int right, boolean isRed) {
+ int position = size;
+ size += 1;
+ setLeft(position, left, isRed);
+ setRight(position, right);
+ return position;
+ }
+
+ /**
+ * Compare the value at the given position to the new value.
+ * @return 0 if the values are the same, -1 if the new value is smaller and
+ * 1 if the new value is larger.
+ */
+ protected abstract int compareValue(int position);
+
+ /**
+ * Is the given node red as opposed to black? To prevent having an extra word
+ * in the data array, we just use the low bit of the left child index.
+ */
+ protected boolean isRed(int position) {
+ return position != NULL &&
+ (data.get(position * ELEMENT_SIZE + LEFT_OFFSET) & 1) == 1;
+ }
+
+ /**
+ * Set the red bit true or false.
+ */
+ private void setRed(int position, boolean isRed) {
+ int offset = position * ELEMENT_SIZE + LEFT_OFFSET;
+ if (isRed) {
+ data.set(offset, data.get(offset) | 1);
+ } else {
+ data.set(offset, data.get(offset) & ~1);
+ }
+ }
+
+ /**
+ * Get the left field of the given position.
+ */
+ protected int getLeft(int position) {
+ return data.get(position * ELEMENT_SIZE + LEFT_OFFSET) >> 1;
+ }
+
+ /**
+ * Get the right field of the given position.
+ */
+ protected int getRight(int position) {
+ return data.get(position * ELEMENT_SIZE + RIGHT_OFFSET);
+ }
+
+ /**
+ * Set the left field of the given position.
+ * Note that we are storing the node color in the low bit of the left pointer.
+ */
+ private void setLeft(int position, int left) {
+ int offset = position * ELEMENT_SIZE + LEFT_OFFSET;
+ data.set(offset, (left << 1) | (data.get(offset) & 1));
+ }
+
+ /**
+ * Set the left field of the given position.
+ * Note that we are storing the node color in the low bit of the left pointer.
+ */
+ private void setLeft(int position, int left, boolean isRed) {
+ int offset = position * ELEMENT_SIZE + LEFT_OFFSET;
+ data.set(offset, (left << 1) | (isRed ? 1 : 0));
+ }
+
+ /**
+ * Set the right field of the given position.
+ */
+ private void setRight(int position, int right) {
+ data.set(position * ELEMENT_SIZE + RIGHT_OFFSET, right);
+ }
+
+ /**
+ * Insert or find a given key in the tree and rebalance the tree correctly.
+ * Rebalancing restores the red-black aspect of the tree to maintain the
+ * invariants:
+ * 1. If a node is red, both of its children are black.
+ * 2. Each child of a node has the same black height (the number of black
+ * nodes between it and the leaves of the tree).
+ *
+ * Inserted nodes are at the leaves and are red, therefore there is at most a
+ * violation of rule 1 at the node we just put in. Instead of always keeping
+ * the parents, this routine passes down the context.
+ *
+ * The fix is broken down into 6 cases (1.{1,2,3} and 2.{1,2,3}) that are
+ * left-right mirror images of each other. See Algorithms by Cormen,
+ * Leiserson, and Rivest for the explanation of the subcases.
+ *
+ * @param node The node that we are fixing right now.
+ * @param fromLeft Did we come down from the left?
+ * @param parent Node's parent
+ * @param grandparent Parent's parent
+ * @param greatGrandparent Grandparent's parent
+ * @return Does parent also need to be checked and/or fixed?
+ */
+ private boolean add(int node, boolean fromLeft, int parent,
+ int grandparent, int greatGrandparent) {
+ if (node == NULL) {
+ if (root == NULL) {
+ lastAdd = insert(NULL, NULL, false);
+ root = lastAdd;
+ wasAdd = true;
+ return false;
+ } else {
+ lastAdd = insert(NULL, NULL, true);
+ node = lastAdd;
+ wasAdd = true;
+ // connect the new node into the tree
+ if (fromLeft) {
+ setLeft(parent, node);
+ } else {
+ setRight(parent, node);
+ }
+ }
+ } else {
+ int compare = compareValue(node);
+ boolean keepGoing;
+
+ // Recurse down to find where the node needs to be added
+ if (compare < 0) {
+ keepGoing = add(getLeft(node), true, node, parent, grandparent);
+ } else if (compare > 0) {
+ keepGoing = add(getRight(node), false, node, parent, grandparent);
+ } else {
+ lastAdd = node;
+ wasAdd = false;
+ return false;
+ }
+
+ // we don't need to fix the root (because it is always set to black)
+ if (node == root || !keepGoing) {
+ return false;
+ }
+ }
+
+
+ // Do we need to fix this node? Only if there are two reds right under each
+ // other.
+ if (isRed(node) && isRed(parent)) {
+ if (parent == getLeft(grandparent)) {
+ int uncle = getRight(grandparent);
+ if (isRed(uncle)) {
+ // case 1.1
+ setRed(parent, false);
+ setRed(uncle, false);
+ setRed(grandparent, true);
+ return true;
+ } else {
+ if (node == getRight(parent)) {
+ // case 1.2
+ // swap node and parent
+ int tmp = node;
+ node = parent;
+ parent = tmp;
+ // left-rotate on node
+ setLeft(grandparent, parent);
+ setRight(node, getLeft(parent));
+ setLeft(parent, node);
+ }
+
+ // case 1.2 and 1.3
+ setRed(parent, false);
+ setRed(grandparent, true);
+
+ // right-rotate on grandparent
+ if (greatGrandparent == NULL) {
+ root = parent;
+ } else if (getLeft(greatGrandparent) == grandparent) {
+ setLeft(greatGrandparent, parent);
+ } else {
+ setRight(greatGrandparent, parent);
+ }
+ setLeft(grandparent, getRight(parent));
+ setRight(parent, grandparent);
+ return false;
+ }
+ } else {
+ int uncle = getLeft(grandparent);
+ if (isRed(uncle)) {
+ // case 2.1
+ setRed(parent, false);
+ setRed(uncle, false);
+ setRed(grandparent, true);
+ return true;
+ } else {
+ if (node == getLeft(parent)) {
+ // case 2.2
+ // swap node and parent
+ int tmp = node;
+ node = parent;
+ parent = tmp;
+ // right-rotate on node
+ setRight(grandparent, parent);
+ setLeft(node, getRight(parent));
+ setRight(parent, node);
+ }
+ // case 2.2 and 2.3
+ setRed(parent, false);
+ setRed(grandparent, true);
+ // left-rotate on grandparent
+ if (greatGrandparent == NULL) {
+ root = parent;
+ } else if (getRight(greatGrandparent) == grandparent) {
+ setRight(greatGrandparent, parent);
+ } else {
+ setLeft(greatGrandparent, parent);
+ }
+ setRight(grandparent, getLeft(parent));
+ setLeft(parent, grandparent);
+ return false;
+ }
+ }
+ } else {
+ return true;
+ }
+ }
+
+ /**
+ * Add the new key to the tree.
+ * @return true if the element is a new one.
+ */
+ protected boolean add() {
+ add(root, false, NULL, NULL, NULL);
+ if (wasAdd) {
+ setRed(root, false);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Get the number of elements in the set.
+ */
+ public int size() {
+ return size;
+ }
+
+ /**
+ * Reset the table to empty.
+ */
+ public void clear() {
+ root = NULL;
+ size = 0;
+ data.clear();
+ }
+
+ /**
+ * Get the buffer size in bytes.
+ */
+ public long getSizeInBytes() {
+ return data.getSizeInBytes();
+ }
+}
+
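
For readers who want to line the 6 cases above up against the textbook version, the following standalone sketch shows the same fixup on a conventional pointer-based tree. It is illustrative only: the Node class and the leftRotate/rightRotate helpers are hypothetical and local to the sketch, and it deliberately does not mirror the parallel-array, context-passing implementation in this patch.

// Illustrative only: a CLRS-style red-black insert fixup with explicit parent
// pointers, showing the case 1.1/1.2/1.3 structure referenced above.
final class SketchRedBlackTree {
  static final class Node {
    int key;
    boolean red = true;                   // new nodes start red, so only rule 1 can break
    Node left, right, parent;
    Node(int key) { this.key = key; }
  }

  Node root;

  void insert(int key) {
    Node z = new Node(key), y = null, x = root;
    while (x != null) {                   // plain BST descent to the insertion point
      y = x;
      x = key < x.key ? x.left : x.right;
    }
    z.parent = y;
    if (y == null) root = z;
    else if (key < y.key) y.left = z;
    else y.right = z;
    fixup(z);
  }

  private void fixup(Node z) {
    while (z.parent != null && z.parent.red) {
      Node parent = z.parent;
      Node grandparent = parent.parent;   // non-null: a red parent is never the root
      if (parent == grandparent.left) {
        Node uncle = grandparent.right;
        if (uncle != null && uncle.red) { // case 1.1: recolor and push the problem up
          parent.red = false; uncle.red = false;
          grandparent.red = true;
          z = grandparent;
        } else {
          if (z == parent.right) {        // case 1.2: rotate to reduce to case 1.3
            z = parent;
            leftRotate(z);
          }
          z.parent.red = false;           // case 1.3: recolor, then rotate grandparent
          z.parent.parent.red = true;
          rightRotate(z.parent.parent);
        }
      } else {                            // cases 2.1-2.3 are the left-right mirror
        Node uncle = grandparent.left;
        if (uncle != null && uncle.red) {
          parent.red = false; uncle.red = false;
          grandparent.red = true;
          z = grandparent;
        } else {
          if (z == parent.left) {
            z = parent;
            rightRotate(z);
          }
          z.parent.red = false;
          z.parent.parent.red = true;
          leftRotate(z.parent.parent);
        }
      }
    }
    root.red = false;                     // the root is always black
  }

  private void leftRotate(Node x) {
    Node y = x.right;                     // y moves up, x becomes y's left child
    x.right = y.left;
    if (y.left != null) y.left.parent = x;
    y.parent = x.parent;
    if (x.parent == null) root = y;
    else if (x == x.parent.left) x.parent.left = y;
    else x.parent.right = y;
    y.left = x;
    x.parent = y;
  }

  private void rightRotate(Node x) {
    Node y = x.left;                      // y moves up, x becomes y's right child
    x.left = y.right;
    if (y.right != null) y.right.parent = x;
    y.parent = x.parent;
    if (x.parent == null) root = y;
    else if (x == x.parent.right) x.parent.right = y;
    else x.parent.left = y;
    y.right = x;
    x.parent = y;
  }
}
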
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java b/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java
new file mode 100644
index 0000000..380f339
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+
+/**
+ * A reader that reads a sequence of bytes. A control byte is read before
+ * each run with positive values 0 to 127 meaning 3 to 130 repetitions. If the
+ * byte is -1 to -128, 1 to 128 literal byte values follow.
+ */
+public class RunLengthByteReader {
+ private InStream input;
+ private final byte[] literals =
+ new byte[RunLengthByteWriter.MAX_LITERAL_SIZE];
+ private int numLiterals = 0;
+ private int used = 0;
+ private boolean repeat = false;
+
+ public RunLengthByteReader(InStream input) throws IOException {
+ this.input = input;
+ }
+
+ public void setInStream(InStream input) {
+ this.input = input;
+ }
+
+ private void readValues(boolean ignoreEof) throws IOException {
+ int control = input.read();
+ used = 0;
+ if (control == -1) {
+ if (!ignoreEof) {
+ throw new EOFException("Read past end of buffer RLE byte from " + input);
+ }
+ used = numLiterals = 0;
+ return;
+ } else if (control < 0x80) {
+ repeat = true;
+ numLiterals = control + RunLengthByteWriter.MIN_REPEAT_SIZE;
+ int val = input.read();
+ if (val == -1) {
+ throw new EOFException("Reading RLE byte got EOF");
+ }
+ literals[0] = (byte) val;
+ } else {
+ repeat = false;
+ numLiterals = 0x100 - control;
+ int bytes = 0;
+ while (bytes < numLiterals) {
+ int result = input.read(literals, bytes, numLiterals - bytes);
+ if (result == -1) {
+ throw new EOFException("Reading RLE byte literal got EOF in " + this);
+ }
+ bytes += result;
+ }
+ }
+ }
+
+ public boolean hasNext() throws IOException {
+ return used != numLiterals || input.available() > 0;
+ }
+
+ public byte next() throws IOException {
+ byte result;
+ if (used == numLiterals) {
+ readValues(false);
+ }
+ if (repeat) {
+ result = literals[0];
+ } else {
+ result = literals[used];
+ }
+ ++used;
+ return result;
+ }
+
+ public void nextVector(LongColumnVector previous, long previousLen)
+ throws IOException {
+ previous.isRepeating = true;
+ for (int i = 0; i < previousLen; i++) {
+ if (!previous.isNull[i]) {
+ previous.vector[i] = next();
+ } else {
+ // The default value of null for int types in vectorized
+ // processing is 1, so set that if the value is null
+ previous.vector[i] = 1;
+ }
+
+ // The default value for nulls in Vectorization for int types is 1
+ // and given that a non-null value can also be 1, we need to check isNull as well
+ // when determining the isRepeating flag.
+ if (previous.isRepeating
+ && i > 0
+ && ((previous.vector[i - 1] != previous.vector[i]) || (previous.isNull[i - 1] != previous.isNull[i]))) {
+ previous.isRepeating = false;
+ }
+ }
+ }
+
+ public void seek(PositionProvider index) throws IOException {
+ input.seek(index);
+ int consumed = (int) index.getNext();
+ if (consumed != 0) {
+ // a loop is required for cases where we break the run into two parts
+ while (consumed > 0) {
+ readValues(false);
+ used = consumed;
+ consumed -= numLiterals;
+ }
+ } else {
+ used = 0;
+ numLiterals = 0;
+ }
+ }
+
+ public void skip(long items) throws IOException {
+ while (items > 0) {
+ if (used == numLiterals) {
+ readValues(false);
+ }
+ long consume = Math.min(items, numLiterals - used);
+ used += consume;
+ items -= consume;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "byte rle " + (repeat ? "repeat" : "literal") + " used: " +
+ used + "/" + numLiterals + " from " + input;
+ }
+}
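
To make the control-byte rule concrete, here is a minimal standalone decoding sketch. It assumes the same format the reader above consumes (control 0 to 127 means 3 to 130 repetitions of the following byte; control -1 to -128 means 1 to 128 literal bytes) but operates on an in-memory byte[] rather than an InStream; the class and method names are illustrative only.

import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;

final class ByteRleDecodeSketch {
  static byte[] decode(byte[] encoded) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    int pos = 0;
    while (pos < encoded.length) {
      int control = encoded[pos++] & 0xff;
      if (control < 0x80) {
        // repeat run: control 0..127 means 3..130 copies of the next byte
        int runLength = control + 3;
        if (pos >= encoded.length) {
          throw new EOFException("missing the repeated value");
        }
        byte value = encoded[pos++];
        for (int i = 0; i < runLength; ++i) {
          out.write(value);
        }
      } else {
        // literal run: control 0x80..0xff (-128..-1) means 1..128 literal bytes
        int count = 0x100 - control;
        if (pos + count > encoded.length) {
          throw new EOFException("missing literal values");
        }
        out.write(encoded, pos, count);
        pos += count;
      }
    }
    return out.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    // 0x02 -> 5 repetitions of 0x41, then 0xfd (-3) -> 3 literal bytes
    byte[] encoded = {0x02, 0x41, (byte) 0xfd, 0x10, 0x20, 0x30};
    byte[] decoded = decode(encoded);
    System.out.println(decoded.length);   // prints 8: 41 41 41 41 41 10 20 30
  }
}
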
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/RunLengthByteWriter.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RunLengthByteWriter.java b/orc/src/java/org/apache/orc/impl/RunLengthByteWriter.java
new file mode 100644
index 0000000..09108b2
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/RunLengthByteWriter.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.IOException;
+
+/**
+ * A writer that emits a sequence of bytes as runs. A control byte is written
+ * before each run, with positive values 0 to 127 meaning 3 to 130 repetitions.
+ * If the byte is -1 to -128, 1 to 128 literal byte values follow.
+ */
+public class RunLengthByteWriter {
+ static final int MIN_REPEAT_SIZE = 3;
+ static final int MAX_LITERAL_SIZE = 128;
+ static final int MAX_REPEAT_SIZE = 127 + MIN_REPEAT_SIZE;
+ private final PositionedOutputStream output;
+ private final byte[] literals = new byte[MAX_LITERAL_SIZE];
+ private int numLiterals = 0;
+ private boolean repeat = false;
+ private int tailRunLength = 0;
+
+ public RunLengthByteWriter(PositionedOutputStream output) {
+ this.output = output;
+ }
+
+ private void writeValues() throws IOException {
+ if (numLiterals != 0) {
+ if (repeat) {
+ output.write(numLiterals - MIN_REPEAT_SIZE);
+ output.write(literals, 0, 1);
+ } else {
+ output.write(-numLiterals);
+ output.write(literals, 0, numLiterals);
+ }
+ repeat = false;
+ tailRunLength = 0;
+ numLiterals = 0;
+ }
+ }
+
+ public void flush() throws IOException {
+ writeValues();
+ output.flush();
+ }
+
+ public void write(byte value) throws IOException {
+ if (numLiterals == 0) {
+ literals[numLiterals++] = value;
+ tailRunLength = 1;
+ } else if (repeat) {
+ if (value == literals[0]) {
+ numLiterals += 1;
+ if (numLiterals == MAX_REPEAT_SIZE) {
+ writeValues();
+ }
+ } else {
+ writeValues();
+ literals[numLiterals++] = value;
+ tailRunLength = 1;
+ }
+ } else {
+ if (value == literals[numLiterals - 1]) {
+ tailRunLength += 1;
+ } else {
+ tailRunLength = 1;
+ }
+ if (tailRunLength == MIN_REPEAT_SIZE) {
+ if (numLiterals + 1 == MIN_REPEAT_SIZE) {
+ repeat = true;
+ numLiterals += 1;
+ } else {
+ numLiterals -= MIN_REPEAT_SIZE - 1;
+ writeValues();
+ literals[0] = value;
+ repeat = true;
+ numLiterals = MIN_REPEAT_SIZE;
+ }
+ } else {
+ literals[numLiterals++] = value;
+ if (numLiterals == MAX_LITERAL_SIZE) {
+ writeValues();
+ }
+ }
+ }
+ }
+
+ public void getPosition(PositionRecorder recorder) throws IOException {
+ output.getPosition(recorder);
+ recorder.addPosition(numLiterals);
+ }
+}
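
For a concrete feel for the buffering above, the following hedged trace walks a few calls through the writer. Construction of the underlying PositionedOutputStream is omitted (the writer is simply passed in), and the byte values noted in the comments are worked out by hand from MIN_REPEAT_SIZE = 3.

// A hedged usage trace; only the write(byte) and flush() calls come from the
// class above, the commented byte values follow from its buffering rules.
static void traceExample(RunLengthByteWriter writer) throws java.io.IOException {
  writer.write((byte) 7);   // literals = [7], tailRunLength = 1
  writer.write((byte) 7);   // literals = [7, 7], tailRunLength = 2
  writer.write((byte) 7);   // tailRunLength reaches 3: switch to repeat mode
  writer.write((byte) 7);   // numLiterals = 4, still repeating
  writer.write((byte) 9);   // run ends: emits control 0x01 (4 - 3) then 0x07,
                            // and starts a new literal buffer [9]
  writer.flush();           // emits control 0xff (-1) then the literal 0x09
  // the underlying stream now holds the four bytes 0x01 0x07 0xff 0x09
}
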
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
new file mode 100644
index 0000000..f129c86
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+
+/**
+ * A reader that reads a sequence of run-length encoded integers.
+ */
+public class RunLengthIntegerReader implements IntegerReader {
+ private InStream input;
+ private final boolean signed;
+ private final long[] literals =
+ new long[RunLengthIntegerWriter.MAX_LITERAL_SIZE];
+ private int numLiterals = 0;
+ private int delta = 0;
+ private int used = 0;
+ private boolean repeat = false;
+ private SerializationUtils utils;
+
+ public RunLengthIntegerReader(InStream input, boolean signed) throws IOException {
+ this.input = input;
+ this.signed = signed;
+ this.utils = new SerializationUtils();
+ }
+
+ private void readValues(boolean ignoreEof) throws IOException {
+ int control = input.read();
+ if (control == -1) {
+ if (!ignoreEof) {
+ throw new EOFException("Read past end of RLE integer from " + input);
+ }
+ used = numLiterals = 0;
+ return;
+ } else if (control < 0x80) {
+ numLiterals = control + RunLengthIntegerWriter.MIN_REPEAT_SIZE;
+ used = 0;
+ repeat = true;
+ delta = input.read();
+ if (delta == -1) {
+ throw new EOFException("End of stream in RLE Integer from " + input);
+ }
+ // map the unsigned range 0..255 to the signed range -128..127 by casting to a byte
+ delta = (byte) (0 + delta);
+ if (signed) {
+ literals[0] = utils.readVslong(input);
+ } else {
+ literals[0] = utils.readVulong(input);
+ }
+ } else {
+ repeat = false;
+ numLiterals = 0x100 - control;
+ used = 0;
+ for (int i = 0; i < numLiterals; ++i) {
+ if (signed) {
+ literals[i] = utils.readVslong(input);
+ } else {
+ literals[i] = utils.readVulong(input);
+ }
+ }
+ }
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ return used != numLiterals || input.available() > 0;
+ }
+
+ @Override
+ public long next() throws IOException {
+ long result;
+ if (used == numLiterals) {
+ readValues(false);
+ }
+ if (repeat) {
+ result = literals[0] + (used++) * delta;
+ } else {
+ result = literals[used++];
+ }
+ return result;
+ }
+
+ @Override
+ public void nextVector(LongColumnVector previous, long previousLen) throws IOException {
+ previous.isRepeating = true;
+ for (int i = 0; i < previousLen; i++) {
+ if (!previous.isNull[i]) {
+ previous.vector[i] = next();
+ } else {
+ // The default value of null for int types in vectorized
+ // processing is 1, so set that if the value is null
+ previous.vector[i] = 1;
+ }
+
+ // The default value for nulls in Vectorization for int types is 1
+ // and given that a non-null value can also be 1, we need to check isNull as well
+ // when determining the isRepeating flag.
+ if (previous.isRepeating
+ && i > 0
+ && (previous.vector[i - 1] != previous.vector[i] || previous.isNull[i - 1] != previous.isNull[i])) {
+ previous.isRepeating = false;
+ }
+ }
+ }
+
+ @Override
+ public void setInStream(InStream data) {
+ input = data;
+ }
+
+ @Override
+ public void seek(PositionProvider index) throws IOException {
+ input.seek(index);
+ int consumed = (int) index.getNext();
+ if (consumed != 0) {
+ // a loop is required for cases where we break the run into two parts
+ while (consumed > 0) {
+ readValues(false);
+ used = consumed;
+ consumed -= numLiterals;
+ }
+ } else {
+ used = 0;
+ numLiterals = 0;
+ }
+ }
+
+ @Override
+ public void skip(long numValues) throws IOException {
+ while (numValues > 0) {
+ if (used == numLiterals) {
+ readValues(false);
+ }
+ long consume = Math.min(numValues, numLiterals - used);
+ used += consume;
+ numValues -= consume;
+ }
+ }
+}
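
As a final illustration, the next sketch expands one repeat run the way readValues() and next() above do, assuming RunLengthIntegerWriter.MIN_REPEAT_SIZE is 3 (matching the byte writer earlier in this patch); the varint decoding performed by readVslong/readVulong is not repeated here.

// Illustrative only: expand a repeat run of (control + 3) values where each
// value is base + i * delta, mirroring literals[0] + used * delta in next().
static long[] expandRepeatRun(long base, int delta, int control) {
  int runLength = control + 3;            // control 0..127 -> 3..130 values
  long[] values = new long[runLength];
  for (int i = 0; i < runLength; ++i) {
    values[i] = base + (long) i * delta;
  }
  return values;
}

// Example: expandRepeatRun(100, -1, 2) yields the five values 100, 99, 98, 97, 96.
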