Posted to commits@hive.apache.org by om...@apache.org on 2015/12/12 00:28:03 UTC
[06/16] hive git commit: HIVE-11890. Create ORC submodule. (omalley reviewed by prasanthj)
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
deleted file mode 100644
index 227cfca..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
+++ /dev/null
@@ -1,497 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.io.orc;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.ListIterator;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.common.io.DiskRange;
-import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.CodedInputStream;
-
-public abstract class InStream extends InputStream {
-
- private static final Logger LOG = LoggerFactory.getLogger(InStream.class);
- private static final int PROTOBUF_MESSAGE_MAX_LIMIT = 1024 << 20; // 1GB
-
- protected final String name;
- protected long length;
-
- public InStream(String name, long length) {
- this.name = name;
- this.length = length;
- }
-
- public String getStreamName() {
- return name;
- }
-
- public long getStreamLength() {
- return length;
- }
-
- static class UncompressedStream extends InStream {
- private List<DiskRange> bytes;
- private long length;
- protected long currentOffset;
- private ByteBuffer range;
- private int currentRange;
-
- public UncompressedStream(String name, List<DiskRange> input, long length) {
- super(name, length);
- reset(input, length);
- }
-
- protected void reset(List<DiskRange> input, long length) {
- this.bytes = input;
- this.length = length;
- currentRange = 0;
- currentOffset = 0;
- range = null;
- }
-
- @Override
- public int read() {
- if (range == null || range.remaining() == 0) {
- if (currentOffset == length) {
- return -1;
- }
- seek(currentOffset);
- }
- currentOffset += 1;
- return 0xff & range.get();
- }
-
- @Override
- public int read(byte[] data, int offset, int length) {
- if (range == null || range.remaining() == 0) {
- if (currentOffset == this.length) {
- return -1;
- }
- seek(currentOffset);
- }
- int actualLength = Math.min(length, range.remaining());
- range.get(data, offset, actualLength);
- currentOffset += actualLength;
- return actualLength;
- }
-
- @Override
- public int available() {
- if (range != null && range.remaining() > 0) {
- return range.remaining();
- }
- return (int) (length - currentOffset);
- }
-
- @Override
- public void close() {
- currentRange = bytes.size();
- currentOffset = length;
- // explicit de-ref of bytes[]
- bytes.clear();
- }
-
- @Override
- public void seek(PositionProvider index) throws IOException {
- seek(index.getNext());
- }
-
- public void seek(long desired) {
- if (desired == 0 && bytes.isEmpty()) {
- logEmptySeek(name);
- return;
- }
- int i = 0;
- for (DiskRange curRange : bytes) {
- if (desired == 0 && curRange.getData().remaining() == 0) {
- logEmptySeek(name);
- return;
- }
- if (curRange.getOffset() <= desired &&
- (desired - curRange.getOffset()) < curRange.getLength()) {
- currentOffset = desired;
- currentRange = i;
- this.range = curRange.getData().duplicate();
- int pos = range.position();
- pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
- this.range.position(pos);
- return;
- }
- ++i;
- }
- // if they are seeking to the precise end, go ahead and let them go there
- int segments = bytes.size();
- if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
- currentOffset = desired;
- currentRange = segments - 1;
- DiskRange curRange = bytes.get(currentRange);
- this.range = curRange.getData().duplicate();
- int pos = range.position();
- pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
- this.range.position(pos);
- return;
- }
- throw new IllegalArgumentException("Seek in " + name + " to " +
- desired + " is outside of the data");
- }
-
- @Override
- public String toString() {
- return "uncompressed stream " + name + " position: " + currentOffset +
- " length: " + length + " range: " + currentRange +
- " offset: " + (range == null ? 0 : range.position()) + " limit: " + (range == null ? 0 : range.limit());
- }
- }
-
- private static ByteBuffer allocateBuffer(int size, boolean isDirect) {
- // TODO: use the same pool as the ORC readers
- if (isDirect) {
- return ByteBuffer.allocateDirect(size);
- } else {
- return ByteBuffer.allocate(size);
- }
- }
-
- private static class CompressedStream extends InStream {
- private final List<DiskRange> bytes;
- private final int bufferSize;
- private ByteBuffer uncompressed;
- private final CompressionCodec codec;
- private ByteBuffer compressed;
- private long currentOffset;
- private int currentRange;
- private boolean isUncompressedOriginal;
-
- public CompressedStream(String name, List<DiskRange> input, long length,
- CompressionCodec codec, int bufferSize) {
- super(name, length);
- this.bytes = input;
- this.codec = codec;
- this.bufferSize = bufferSize;
- currentOffset = 0;
- currentRange = 0;
- }
-
- private void allocateForUncompressed(int size, boolean isDirect) {
- uncompressed = allocateBuffer(size, isDirect);
- }
-
- private void readHeader() throws IOException {
- if (compressed == null || compressed.remaining() <= 0) {
- seek(currentOffset);
- }
- if (compressed.remaining() > OutStream.HEADER_SIZE) {
- int b0 = compressed.get() & 0xff;
- int b1 = compressed.get() & 0xff;
- int b2 = compressed.get() & 0xff;
- boolean isOriginal = (b0 & 0x01) == 1;
- int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);
-
- if (chunkLength > bufferSize) {
- throw new IllegalArgumentException("Buffer size too small. size = " +
- bufferSize + " needed = " + chunkLength);
- }
- // read 3 bytes, which should be equal to OutStream.HEADER_SIZE always
- assert OutStream.HEADER_SIZE == 3 : "The Orc HEADER_SIZE must be the same in OutStream and InStream";
- currentOffset += OutStream.HEADER_SIZE;
-
- ByteBuffer slice = this.slice(chunkLength);
-
- if (isOriginal) {
- uncompressed = slice;
- isUncompressedOriginal = true;
- } else {
- if (isUncompressedOriginal) {
- allocateForUncompressed(bufferSize, slice.isDirect());
- isUncompressedOriginal = false;
- } else if (uncompressed == null) {
- allocateForUncompressed(bufferSize, slice.isDirect());
- } else {
- uncompressed.clear();
- }
- codec.decompress(slice, uncompressed);
- }
- } else {
- throw new IllegalStateException("Can't read header at " + this);
- }
- }
-
- @Override
- public int read() throws IOException {
- if (uncompressed == null || uncompressed.remaining() == 0) {
- if (currentOffset == length) {
- return -1;
- }
- readHeader();
- }
- return 0xff & uncompressed.get();
- }
-
- @Override
- public int read(byte[] data, int offset, int length) throws IOException {
- if (uncompressed == null || uncompressed.remaining() == 0) {
- if (currentOffset == this.length) {
- return -1;
- }
- readHeader();
- }
- int actualLength = Math.min(length, uncompressed.remaining());
- uncompressed.get(data, offset, actualLength);
- return actualLength;
- }
-
- @Override
- public int available() throws IOException {
- if (uncompressed == null || uncompressed.remaining() == 0) {
- if (currentOffset == length) {
- return 0;
- }
- readHeader();
- }
- return uncompressed.remaining();
- }
-
- @Override
- public void close() {
- uncompressed = null;
- compressed = null;
- currentRange = bytes.size();
- currentOffset = length;
- bytes.clear();
- }
-
- @Override
- public void seek(PositionProvider index) throws IOException {
- seek(index.getNext());
- long uncompressedBytes = index.getNext();
- if (uncompressedBytes != 0) {
- readHeader();
- uncompressed.position(uncompressed.position() +
- (int) uncompressedBytes);
- } else if (uncompressed != null) {
- // mark the uncompressed buffer as done
- uncompressed.position(uncompressed.limit());
- }
- }
-
- /* slices a read only contiguous buffer of chunkLength */
- private ByteBuffer slice(int chunkLength) throws IOException {
- int len = chunkLength;
- final long oldOffset = currentOffset;
- ByteBuffer slice;
- if (compressed.remaining() >= len) {
- slice = compressed.slice();
- // simple case
- slice.limit(len);
- currentOffset += len;
- compressed.position(compressed.position() + len);
- return slice;
- } else if (currentRange >= (bytes.size() - 1)) {
- // nothing has been modified yet
- throw new IOException("EOF in " + this + " while trying to read " +
- chunkLength + " bytes");
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format(
- "Crossing into next BufferChunk because compressed only has %d bytes (needs %d)",
- compressed.remaining(), len));
- }
-
- // we need to consolidate 2 or more buffers into 1
- // first copy out compressed buffers
- ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect());
- currentOffset += compressed.remaining();
- len -= compressed.remaining();
- copy.put(compressed);
- ListIterator<DiskRange> iter = bytes.listIterator(currentRange);
-
- while (len > 0 && iter.hasNext()) {
- ++currentRange;
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Read slow-path, >1 cross block reads with %s", this.toString()));
- }
- DiskRange range = iter.next();
- compressed = range.getData().duplicate();
- if (compressed.remaining() >= len) {
- slice = compressed.slice();
- slice.limit(len);
- copy.put(slice);
- currentOffset += len;
- compressed.position(compressed.position() + len);
- return copy;
- }
- currentOffset += compressed.remaining();
- len -= compressed.remaining();
- copy.put(compressed);
- }
-
- // restore offsets for exception clarity
- seek(oldOffset);
- throw new IOException("EOF in " + this + " while trying to read " +
- chunkLength + " bytes");
- }
-
- private void seek(long desired) throws IOException {
- if (desired == 0 && bytes.isEmpty()) {
- logEmptySeek(name);
- return;
- }
- int i = 0;
- for (DiskRange range : bytes) {
- if (range.getOffset() <= desired && desired < range.getEnd()) {
- currentRange = i;
- compressed = range.getData().duplicate();
- int pos = compressed.position();
- pos += (int)(desired - range.getOffset());
- compressed.position(pos);
- currentOffset = desired;
- return;
- }
- ++i;
- }
- // if they are seeking to the precise end, go ahead and let them go there
- int segments = bytes.size();
- if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
- DiskRange range = bytes.get(segments - 1);
- currentRange = segments - 1;
- compressed = range.getData().duplicate();
- compressed.position(compressed.limit());
- currentOffset = desired;
- return;
- }
- throw new IOException("Seek outside of data in " + this + " to " + desired);
- }
-
- private String rangeString() {
- StringBuilder builder = new StringBuilder();
- int i = 0;
- for (DiskRange range : bytes) {
- if (i != 0) {
- builder.append("; ");
- }
- builder.append(" range " + i + " = " + range.getOffset()
- + " to " + (range.getEnd() - range.getOffset()));
- ++i;
- }
- return builder.toString();
- }
-
- @Override
- public String toString() {
- return "compressed stream " + name + " position: " + currentOffset +
- " length: " + length + " range: " + currentRange +
- " offset: " + (compressed == null ? 0 : compressed.position()) + " limit: " + (compressed == null ? 0 : compressed.limit()) +
- rangeString() +
- (uncompressed == null ? "" :
- " uncompressed: " + uncompressed.position() + " to " +
- uncompressed.limit());
- }
- }
-
- public abstract void seek(PositionProvider index) throws IOException;
-
- private static void logEmptySeek(String name) {
- if (LOG.isWarnEnabled()) {
- LOG.warn("Attempting seek into empty stream (" + name + ") Skipping stream.");
- }
- }
-
- /**
- * Create an input stream from a list of buffers.
- * @param fileName name of the file
- * @param streamName the name of the stream
- * @param buffers the list of ranges of bytes for the stream
- * @param offsets a list of offsets (the same length as input) that must
- * contain the first offset of the each set of bytes in input
- * @param length the length in bytes of the stream
- * @param codec the compression codec
- * @param bufferSize the compression buffer size
- * @return an input stream
- * @throws IOException
- */
- @VisibleForTesting
- @Deprecated
- public static InStream create(String streamName,
- ByteBuffer[] buffers,
- long[] offsets,
- long length,
- CompressionCodec codec,
- int bufferSize) throws IOException {
- List<DiskRange> input = new ArrayList<DiskRange>(buffers.length);
- for (int i = 0; i < buffers.length; ++i) {
- input.add(new BufferChunk(buffers[i], offsets[i]));
- }
- return create(streamName, input, length, codec, bufferSize);
- }
-
- /**
- * Create an input stream from a list of disk ranges with data.
- * @param name the name of the stream
- * @param input the list of ranges of bytes for the stream; from disk or cache
- * @param length the length in bytes of the stream
- * @param codec the compression codec
- * @param bufferSize the compression buffer size
- * @param cache Low-level cache to use to put data, if any. Only works with compressed streams.
- * @return an input stream
- * @throws IOException
- */
- public static InStream create(String name,
- List<DiskRange> input,
- long length,
- CompressionCodec codec,
- int bufferSize) throws IOException {
- if (codec == null) {
- return new UncompressedStream(name, input, length);
- } else {
- return new CompressedStream(name, input, length, codec, bufferSize);
- }
- }
-
- /**
- * Creates coded input stream (used for protobuf message parsing) with higher message size limit.
- *
- * @param name the name of the stream
- * @param input the list of ranges of bytes for the stream; from disk or cache
- * @param length the length in bytes of the stream
- * @param codec the compression codec
- * @param bufferSize the compression buffer size
- * @return coded input stream
- * @throws IOException
- */
- public static CodedInputStream createCodedInputStream(
- String name,
- List<DiskRange> input,
- long length,
- CompressionCodec codec,
- int bufferSize) throws IOException {
- InStream inStream = create(name, input, length, codec, bufferSize);
- CodedInputStream codedInputStream = CodedInputStream.newInstance(inStream);
- codedInputStream.setSizeLimit(PROTOBUF_MESSAGE_MAX_LIMIT);
- return codedInputStream;
- }
-}
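
For reference, the 3-byte chunk header parsed by CompressedStream.readHeader() above packs an "is original" flag in the low bit of the first byte and a 23-bit chunk length in the remaining bits. The standalone sketch below (not part of this patch; all names and values are illustrative) encodes a header the way the writer side is assumed to, then decodes it with the same bit arithmetic as readHeader():

// Standalone sketch of the ORC 3-byte compression chunk header. The encoding
// here assumes the writer stores (chunkLength << 1) | originalFlag in
// little-endian byte order, which matches the decode in readHeader() above.
public class OrcChunkHeaderSketch {
  public static void main(String[] args) {
    int chunkLength = 100_000;          // illustrative compressed chunk size
    boolean isOriginal = false;         // false = chunk is actually compressed

    int encoded = (chunkLength << 1) | (isOriginal ? 1 : 0);
    byte[] header = {
        (byte) (encoded & 0xff),
        (byte) ((encoded >> 8) & 0xff),
        (byte) ((encoded >> 16) & 0xff)
    };

    // Decode exactly as CompressedStream.readHeader() does.
    int b0 = header[0] & 0xff;
    int b1 = header[1] & 0xff;
    int b2 = header[2] & 0xff;
    boolean decodedOriginal = (b0 & 0x01) == 1;               // low bit: stored uncompressed?
    int decodedLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);   // remaining 23 bits: length

    System.out.println("isOriginal=" + decodedOriginal + " chunkLength=" + decodedLength);
  }
}
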
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerColumnStatistics.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerColumnStatistics.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerColumnStatistics.java
deleted file mode 100644
index 15625fa..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerColumnStatistics.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.io.orc;
-
-/**
- * Statistics for all of the integer columns, such as byte, short, int, and
- * long.
- */
-public interface IntegerColumnStatistics extends ColumnStatistics {
- /**
- * Get the smallest value in the column. Only defined if getNumberOfValues
- * is non-zero.
- * @return the minimum
- */
- long getMinimum();
-
- /**
- * Get the largest value in the column. Only defined if getNumberOfValues
- * is non-zero.
- * @return the maximum
- */
- long getMaximum();
-
- /**
- * Is the sum defined? If the sum overflowed the counter this will be false.
- * @return is the sum available
- */
- boolean isSumDefined();
-
- /**
- * Get the sum of the column. Only valid if isSumDefined returns true.
- * @return the sum of the column
- */
- long getSum();
-}
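
As a usage illustration (not part of this patch), the interface above is typically obtained from a Reader's file-level statistics. The sketch below assumes an existing ORC file path is passed on the command line and prints the integer column summaries, guarding on getNumberOfValues() and isSumDefined() as the javadoc requires:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.ColumnStatistics;
import org.apache.hadoop.hive.ql.io.orc.IntegerColumnStatistics;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Reader;

public class IntegerStatsSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path(args[0]);                         // path to an existing ORC file
    Reader reader = OrcFile.createReader(FileSystem.get(conf), path);
    ColumnStatistics[] stats = reader.getStatistics();
    for (int col = 0; col < stats.length; ++col) {
      if (stats[col] instanceof IntegerColumnStatistics
          && stats[col].getNumberOfValues() > 0) {          // min/max undefined otherwise
        IntegerColumnStatistics ints = (IntegerColumnStatistics) stats[col];
        System.out.println("column " + col + ": min=" + ints.getMinimum()
            + " max=" + ints.getMaximum()
            + (ints.isSumDefined() ? " sum=" + ints.getSum() : " sum overflowed"));
      }
    }
  }
}
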
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java
deleted file mode 100644
index b1c7f0a..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io.orc;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-
-/**
- * Interface for reading integers.
- */
-public interface IntegerReader {
-
- /**
- * Seek to the position provided by index.
- * @param index
- * @throws IOException
- */
- void seek(PositionProvider index) throws IOException;
-
- /**
- * Skip number of specified rows.
- * @param numValues
- * @throws IOException
- */
- void skip(long numValues) throws IOException;
-
- /**
- * Check if there are any more values left.
- * @return
- * @throws IOException
- */
- boolean hasNext() throws IOException;
-
- /**
- * Return the next available value.
- * @return
- * @throws IOException
- */
- long next() throws IOException;
-
- /**
- * Return the next available vector for values.
- * @return
- * @throws IOException
- */
- void nextVector(LongColumnVector previous, long previousLen)
- throws IOException;
-
- void setInStream(InStream data);
-}
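
To make the intended call patterns concrete, here is a hypothetical helper (not part of this patch) that consumes an already-constructed IntegerReader both row-by-row and through the vectorized nextVector() path:

import java.io.IOException;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.io.orc.IntegerReader;

// Hypothetical consumer of IntegerReader; the reader instance itself would be
// one of the run-length reader implementations, obtained elsewhere.
final class IntegerReaderUsageSketch {

  // Row-by-row path: drain the remaining values and sum them.
  static long sum(IntegerReader reader) throws IOException {
    long total = 0;
    while (reader.hasNext()) {
      total += reader.next();
    }
    return total;
  }

  // Vectorized path: fill a LongColumnVector with up to batchSize values.
  static LongColumnVector readBatch(IntegerReader reader, int batchSize) throws IOException {
    LongColumnVector vector = new LongColumnVector(batchSize);
    reader.nextVector(vector, batchSize);
    return vector;
  }
}
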
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java
deleted file mode 100644
index 775d02e..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io.orc;
-
-import java.io.IOException;
-
-/**
- * Interface for writing integers.
- */
-interface IntegerWriter {
-
- /**
- * Get position from the stream.
- * @param recorder
- * @throws IOException
- */
- void getPosition(PositionRecorder recorder) throws IOException;
-
- /**
- * Write the integer value
- * @param value
- * @throws IOException
- */
- void write(long value) throws IOException;
-
- /**
- * Flush the buffer
- * @throws IOException
- */
- void flush() throws IOException;
-}
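
Similarly, a hypothetical caller of the writer interface above (not part of this patch; it sits in the same package because IntegerWriter is package-private) records the stream position for the row index, writes a run of values, and flushes at the end:

package org.apache.hadoop.hive.ql.io.orc;

import java.io.IOException;

// Hypothetical helper illustrating the intended IntegerWriter call sequence.
final class IntegerWriterUsageSketch {
  static void writeRun(IntegerWriter writer, PositionRecorder recorder,
                       long[] values) throws IOException {
    writer.getPosition(recorder);   // remember where this run starts, for the row index
    for (long value : values) {
      writer.write(value);
    }
    writer.flush();                 // push any buffered runs/literals to the stream
  }
}
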
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java
index f9a6d9e..b746390 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java
@@ -26,6 +26,20 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
import org.codehaus.jettison.json.JSONArray;
+import org.apache.orc.BinaryColumnStatistics;
+import org.apache.orc.BooleanColumnStatistics;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.impl.ColumnStatisticsImpl;
+import org.apache.orc.DateColumnStatistics;
+import org.apache.orc.DecimalColumnStatistics;
+import org.apache.orc.DoubleColumnStatistics;
+import org.apache.orc.IntegerColumnStatistics;
+import org.apache.orc.impl.OrcIndex;
+import org.apache.orc.OrcProto;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.StripeStatistics;
+import org.apache.orc.TimestampColumnStatistics;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.codehaus.jettison.json.JSONStringer;
@@ -151,7 +165,7 @@ public class JsonFileDump {
for (int colIdx : rowIndexCols) {
sargColumns[colIdx] = true;
}
- RecordReaderImpl.Index indices = rows.readRowIndex(stripeIx, null, sargColumns);
+ OrcIndex indices = rows.readRowIndex(stripeIx, null, sargColumns);
writer.key("indexes").array();
for (int col : rowIndexCols) {
writer.object();
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
deleted file mode 100644
index bb35b13..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io.orc;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.base.Preconditions;
-
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * Implements a memory manager that keeps a global context of how many ORC
- * writers there are and manages the memory between them. For use cases with
- * dynamic partitions, it is easy to end up with many writers in the same task.
- * By managing the size of each allocation, we try to cut down the size of each
- * allocation and keep the task from running out of memory.
- *
- * This class is not thread safe, but is re-entrant - ensure creation and all
- * invocations are triggered from the same thread.
- */
-class MemoryManager {
-
- private static final Logger LOG = LoggerFactory.getLogger(MemoryManager.class);
-
- /**
- * How often should we check the memory sizes? Measured in rows added
- * to all of the writers.
- */
- private static final int ROWS_BETWEEN_CHECKS = 5000;
- private final long totalMemoryPool;
- private final Map<Path, WriterInfo> writerList =
- new HashMap<Path, WriterInfo>();
- private long totalAllocation = 0;
- private double currentScale = 1;
- private int rowsAddedSinceCheck = 0;
- private final OwnedLock ownerLock = new OwnedLock();
-
- @SuppressWarnings("serial")
- private static class OwnedLock extends ReentrantLock {
- public Thread getOwner() {
- return super.getOwner();
- }
- }
-
- private static class WriterInfo {
- long allocation;
- Callback callback;
- WriterInfo(long allocation, Callback callback) {
- this.allocation = allocation;
- this.callback = callback;
- }
- }
-
- public interface Callback {
- /**
- * The writer needs to check its memory usage
- * @param newScale the current scale factor for memory allocations
- * @return true if the writer was over the limit
- * @throws IOException
- */
- boolean checkMemory(double newScale) throws IOException;
- }
-
- /**
- * Create the memory manager.
- * @param conf use the configuration to find the maximum size of the memory
- * pool.
- */
- MemoryManager(Configuration conf) {
- double maxLoad = OrcConf.MEMORY_POOL.getDouble(conf);
- totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean().
- getHeapMemoryUsage().getMax() * maxLoad);
- ownerLock.lock();
- }
-
- /**
- * Light weight thread-safety check for multi-threaded access patterns
- */
- private void checkOwner() {
- Preconditions.checkArgument(ownerLock.isHeldByCurrentThread(),
- "Owner thread expected %s, got %s",
- ownerLock.getOwner(),
- Thread.currentThread());
- }
-
- /**
- * Add a new writer's memory allocation to the pool. We use the path
- * as a unique key to ensure that we don't get duplicates.
- * @param path the file that is being written
- * @param requestedAllocation the requested buffer size
- */
- void addWriter(Path path, long requestedAllocation,
- Callback callback) throws IOException {
- checkOwner();
- WriterInfo oldVal = writerList.get(path);
- // this should always be null, but we handle the case where the memory
- // manager wasn't told that a writer wasn't still in use and the task
- // starts writing to the same path.
- if (oldVal == null) {
- oldVal = new WriterInfo(requestedAllocation, callback);
- writerList.put(path, oldVal);
- totalAllocation += requestedAllocation;
- } else {
- // handle a new writer that is writing to the same path
- totalAllocation += requestedAllocation - oldVal.allocation;
- oldVal.allocation = requestedAllocation;
- oldVal.callback = callback;
- }
- updateScale(true);
- }
-
- /**
- * Remove the given writer from the pool.
- * @param path the file that has been closed
- */
- void removeWriter(Path path) throws IOException {
- checkOwner();
- WriterInfo val = writerList.get(path);
- if (val != null) {
- writerList.remove(path);
- totalAllocation -= val.allocation;
- if (writerList.isEmpty()) {
- rowsAddedSinceCheck = 0;
- }
- updateScale(false);
- }
- if(writerList.isEmpty()) {
- rowsAddedSinceCheck = 0;
- }
- }
-
- /**
- * Get the total pool size that is available for ORC writers.
- * @return the number of bytes in the pool
- */
- long getTotalMemoryPool() {
- return totalMemoryPool;
- }
-
- /**
- * The scaling factor for each allocation to ensure that the pool isn't
- * oversubscribed.
- * @return a fraction between 0.0 and 1.0 of the requested size that is
- * available for each writer.
- */
- double getAllocationScale() {
- return currentScale;
- }
-
- /**
- * Give the memory manager an opportunity for doing a memory check.
- * @param rows number of rows added
- * @throws IOException
- */
- void addedRow(int rows) throws IOException {
- rowsAddedSinceCheck += rows;
- if (rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) {
- notifyWriters();
- }
- }
-
- /**
- * Notify all of the writers that they should check their memory usage.
- * @throws IOException
- */
- void notifyWriters() throws IOException {
- checkOwner();
- LOG.debug("Notifying writers after " + rowsAddedSinceCheck);
- for(WriterInfo writer: writerList.values()) {
- boolean flushed = writer.callback.checkMemory(currentScale);
- if (LOG.isDebugEnabled() && flushed) {
- LOG.debug("flushed " + writer.toString());
- }
- }
- rowsAddedSinceCheck = 0;
- }
-
- /**
- * Update the currentScale based on the current allocation and pool size.
- * This also updates the notificationTrigger.
- * @param isAllocate is this an allocation?
- */
- private void updateScale(boolean isAllocate) throws IOException {
- if (totalAllocation <= totalMemoryPool) {
- currentScale = 1;
- } else {
- currentScale = (double) totalMemoryPool / totalAllocation;
- }
- }
-}
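
The allocation rule implemented by updateScale()/getAllocationScale() above is simple proportional scaling. The standalone sketch below (not part of this patch, with made-up sizes) works through one example: three writers requesting 400MB each against a 1000MB pool are each scaled to roughly 83% of their request:

// Standalone illustration of MemoryManager's scaling rule: when the combined
// requests exceed the pool, every writer is scaled by pool / totalAllocation.
public class MemoryScaleSketch {
  public static void main(String[] args) {
    long totalMemoryPool = 1_000L * 1024 * 1024;   // e.g. 50% of a 2GB heap
    long[] requests = {
        400L * 1024 * 1024,                        // three dynamic-partition writers
        400L * 1024 * 1024,
        400L * 1024 * 1024
    };

    long totalAllocation = 0;
    for (long request : requests) {
      totalAllocation += request;
    }

    double currentScale = (totalAllocation <= totalMemoryPool)
        ? 1.0
        : (double) totalMemoryPool / totalAllocation;

    // 1200MB requested vs. a 1000MB pool -> scale = 0.83, so each writer
    // should treat its 400MB stripe buffer as roughly 333MB.
    System.out.printf("scale=%.2f effectivePerWriterMB=%d%n",
        currentScale, (long) (requests[0] * currentScale) / (1024 * 1024));
  }
}
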
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java
deleted file mode 100644
index cea324c..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.io.orc;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hive.ql.io.orc.OrcProto.BloomFilterIndex;
-import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
-import org.apache.hadoop.hive.ql.io.orc.OrcProto.StripeFooter;
-
-public interface MetadataReader {
- RecordReaderImpl.Index readRowIndex(StripeInformation stripe, StripeFooter footer,
- boolean[] included, RowIndex[] indexes, boolean[] sargColumns,
- BloomFilterIndex[] bloomFilterIndices) throws IOException;
-
- StripeFooter readStripeFooter(StripeInformation stripe) throws IOException;
-
- void close() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReaderImpl.java
deleted file mode 100644
index 4624927..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReaderImpl.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.io.orc;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.io.DiskRange;
-import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
-
-import com.google.common.collect.Lists;
-
-public class MetadataReaderImpl implements MetadataReader {
- private final FSDataInputStream file;
- private final CompressionCodec codec;
- private final int bufferSize;
- private final int typeCount;
-
- public MetadataReaderImpl(FileSystem fileSystem, Path path,
- CompressionCodec codec, int bufferSize, int typeCount) throws IOException {
- this(fileSystem.open(path), codec, bufferSize, typeCount);
- }
-
- public MetadataReaderImpl(FSDataInputStream file,
- CompressionCodec codec, int bufferSize, int typeCount) {
- this.file = file;
- this.codec = codec;
- this.bufferSize = bufferSize;
- this.typeCount = typeCount;
- }
-
- @Override
- public RecordReaderImpl.Index readRowIndex(StripeInformation stripe,
- OrcProto.StripeFooter footer, boolean[] included, OrcProto.RowIndex[] indexes,
- boolean[] sargColumns, OrcProto.BloomFilterIndex[] bloomFilterIndices) throws IOException {
- if (footer == null) {
- footer = readStripeFooter(stripe);
- }
- if (indexes == null) {
- indexes = new OrcProto.RowIndex[typeCount];
- }
- if (bloomFilterIndices == null) {
- bloomFilterIndices = new OrcProto.BloomFilterIndex[typeCount];
- }
- long offset = stripe.getOffset();
- List<OrcProto.Stream> streams = footer.getStreamsList();
- for (int i = 0; i < streams.size(); i++) {
- OrcProto.Stream stream = streams.get(i);
- OrcProto.Stream nextStream = null;
- if (i < streams.size() - 1) {
- nextStream = streams.get(i+1);
- }
- int col = stream.getColumn();
- int len = (int) stream.getLength();
- // row index stream and bloom filter are interlaced, check if the sarg column contains bloom
- // filter and combine the io to read row index and bloom filters for that column together
- if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX)) {
- boolean readBloomFilter = false;
- if (sargColumns != null && sargColumns[col] &&
- nextStream.getKind() == OrcProto.Stream.Kind.BLOOM_FILTER) {
- len += nextStream.getLength();
- i += 1;
- readBloomFilter = true;
- }
- if ((included == null || included[col]) && indexes[col] == null) {
- byte[] buffer = new byte[len];
- file.readFully(offset, buffer, 0, buffer.length);
- ByteBuffer bb = ByteBuffer.wrap(buffer);
- indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index",
- Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), stream.getLength(),
- codec, bufferSize));
- if (readBloomFilter) {
- bb.position((int) stream.getLength());
- bloomFilterIndices[col] = OrcProto.BloomFilterIndex.parseFrom(InStream.create(
- "bloom_filter", Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)),
- nextStream.getLength(), codec, bufferSize));
- }
- }
- }
- offset += len;
- }
-
- RecordReaderImpl.Index index = new RecordReaderImpl.Index(indexes, bloomFilterIndices);
- return index;
- }
-
- @Override
- public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
- long offset = stripe.getOffset() + stripe.getIndexLength() + stripe.getDataLength();
- int tailLength = (int) stripe.getFooterLength();
-
- // read the footer
- ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
- file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength);
- return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream("footer",
- Lists.<DiskRange>newArrayList(new BufferChunk(tailBuf, 0)),
- tailLength, codec, bufferSize));
- }
-
- @Override
- public void close() throws IOException {
- file.close();
- }
-}
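
The seek in readStripeFooter() above relies on the stripe layout [index][data][footer]; the small sketch below (not part of this patch, with made-up lengths) shows the offset arithmetic:

// Illustration of where readStripeFooter() reads from: the footer begins
// immediately after the stripe's index and data sections. Values are made up.
public class StripeFooterOffsetSketch {
  public static void main(String[] args) {
    long stripeOffset = 3;             // where the stripe starts in the file
    long indexLength  = 970;           // bytes of row index streams
    long dataLength   = 63_770_000;    // bytes of data streams
    long footerLength = 250;           // bytes of the StripeFooter protobuf

    long footerOffset = stripeOffset + indexLength + dataLength;
    System.out.println("read " + footerLength + " footer bytes at offset " + footerOffset);
  }
}
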
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcConf.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcConf.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcConf.java
deleted file mode 100644
index 132889c..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcConf.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io.orc;
-
-import org.apache.hadoop.conf.Configuration;
-
-import java.util.Properties;
-
-/**
- * Define the configuration properties that Orc understands.
- */
-public enum OrcConf {
- STRIPE_SIZE("orc.stripe.size", "hive.exec.orc.default.stripe.size",
- 64L * 1024 * 1024,
- "Define the default ORC stripe size, in bytes."),
- BLOCK_SIZE("orc.block.size", "hive.exec.orc.default.block.size",
- 256L * 1024 * 1024,
- "Define the default file system block size for ORC files."),
- ENABLE_INDEXES("orc.create.index", "orc.create.index", true,
- "Should the ORC writer create indexes as part of the file."),
- ROW_INDEX_STRIDE("orc.row.index.stride",
- "hive.exec.orc.default.row.index.stride", 10000,
- "Define the default ORC index stride in number of rows. (Stride is the\n"+
- " number of rows n index entry represents.)"),
- BUFFER_SIZE("orc.compress.size", "hive.exec.orc.default.buffer.size",
- 256 * 1024, "Define the default ORC buffer size, in bytes."),
- BLOCK_PADDING("orc.block.padding", "hive.exec.orc.default.block.padding",
- true,
- "Define whether stripes should be padded to the HDFS block boundaries."),
- COMPRESS("orc.compress", "hive.exec.orc.default.compress", "ZLIB",
- "Define the default compression codec for ORC file"),
- WRITE_FORMAT("orc.write.format", "hive.exec.orc.write.format", "0.12",
- "Define the version of the file to write. Possible values are 0.11 and\n"+
- " 0.12. If this parameter is not defined, ORC will use the run\n" +
- " length encoding (RLE) introduced in Hive 0.12."),
- ENCODING_STRATEGY("orc.encoding.strategy", "hive.exec.orc.encoding.strategy",
- "SPEED",
- "Define the encoding strategy to use while writing data. Changing this\n"+
- "will only affect the light weight encoding for integers. This\n" +
- "flag will not change the compression level of higher level\n" +
- "compression codec (like ZLIB)."),
- COMPRESSION_STRATEGY("orc.compression.strategy",
- "hive.exec.orc.compression.strategy", "SPEED",
- "Define the compression strategy to use while writing data.\n" +
- "This changes the compression level of higher level compression\n" +
- "codec (like ZLIB)."),
- BLOCK_PADDING_TOLERANCE("orc.block.padding.tolerance",
- "hive.exec.orc.block.padding.tolerance", 0.05,
- "Define the tolerance for block padding as a decimal fraction of\n" +
- "stripe size (for example, the default value 0.05 is 5% of the\n" +
- "stripe size). For the defaults of 64Mb ORC stripe and 256Mb HDFS\n" +
- "blocks, the default block padding tolerance of 5% will\n" +
- "reserve a maximum of 3.2Mb for padding within the 256Mb block.\n" +
- "In that case, if the available size within the block is more than\n"+
- "3.2Mb, a new smaller stripe will be inserted to fit within that\n" +
- "space. This will make sure that no stripe written will block\n" +
- " boundaries and cause remote reads within a node local task."),
- BLOOM_FILTER_FPP("orc.bloom.filter.fpp", "orc.default.bloom.fpp", 0.05,
- "Define the default false positive probability for bloom filters."),
- USE_ZEROCOPY("orc.use.zerocopy", "hive.exec.orc.zerocopy", false,
- "Use zerocopy reads with ORC. (This requires Hadoop 2.3 or later.)"),
- SKIP_CORRUPT_DATA("orc.skip.corrupt.data", "hive.exec.orc.skip.corrupt.data",
- false,
- "If ORC reader encounters corrupt data, this value will be used to\n" +
- "determine whether to skip the corrupt data or throw exception.\n" +
- "The default behavior is to throw exception."),
- MEMORY_POOL("orc.memory.pool", "hive.exec.orc.memory.pool", 0.5,
- "Maximum fraction of heap that can be used by ORC file writers"),
- DICTIONARY_KEY_SIZE_THRESHOLD("orc.dictionary.key.threshold",
- "hive.exec.orc.dictionary.key.size.threshold",
- 0.8,
- "If the number of distinct keys in a dictionary is greater than this\n" +
- "fraction of the total number of non-null rows, turn off \n" +
- "dictionary encoding. Use 1 to always use dictionary encoding."),
- ROW_INDEX_STRIDE_DICTIONARY_CHECK("orc.dictionary.early.check",
- "hive.orc.row.index.stride.dictionary.check",
- true,
- "If enabled dictionary check will happen after first row index stride\n" +
- "(default 10000 rows) else dictionary check will happen before\n" +
- "writing first stripe. In both cases, the decision to use\n" +
- "dictionary or not will be retained thereafter."),
- BLOOM_FILTER_COLUMNS("orc.bloom.filter.columns", "orc.bloom.filter.columns",
- "", "List of columns to create bloom filters for when writing.")
- ;
-
- private final String attribute;
- private final String hiveConfName;
- private final Object defaultValue;
- private final String description;
-
- OrcConf(String attribute,
- String hiveConfName,
- Object defaultValue,
- String description) {
- this.attribute = attribute;
- this.hiveConfName = hiveConfName;
- this.defaultValue = defaultValue;
- this.description = description;
- }
-
- public String getAttribute() {
- return attribute;
- }
-
- public String getHiveConfName() {
- return hiveConfName;
- }
-
- public Object getDefaultValue() {
- return defaultValue;
- }
-
- public String getDescription() {
- return description;
- }
-
- private String lookupValue(Properties tbl, Configuration conf) {
- String result = null;
- if (tbl != null) {
- result = tbl.getProperty(attribute);
- }
- if (result == null && conf != null) {
- result = conf.get(attribute);
- if (result == null) {
- result = conf.get(hiveConfName);
- }
- }
- return result;
- }
-
- public long getLong(Properties tbl, Configuration conf) {
- String value = lookupValue(tbl, conf);
- if (value != null) {
- return Long.parseLong(value);
- }
- return ((Number) defaultValue).longValue();
- }
-
- public long getLong(Configuration conf) {
- return getLong(null, conf);
- }
-
- public String getString(Properties tbl, Configuration conf) {
- String value = lookupValue(tbl, conf);
- return value == null ? (String) defaultValue : value;
- }
-
- public String getString(Configuration conf) {
- return getString(null, conf);
- }
-
- public boolean getBoolean(Properties tbl, Configuration conf) {
- String value = lookupValue(tbl, conf);
- if (value != null) {
- return Boolean.parseBoolean(value);
- }
- return (Boolean) defaultValue;
- }
-
- public boolean getBoolean(Configuration conf) {
- return getBoolean(null, conf);
- }
-
- public double getDouble(Properties tbl, Configuration conf) {
- String value = lookupValue(tbl, conf);
- if (value != null) {
- return Double.parseDouble(value);
- }
- return ((Number) defaultValue).doubleValue();
- }
-
- public double getDouble(Configuration conf) {
- return getDouble(null, conf);
- }
-}
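
The lookupValue() precedence above (table properties first, then the ORC key in the Configuration, then the legacy Hive key, then the compiled-in default) can be seen with a small usage sketch (not part of this patch):

import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.orc.OrcConf;

// Usage sketch for OrcConf's lookup order: the table-level property wins over
// both Configuration keys and over the built-in 64MB default.
public class OrcConfLookupSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setLong("hive.exec.orc.default.stripe.size", 128L * 1024 * 1024); // legacy Hive key

    Properties tableProps = new Properties();
    tableProps.setProperty("orc.stripe.size",
        String.valueOf(32L * 1024 * 1024));                                // table-level key

    // Prints 33554432 (32MB from the table property), not 134217728 from the
    // Configuration or the 67108864 default.
    System.out.println(OrcConf.STRIPE_SIZE.getLong(tableProps, conf));
  }
}
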
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
index 56ac40b..975825a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
@@ -19,8 +19,6 @@
package org.apache.hadoop.hive.ql.io.orc;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
@@ -28,132 +26,17 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.orc.FileMetadata;
+import org.apache.orc.impl.MemoryManager;
+import org.apache.orc.TypeDescription;
/**
* Contains factory methods to read or write ORC files.
*/
-public final class OrcFile {
-
- public static final String MAGIC = "ORC";
-
- /**
- * Create a version number for the ORC file format, so that we can add
- * non-forward compatible changes in the future. To make it easier for users
- * to understand the version numbers, we use the Hive release number that
- * first wrote that version of ORC files.
- *
- * Thus, if you add new encodings or other non-forward compatible changes
- * to ORC files, which prevent the old reader from reading the new format,
- * you should change these variable to reflect the next Hive release number.
- * Non-forward compatible changes should never be added in patch releases.
- *
- * Do not make any changes that break backwards compatibility, which would
- * prevent the new reader from reading ORC files generated by any released
- * version of Hive.
- */
- public enum Version {
- V_0_11("0.11", 0, 11),
- V_0_12("0.12", 0, 12);
-
- public static final Version CURRENT = V_0_12;
-
- private final String name;
- private final int major;
- private final int minor;
-
- Version(String name, int major, int minor) {
- this.name = name;
- this.major = major;
- this.minor = minor;
- }
-
- public static Version byName(String name) {
- for(Version version: values()) {
- if (version.name.equals(name)) {
- return version;
- }
- }
- throw new IllegalArgumentException("Unknown ORC version " + name);
- }
-
- /**
- * Get the human readable name for the version.
- */
- public String getName() {
- return name;
- }
-
- /**
- * Get the major version number.
- */
- public int getMajor() {
- return major;
- }
-
- /**
- * Get the minor version number.
- */
- public int getMinor() {
- return minor;
- }
- }
-
- /**
- * Records the version of the writer in terms of which bugs have been fixed.
- * For bugs in the writer, but the old readers already read the new data
- * correctly, bump this version instead of the Version.
- */
- public enum WriterVersion {
- ORIGINAL(0),
- HIVE_8732(1), // corrupted stripe/file maximum column statistics
- HIVE_4243(2), // use real column names from Hive tables
-// Don't use any magic numbers here except for the below:
- FUTURE(Integer.MAX_VALUE); // a version from a future writer
-
- private final int id;
-
- public int getId() {
- return id;
- }
-
- WriterVersion(int id) {
- this.id = id;
- }
-
- private static final WriterVersion[] values;
- static {
- // Assumes few non-negative values close to zero.
- int max = Integer.MIN_VALUE;
- for (WriterVersion v : WriterVersion.values()) {
- if (v.id < 0) throw new AssertionError();
- if (v.id > max && FUTURE.id != v.id) {
- max = v.id;
- }
- }
- values = new WriterVersion[max + 1];
- for (WriterVersion v : WriterVersion.values()) {
- if (v.id < values.length) {
- values[v.id] = v;
- }
- }
- }
-
- public static WriterVersion from(int val) {
- if (val == FUTURE.id) return FUTURE; // Special handling for the magic value.
- return values[val];
- }
- }
-
- public enum EncodingStrategy {
- SPEED, COMPRESSION
- }
-
- public enum CompressionStrategy {
- SPEED, COMPRESSION
- }
+public final class OrcFile extends org.apache.orc.OrcFile {
// unused
- private OrcFile() {}
+ protected OrcFile() {}
/**
* Create an ORC file reader.
@@ -162,62 +45,32 @@ public final class OrcFile {
* @return a new ORC file reader.
* @throws IOException
*/
- public static Reader createReader(FileSystem fs, Path path
- ) throws IOException {
+ public static Reader createReader(FileSystem fs,
+ Path path) throws IOException {
ReaderOptions opts = new ReaderOptions(new Configuration());
opts.filesystem(fs);
return new ReaderImpl(path, opts);
}
- public static class ReaderOptions {
- private final Configuration conf;
- private FileSystem filesystem;
- private FileMetaInfo fileMetaInfo; // TODO: this comes from some place.
- private long maxLength = Long.MAX_VALUE;
- private FileMetadata fullFileMetadata; // Propagate from LLAP cache.
-
+ public static class ReaderOptions extends org.apache.orc.OrcFile.ReaderOptions {
public ReaderOptions(Configuration conf) {
- this.conf = conf;
- }
- ReaderOptions fileMetaInfo(FileMetaInfo info) {
- fileMetaInfo = info;
- return this;
+ super(conf);
}
public ReaderOptions filesystem(FileSystem fs) {
- this.filesystem = fs;
+ super.filesystem(fs);
return this;
}
public ReaderOptions maxLength(long val) {
- maxLength = val;
+ super.maxLength(val);
return this;
}
public ReaderOptions fileMetadata(FileMetadata metadata) {
- this.fullFileMetadata = metadata;
+ super.fileMetadata(metadata);
return this;
}
-
- Configuration getConfiguration() {
- return conf;
- }
-
- FileSystem getFilesystem() {
- return filesystem;
- }
-
- FileMetaInfo getFileMetaInfo() {
- return fileMetaInfo;
- }
-
- long getMaxLength() {
- return maxLength;
- }
-
- FileMetadata getFileMetadata() {
- return fullFileMetadata;
- }
}
public static ReaderOptions readerOptions(Configuration conf) {
@@ -229,71 +82,39 @@ public final class OrcFile {
return new ReaderImpl(path, options);
}
- public interface WriterContext {
- Writer getWriter();
- }
-
- public interface WriterCallback {
- void preStripeWrite(WriterContext context) throws IOException;
- void preFooterWrite(WriterContext context) throws IOException;
- }
-
/**
* Options for creating ORC file writers.
*/
- public static class WriterOptions {
- private final Configuration configuration;
- private FileSystem fileSystemValue = null;
+ public static class WriterOptions extends org.apache.orc.OrcFile.WriterOptions {
private boolean explicitSchema = false;
- private TypeDescription schema = null;
private ObjectInspector inspector = null;
- private long stripeSizeValue;
- private long blockSizeValue;
- private int rowIndexStrideValue;
- private int bufferSizeValue;
- private boolean blockPaddingValue;
- private CompressionKind compressValue;
- private MemoryManager memoryManagerValue;
- private Version versionValue;
- private WriterCallback callback;
- private EncodingStrategy encodingStrategy;
- private CompressionStrategy compressionStrategy;
- private double paddingTolerance;
- private String bloomFilterColumns;
- private double bloomFilterFpp;
WriterOptions(Properties tableProperties, Configuration conf) {
- configuration = conf;
- memoryManagerValue = getMemoryManager(conf);
- stripeSizeValue = OrcConf.STRIPE_SIZE.getLong(tableProperties, conf);
- blockSizeValue = OrcConf.BLOCK_SIZE.getLong(tableProperties, conf);
- rowIndexStrideValue =
- (int) OrcConf.ROW_INDEX_STRIDE.getLong(tableProperties, conf);
- bufferSizeValue = (int) OrcConf.BUFFER_SIZE.getLong(tableProperties,
- conf);
- blockPaddingValue =
- OrcConf.BLOCK_PADDING.getBoolean(tableProperties, conf);
- compressValue =
- CompressionKind.valueOf(OrcConf.COMPRESS.getString(tableProperties,
- conf));
- String versionName = OrcConf.WRITE_FORMAT.getString(tableProperties,
- conf);
- versionValue = Version.byName(versionName);
- String enString = OrcConf.ENCODING_STRATEGY.getString(tableProperties,
- conf);
- encodingStrategy = EncodingStrategy.valueOf(enString);
-
- String compString =
- OrcConf.COMPRESSION_STRATEGY.getString(tableProperties, conf);
- compressionStrategy = CompressionStrategy.valueOf(compString);
-
- paddingTolerance =
- OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(tableProperties, conf);
-
- bloomFilterColumns = OrcConf.BLOOM_FILTER_COLUMNS.getString(tableProperties,
- conf);
- bloomFilterFpp = OrcConf.BLOOM_FILTER_FPP.getDouble(tableProperties,
- conf);
+ super(tableProperties, conf);
+ }
+
+ /**
+ * A required option that sets the object inspector for the rows. If
+ * setSchema is not called, it also defines the schema.
+ */
+ public WriterOptions inspector(ObjectInspector value) {
+ this.inspector = value;
+ if (!explicitSchema) {
+ super.setSchema(OrcInputFormat.convertTypeInfo(
+ TypeInfoUtils.getTypeInfoFromObjectInspector(value)));
+ }
+ return this;
+ }
+
+ /**
+ * Set the schema for the file. This is a required parameter.
+ * @param schema the schema for the file.
+ * @return this
+ */
+ public WriterOptions setSchema(TypeDescription schema) {
+ this.explicitSchema = true;
+ super.setSchema(schema);
+ return this;
}
/**
@@ -301,7 +122,7 @@ public final class OrcFile {
* If it is not provided, it will be found from the path.
*/
public WriterOptions fileSystem(FileSystem value) {
- fileSystemValue = value;
+ super.fileSystem(value);
return this;
}
@@ -311,7 +132,7 @@ public final class OrcFile {
* is flushed to the HDFS file and the next stripe started.
*/
public WriterOptions stripeSize(long value) {
- stripeSizeValue = value;
+ super.stripeSize(value);
return this;
}
@@ -320,7 +141,7 @@ public final class OrcFile {
* set the block size to be multiple factors of stripe size.
*/
public WriterOptions blockSize(long value) {
- blockSizeValue = value;
+ super.blockSize(value);
return this;
}
@@ -330,7 +151,7 @@ public final class OrcFile {
* set to 0, no indexes will be included in the file.
*/
public WriterOptions rowIndexStride(int value) {
- rowIndexStrideValue = value;
+ super.rowIndexStride(value);
return this;
}
@@ -339,7 +160,7 @@ public final class OrcFile {
* stripe in memory.
*/
public WriterOptions bufferSize(int value) {
- bufferSizeValue = value;
+ super.bufferSize(value);
return this;
}
@@ -349,7 +170,7 @@ public final class OrcFile {
* reading, but costs space.
*/
public WriterOptions blockPadding(boolean value) {
- blockPaddingValue = value;
+ super.blockPadding(value);
return this;
}
@@ -357,7 +178,7 @@ public final class OrcFile {
* Sets the encoding strategy that is used to encode the data.
*/
public WriterOptions encodingStrategy(EncodingStrategy strategy) {
- encodingStrategy = strategy;
+ super.encodingStrategy(strategy);
return this;
}
@@ -365,7 +186,7 @@ public final class OrcFile {
* Sets the tolerance for block padding as a percentage of stripe size.
*/
public WriterOptions paddingTolerance(double value) {
- paddingTolerance = value;
+ super.paddingTolerance(value);
return this;
}
@@ -373,7 +194,7 @@ public final class OrcFile {
* Comma separated values of column names for which bloom filter is to be created.
*/
public WriterOptions bloomFilterColumns(String columns) {
- bloomFilterColumns = columns;
+ super.bloomFilterColumns(columns);
return this;
}
@@ -383,7 +204,7 @@ public final class OrcFile {
* @return this
*/
public WriterOptions bloomFilterFpp(double fpp) {
- bloomFilterFpp = fpp;
+ super.bloomFilterFpp(fpp);
return this;
}
@@ -391,31 +212,15 @@ public final class OrcFile {
* Sets the generic compression that is used to compress the data.
*/
public WriterOptions compress(CompressionKind value) {
- compressValue = value;
+ super.compress(value.getUnderlying());
return this;
}
/**
- * A required option that sets the object inspector for the rows. If
- * setSchema is not called, it also defines the schema.
- */
- public WriterOptions inspector(ObjectInspector value) {
- this.inspector = value;
- if (!explicitSchema) {
- schema = OrcUtils.convertTypeInfo(
- TypeInfoUtils.getTypeInfoFromObjectInspector(value));
- }
- return this;
- }
-
- /**
- * Set the schema for the file. This is a required parameter.
- * @param schema the schema for the file.
- * @return this
+ * Sets the generic compression that is used to compress the data.
*/
- public WriterOptions setSchema(TypeDescription schema) {
- this.explicitSchema = true;
- this.schema = schema;
+ public WriterOptions compress(org.apache.orc.CompressionKind value) {
+ super.compress(value);
return this;
}
@@ -423,7 +228,7 @@ public final class OrcFile {
* Sets the version of the file that will be written.
*/
public WriterOptions version(Version value) {
- versionValue = value;
+ super.version(value);
return this;
}
@@ -433,18 +238,17 @@ public final class OrcFile {
* @return this
*/
public WriterOptions callback(WriterCallback callback) {
- this.callback = callback;
+ super.callback(callback);
return this;
}
/**
* A package local option to set the memory manager.
*/
- WriterOptions memory(MemoryManager value) {
- memoryManagerValue = value;
+ protected WriterOptions memory(MemoryManager value) {
+ super.memory(value);
return this;
}
-
}
/**
@@ -479,18 +283,19 @@ public final class OrcFile {
public static Writer createWriter(Path path,
WriterOptions opts
) throws IOException {
- FileSystem fs = opts.fileSystemValue == null ?
- path.getFileSystem(opts.configuration) : opts.fileSystemValue;
-
- return new WriterImpl(fs, path, opts.configuration, opts.inspector,
- opts.schema,
- opts.stripeSizeValue, opts.compressValue,
- opts.bufferSizeValue, opts.rowIndexStrideValue,
- opts.memoryManagerValue, opts.blockPaddingValue,
- opts.versionValue, opts.callback,
- opts.encodingStrategy, opts.compressionStrategy,
- opts.paddingTolerance, opts.blockSizeValue,
- opts.bloomFilterColumns, opts.bloomFilterFpp);
+ FileSystem fs = opts.getFileSystem() == null ?
+ path.getFileSystem(opts.getConfiguration()) : opts.getFileSystem();
+
+ return new WriterImpl(fs, path, opts.getConfiguration(), opts.inspector,
+ opts.getSchema(),
+ opts.getStripeSize(), opts.getCompress(),
+ opts.getBufferSize(), opts.getRowIndexStride(),
+ opts.getMemoryManager(), opts.getBlockPadding(),
+ opts.getVersion(), opts.getCallback(),
+ opts.getEncodingStrategy(),
+ opts.getCompressionStrategy(),
+ opts.getPaddingTolerance(), opts.getBlockSize(),
+ opts.getBloomFilterColumns(), opts.getBloomFilterFpp());
}
/**
@@ -515,29 +320,12 @@ public final class OrcFile {
CompressionKind compress,
int bufferSize,
int rowIndexStride) throws IOException {
- return createWriter(path,
- writerOptions(conf)
- .fileSystem(fs)
- .inspector(inspector)
- .stripeSize(stripeSize)
- .compress(compress)
- .bufferSize(bufferSize)
- .rowIndexStride(rowIndexStride));
- }
-
- private static ThreadLocal<MemoryManager> memoryManager = null;
-
- private static synchronized MemoryManager getMemoryManager(
- final Configuration conf) {
- if (memoryManager == null) {
- memoryManager = new ThreadLocal<MemoryManager>() {
- @Override
- protected MemoryManager initialValue() {
- return new MemoryManager(conf);
- }
- };
- }
- return memoryManager.get();
+ return createWriter(path, writerOptions(conf)
+ .inspector(inspector)
+ .fileSystem(fs)
+ .stripeSize(stripeSize)
+ .compress(compress)
+ .bufferSize(bufferSize)
+ .rowIndexStride(rowIndexStride));
}
-
}
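
For orientation, a minimal sketch (not part of the patch) of how the slimmed-down WriterOptions above could be driven. The row class, output path, and option values here are illustrative assumptions; each option call simply delegates to the shared org.apache.orc writer options, as the diff shows.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
    import org.apache.hadoop.hive.ql.io.orc.OrcFile;
    import org.apache.hadoop.hive.ql.io.orc.Writer;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;

    public class OrcWriterSketch {
      // Hypothetical row shape, used only to obtain a reflection ObjectInspector.
      static class MyRow {
        long id;
        String name;
        MyRow(long id, String name) { this.id = id; this.name = name; }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        ObjectInspector inspector = ObjectInspectorFactory.getReflectionObjectInspector(
            MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
        Writer writer = OrcFile.createWriter(new Path("/tmp/example.orc"),
            OrcFile.writerOptions(conf)
                .inspector(inspector)              // required; also derives the schema
                .compress(CompressionKind.ZLIB)
                .stripeSize(64L * 1024 * 1024)
                .bufferSize(256 * 1024));
        writer.addRow(new MyRow(1L, "hello"));
        writer.close();
      }
    }

The point of the delegation is that everything except the ObjectInspector handling now lives in the shared org.apache.orc options, which is what the submodule split is after.
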
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java
index 8eda37f..40f1da0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java
@@ -25,6 +25,8 @@ import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcProto;
/**
* Key for OrcFileMergeMapper task. Contains orc file related information that
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java
index 41a97a3..f06195f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;
+import org.apache.orc.OrcProto;
+import org.apache.orc.StripeInformation;
public class OrcFileStripeMergeRecordReader implements
RecordReader<OrcFileKeyWrapper, OrcFileValueWrapper> {
@@ -98,7 +100,7 @@ public class OrcFileStripeMergeRecordReader implements
valueWrapper.setUserMetadata(((ReaderImpl) reader).getOrcProtoUserMetadata());
}
keyWrapper.setInputPath(path);
- keyWrapper.setCompression(reader.getCompression());
+ keyWrapper.setCompression(reader.getCompressionKind());
keyWrapper.setCompressBufferSize(reader.getCompressionSize());
keyWrapper.setVersion(reader.getFileVersion());
keyWrapper.setRowIndexStride(reader.getRowIndexStride());
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileValueWrapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileValueWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileValueWrapper.java
index 7389ef5..846c874 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileValueWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileValueWrapper.java
@@ -24,6 +24,8 @@ import java.io.IOException;
import java.util.List;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.orc.OrcProto;
+import org.apache.orc.StripeInformation;
/**
* Value for OrcFileMergeMapper. Contains stripe related information for the
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 0567cbe..b3b48d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -39,6 +39,23 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.codec.binary.Hex;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.FileMetaInfo;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.StripeStatistics;
+import org.apache.orc.TypeDescription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -63,8 +80,6 @@ import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
-import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterVersion;
-import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -86,6 +101,7 @@ import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.StringUtils;
+import org.apache.orc.OrcProto;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
@@ -234,7 +250,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
/**
* Do we have schema on read in the configuration variables?
*/
- TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf);
+ TypeDescription schema = getDesiredRowTypeDescr(conf);
Reader.Options options = new Reader.Options().range(offset, length);
options.schema(schema);
@@ -242,7 +258,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
List<OrcProto.Type> types = file.getTypes();
options.include(genIncludedColumns(types, conf, isOriginal));
setSearchArgument(options, types, conf, isOriginal);
- return file.rowsOptions(options);
+ return (RecordReader) file.rowsOptions(options);
}
public static boolean isOriginal(Reader file) {
@@ -1500,7 +1516,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
/**
* Do we have schema on read in the configuration variables?
*/
- TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf);
+ TypeDescription schema = getDesiredRowTypeDescr(conf);
final Reader reader;
final int bucket;
@@ -1508,7 +1524,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
readOptions.range(split.getStart(), split.getLength());
// TODO: Convert genIncludedColumns and setSearchArgument to use TypeDescription.
- final List<Type> schemaTypes = OrcUtils.getOrcTypes(schema);
+ final List<OrcProto.Type> schemaTypes = OrcUtils.getOrcTypes(schema);
readOptions.include(genIncludedColumns(schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL));
setSearchArgument(readOptions, schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL);
@@ -1594,7 +1610,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
public static boolean[] pickStripesViaTranslatedSarg(SearchArgument sarg,
- WriterVersion writerVersion, List<OrcProto.Type> types,
+ OrcFile.WriterVersion writerVersion, List<OrcProto.Type> types,
List<StripeStatistics> stripeStats, int stripeCount) {
LOG.info("Translated ORC pushdown predicate: " + sarg);
assert sarg != null;
@@ -1608,7 +1624,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
private static boolean[] pickStripes(SearchArgument sarg, String[] sargColNames,
- WriterVersion writerVersion, boolean isOriginal, List<StripeStatistics> stripeStats,
+ OrcFile.WriterVersion writerVersion, boolean isOriginal, List<StripeStatistics> stripeStats,
int stripeCount, Path filePath) {
if (sarg == null || stripeStats == null || writerVersion == OrcFile.WriterVersion.ORIGINAL) {
return null; // only do split pruning if HIVE-8732 has been fixed in the writer
@@ -1927,4 +1943,165 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
return true;
}
}
+ /**
+ * Convert a Hive type property string that contains separated type names into a list of
+ * TypeDescription objects.
+ * @return the list of TypeDescription objects.
+ */
+ public static ArrayList<TypeDescription> typeDescriptionsFromHiveTypeProperty(
+ String hiveTypeProperty) {
+
+ // CONSIDER: We need a type name parser for TypeDescription.
+
+ ArrayList<TypeInfo> typeInfoList = TypeInfoUtils.getTypeInfosFromTypeString(hiveTypeProperty);
+ ArrayList<TypeDescription> typeDescrList = new ArrayList<TypeDescription>(typeInfoList.size());
+ for (TypeInfo typeInfo : typeInfoList) {
+ typeDescrList.add(convertTypeInfo(typeInfo));
+ }
+ return typeDescrList;
+ }
+
+ public static TypeDescription convertTypeInfo(TypeInfo info) {
+ switch (info.getCategory()) {
+ case PRIMITIVE: {
+ PrimitiveTypeInfo pinfo = (PrimitiveTypeInfo) info;
+ switch (pinfo.getPrimitiveCategory()) {
+ case BOOLEAN:
+ return TypeDescription.createBoolean();
+ case BYTE:
+ return TypeDescription.createByte();
+ case SHORT:
+ return TypeDescription.createShort();
+ case INT:
+ return TypeDescription.createInt();
+ case LONG:
+ return TypeDescription.createLong();
+ case FLOAT:
+ return TypeDescription.createFloat();
+ case DOUBLE:
+ return TypeDescription.createDouble();
+ case STRING:
+ return TypeDescription.createString();
+ case DATE:
+ return TypeDescription.createDate();
+ case TIMESTAMP:
+ return TypeDescription.createTimestamp();
+ case BINARY:
+ return TypeDescription.createBinary();
+ case DECIMAL: {
+ DecimalTypeInfo dinfo = (DecimalTypeInfo) pinfo;
+ return TypeDescription.createDecimal()
+ .withScale(dinfo.getScale())
+ .withPrecision(dinfo.getPrecision());
+ }
+ case VARCHAR: {
+ BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo;
+ return TypeDescription.createVarchar()
+ .withMaxLength(cinfo.getLength());
+ }
+ case CHAR: {
+ BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo;
+ return TypeDescription.createChar()
+ .withMaxLength(cinfo.getLength());
+ }
+ default:
+ throw new IllegalArgumentException("ORC doesn't handle primitive" +
+ " category " + pinfo.getPrimitiveCategory());
+ }
+ }
+ case LIST: {
+ ListTypeInfo linfo = (ListTypeInfo) info;
+ return TypeDescription.createList
+ (convertTypeInfo(linfo.getListElementTypeInfo()));
+ }
+ case MAP: {
+ MapTypeInfo minfo = (MapTypeInfo) info;
+ return TypeDescription.createMap
+ (convertTypeInfo(minfo.getMapKeyTypeInfo()),
+ convertTypeInfo(minfo.getMapValueTypeInfo()));
+ }
+ case UNION: {
+ UnionTypeInfo minfo = (UnionTypeInfo) info;
+ TypeDescription result = TypeDescription.createUnion();
+ for (TypeInfo child: minfo.getAllUnionObjectTypeInfos()) {
+ result.addUnionChild(convertTypeInfo(child));
+ }
+ return result;
+ }
+ case STRUCT: {
+ StructTypeInfo sinfo = (StructTypeInfo) info;
+ TypeDescription result = TypeDescription.createStruct();
+ for(String fieldName: sinfo.getAllStructFieldNames()) {
+ result.addField(fieldName,
+ convertTypeInfo(sinfo.getStructFieldTypeInfo(fieldName)));
+ }
+ return result;
+ }
+ default:
+ throw new IllegalArgumentException("ORC doesn't handle " +
+ info.getCategory());
+ }
+ }
+
+
+ public static TypeDescription getDesiredRowTypeDescr(Configuration conf) {
+
+ String columnNameProperty = null;
+ String columnTypeProperty = null;
+
+ ArrayList<String> schemaEvolutionColumnNames = null;
+ ArrayList<TypeDescription> schemaEvolutionTypeDescrs = null;
+
+ boolean haveSchemaEvolutionProperties = false;
+ if (HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION)) {
+
+ columnNameProperty = conf.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS);
+ columnTypeProperty = conf.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES);
+
+ haveSchemaEvolutionProperties =
+ (columnNameProperty != null && columnTypeProperty != null);
+
+ if (haveSchemaEvolutionProperties) {
+ schemaEvolutionColumnNames = Lists.newArrayList(columnNameProperty.split(","));
+ if (schemaEvolutionColumnNames.size() == 0) {
+ haveSchemaEvolutionProperties = false;
+ } else {
+ schemaEvolutionTypeDescrs =
+ typeDescriptionsFromHiveTypeProperty(columnTypeProperty);
+ if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) {
+ haveSchemaEvolutionProperties = false;
+ }
+ }
+ }
+ }
+
+ if (!haveSchemaEvolutionProperties) {
+
+ // Try regular properties;
+ columnNameProperty = conf.get(serdeConstants.LIST_COLUMNS);
+ columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES);
+ if (columnTypeProperty == null || columnNameProperty == null) {
+ return null;
+ }
+
+ schemaEvolutionColumnNames = Lists.newArrayList(columnNameProperty.split(","));
+ if (schemaEvolutionColumnNames.size() == 0) {
+ return null;
+ }
+ schemaEvolutionTypeDescrs =
+ typeDescriptionsFromHiveTypeProperty(columnTypeProperty);
+ if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) {
+ return null;
+ }
+ }
+
+ // Desired schema does not include virtual columns or partition columns.
+ TypeDescription result = TypeDescription.createStruct();
+ for (int i = 0; i < schemaEvolutionColumnNames.size(); i++) {
+ result.addField(schemaEvolutionColumnNames.get(i), schemaEvolutionTypeDescrs.get(i));
+ }
+
+ return result;
+ }
+
}
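
A quick, hypothetical illustration of the type-conversion helpers that this patch moves into OrcInputFormat; the type strings below are made up, and the colon separation mirrors the columns.types table property.

    import java.util.List;
    import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    import org.apache.orc.TypeDescription;

    public class TypeConversionSketch {
      public static void main(String[] args) {
        // Whole column list, as it would arrive from a columns.types-style property.
        List<TypeDescription> cols = OrcInputFormat.typeDescriptionsFromHiveTypeProperty(
            "bigint:string:decimal(10,2)");
        System.out.println(cols);      // e.g. [bigint, string, decimal(10,2)]

        // Single TypeInfo -> TypeDescription conversion via convertTypeInfo.
        TypeInfo mapInfo = TypeInfoUtils.getTypeInfoFromTypeString("map<string,bigint>");
        System.out.println(OrcInputFormat.convertTypeInfo(mapInfo));   // e.g. map<string,bigint>
      }
    }

OrcOutputFormat and OrcRawRecordMerger pick these helpers up from OrcInputFormat rather than from the relocated OrcUtils, as the hunks below show.
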
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
index c15b35f..2782d7e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.orc.OrcProto;
/** An InputFormat for ORC files. Keys are meaningless,
* value is the OrcStruct object */
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
index edc7490..2e63aba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.orc.FileMetaInfo;
/**
* OrcFileSplit. Holds file meta info
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
index 2d0eaaf..3fb6a86 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
@@ -24,6 +24,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.Properties;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.TypeDescription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileSystem;
@@ -33,7 +35,6 @@ import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.RecordUpdater;
import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter;
-import org.apache.hadoop.hive.ql.io.orc.OrcFile.EncodingStrategy;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde.OrcSerdeRow;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -160,7 +161,7 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
TypeDescription schema = TypeDescription.createStruct();
for (int i = 0; i < columnNames.size(); ++i) {
schema.addField(columnNames.get(i),
- OrcUtils.convertTypeInfo(columnTypes.get(i)));
+ OrcInputFormat.convertTypeInfo(columnTypes.get(i)));
}
if (LOG.isDebugEnabled()) {
LOG.debug("ORC schema = " + schema);
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 090d63b..e5f9786 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -26,6 +26,11 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -442,7 +447,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
this.length = options.getLength();
this.validTxnList = validTxnList;
- TypeDescription typeDescr = OrcUtils.getDesiredRowTypeDescr(conf);
+ TypeDescription typeDescr = OrcInputFormat.getDesiredRowTypeDescr(conf);
if (typeDescr == null) {
throw new IOException(ErrorMsg.SCHEMA_REQUIRED_TO_READ_ACID_TABLES.getErrorCodedMsg());
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
index c0e9b1a..59876e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Properties;
+import org.apache.orc.OrcConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 81afb48..61cde41 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -25,12 +25,12 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import org.apache.orc.FileMetaInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.ColumnarSplit;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.FileSplit;
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java
index 7a17b92..d48cadd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
import org.apache.hadoop.io.Writable;
+import org.apache.orc.OrcProto;
import java.io.DataInput;
import java.io.DataOutput;