Posted to commits@tajo.apache.org by ji...@apache.org on 2016/03/23 02:41:50 UTC
[5/7] tajo git commit: TAJO-2102: Migrate to Apache Orc from Presto's one.
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DecimalColumnStatistics.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DecimalColumnStatistics.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DecimalColumnStatistics.java
deleted file mode 100644
index 27cdac2..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DecimalColumnStatistics.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-
-/**
- * Statistics for decimal columns.
- */
-public interface DecimalColumnStatistics extends ColumnStatistics {
-
- /**
- * Get the minimum value for the column.
- * @return the minimum value
- */
- HiveDecimal getMinimum();
-
- /**
- * Get the maximum value for the column.
- * @return the maximum value
- */
- HiveDecimal getMaximum();
-
- /**
- * Get the sum of the values of the column.
- * @return the sum
- */
- HiveDecimal getSum();
-
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DirectDecompressionCodec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DirectDecompressionCodec.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DirectDecompressionCodec.java
deleted file mode 100644
index 5333052..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DirectDecompressionCodec.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-public interface DirectDecompressionCodec extends CompressionCodec {
- public boolean isAvailable();
- public void directDecompress(ByteBuffer in, ByteBuffer out) throws IOException;
-}
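
For reference, the interface above lets ORC decompress straight between
(possibly off-heap) ByteBuffers, skipping the byte[] staging that a
stream-based codec needs; isAvailable() guards platforms where the
underlying native decompressor is missing. A minimal sketch of the
calling convention (DirectCodecs is a hypothetical helper, not part of
this patch):

    import java.io.IOException;
    import java.nio.ByteBuffer;

    // Hypothetical helper showing how a caller drives a
    // DirectDecompressionCodec; a sketch, not this patch's code.
    final class DirectCodecs {
      static void decompressBlock(DirectDecompressionCodec codec,
                                  ByteBuffer compressed,
                                  ByteBuffer uncompressed) throws IOException {
        if (!codec.isAvailable()) {
          throw new IOException("direct decompression unavailable on this platform");
        }
        uncompressed.clear();                            // reuse the output buffer
        codec.directDecompress(compressed, uncompressed);
        uncompressed.flip();                             // ready for the reader to consume
      }
    }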
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DoubleColumnStatistics.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DoubleColumnStatistics.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DoubleColumnStatistics.java
deleted file mode 100644
index ddce8f7..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DoubleColumnStatistics.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-/**
- * Statistics for float and double columns.
- */
-public interface DoubleColumnStatistics extends ColumnStatistics {
-
- /**
- * Get the smallest value in the column. Only defined if getNumberOfValues
- * is non-zero.
- * @return the minimum
- */
- double getMinimum();
-
- /**
- * Get the largest value in the column. Only defined if getNumberOfValues
- * is non-zero.
- * @return the maximum
- */
- double getMaximum();
-
- /**
- * Get the sum of the values in the column.
- * @return the sum
- */
- double getSum();
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicByteArray.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicByteArray.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicByteArray.java
deleted file mode 100644
index 1d44f77..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicByteArray.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-import org.apache.hadoop.io.Text;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-/**
- * A class that is a growable array of bytes. Growth is managed in terms of
- * chunks that are allocated when needed.
- */
-final class DynamicByteArray {
- static final int DEFAULT_CHUNKSIZE = 32 * 1024;
- static final int DEFAULT_NUM_CHUNKS = 128;
-
- private final int chunkSize; // our allocation sizes
- private byte[][] data; // the real data
- private int length; // max set element index +1
- private int initializedChunks = 0; // the number of chunks created
-
- public DynamicByteArray() {
- this(DEFAULT_NUM_CHUNKS, DEFAULT_CHUNKSIZE);
- }
-
- public DynamicByteArray(int numChunks, int chunkSize) {
- if (chunkSize == 0) {
- throw new IllegalArgumentException("bad chunksize");
- }
- this.chunkSize = chunkSize;
- data = new byte[numChunks][];
- }
-
- /**
- * Ensure that the given index is valid.
- */
- private void grow(int chunkIndex) {
- if (chunkIndex >= initializedChunks) {
- if (chunkIndex >= data.length) {
- int newSize = Math.max(chunkIndex + 1, 2 * data.length);
- byte[][] newChunk = new byte[newSize][];
- System.arraycopy(data, 0, newChunk, 0, data.length);
- data = newChunk;
- }
- for(int i=initializedChunks; i <= chunkIndex; ++i) {
- data[i] = new byte[chunkSize];
- }
- initializedChunks = chunkIndex + 1;
- }
- }
-
- public byte get(int index) {
- if (index >= length) {
- throw new IndexOutOfBoundsException("Index " + index +
- " is outside of 0.." +
- (length - 1));
- }
- int i = index / chunkSize;
- int j = index % chunkSize;
- return data[i][j];
- }
-
- public void set(int index, byte value) {
- int i = index / chunkSize;
- int j = index % chunkSize;
- grow(i);
- if (index >= length) {
- length = index + 1;
- }
- data[i][j] = value;
- }
-
- public int add(byte value) {
- int i = length / chunkSize;
- int j = length % chunkSize;
- grow(i);
- data[i][j] = value;
- int result = length;
- length += 1;
- return result;
- }
-
- /**
- * Copy a slice of a byte array into our buffer.
- * @param value the array to copy from
- * @param valueOffset the first location to copy from value
- * @param valueLength the number of bytes to copy from value
- * @return the offset of the start of the value
- */
- public int add(byte[] value, int valueOffset, int valueLength) {
- int i = length / chunkSize;
- int j = length % chunkSize;
- grow((length + valueLength) / chunkSize);
- int remaining = valueLength;
- while (remaining > 0) {
- int size = Math.min(remaining, chunkSize - j);
- System.arraycopy(value, valueOffset, data[i], j, size);
- remaining -= size;
- valueOffset += size;
- i += 1;
- j = 0;
- }
- int result = length;
- length += valueLength;
- return result;
- }
-
- /**
- * Read the entire stream into this array.
- * @param in the stream to read from
- * @throws IOException
- */
- public void readAll(InputStream in) throws IOException {
- int currentChunk = length / chunkSize;
- int currentOffset = length % chunkSize;
- grow(currentChunk);
- int currentLength = in.read(data[currentChunk], currentOffset,
- chunkSize - currentOffset);
- while (currentLength > 0) {
- length += currentLength;
- currentOffset = length % chunkSize;
- if (currentOffset == 0) {
- currentChunk = length / chunkSize;
- grow(currentChunk);
- }
- currentLength = in.read(data[currentChunk], currentOffset,
- chunkSize - currentOffset);
- }
- }
-
- /**
- * Byte compare a set of bytes against the bytes in this dynamic array.
- * @param other source of the other bytes
- * @param otherOffset start offset in the other array
- * @param otherLength number of bytes in the other array
- * @param ourOffset the offset in our array
- * @param ourLength the number of bytes in our array
- * @return negative for less, 0 for equal, positive for greater
- */
- public int compare(byte[] other, int otherOffset, int otherLength,
- int ourOffset, int ourLength) {
- int currentChunk = ourOffset / chunkSize;
- int currentOffset = ourOffset % chunkSize;
- int maxLength = Math.min(otherLength, ourLength);
- while (maxLength > 0 &&
- other[otherOffset] == data[currentChunk][currentOffset]) {
- otherOffset += 1;
- currentOffset += 1;
- if (currentOffset == chunkSize) {
- currentChunk += 1;
- currentOffset = 0;
- }
- maxLength -= 1;
- }
- if (maxLength == 0) {
- return otherLength - ourLength;
- }
- int otherByte = 0xff & other[otherOffset];
- int ourByte = 0xff & data[currentChunk][currentOffset];
- return otherByte > ourByte ? 1 : -1;
- }
-
- /**
- * Get the size of the array.
- * @return the number of bytes in the array
- */
- public int size() {
- return length;
- }
-
- /**
- * Clear the array to its original pristine state.
- */
- public void clear() {
- length = 0;
- for(int i=0; i < data.length; ++i) {
- data[i] = null;
- }
- initializedChunks = 0;
- }
-
- /**
- * Set a text value from the bytes in this dynamic array.
- * @param result the value to set
- * @param offset the start of the bytes to copy
- * @param length the number of bytes to copy
- */
- public void setText(Text result, int offset, int length) {
- result.clear();
- int currentChunk = offset / chunkSize;
- int currentOffset = offset % chunkSize;
- int currentLength = Math.min(length, chunkSize - currentOffset);
- while (length > 0) {
- result.append(data[currentChunk], currentOffset, currentLength);
- length -= currentLength;
- currentChunk += 1;
- currentOffset = 0;
- currentLength = Math.min(length, chunkSize - currentOffset);
- }
- }
-
- /**
- * Write out a range of this dynamic array to an output stream.
- * @param out the stream to write to
- * @param offset the first offset to write
- * @param length the number of bytes to write
- * @throws IOException
- */
- public void write(OutputStream out, int offset,
- int length) throws IOException {
- int currentChunk = offset / chunkSize;
- int currentOffset = offset % chunkSize;
- while (length > 0) {
- int currentLength = Math.min(length, chunkSize - currentOffset);
- out.write(data[currentChunk], currentOffset, currentLength);
- length -= currentLength;
- currentChunk += 1;
- currentOffset = 0;
- }
- }
-
- @Override
- public String toString() {
- int i;
- StringBuilder sb = new StringBuilder(length * 3);
-
- sb.append('{');
- int l = length - 1;
- for (i=0; i<l; i++) {
- sb.append(Integer.toHexString(get(i)));
- sb.append(',');
- }
- sb.append(get(i));
- sb.append('}');
-
- return sb.toString();
- }
-
- public void setByteBuffer(ByteBuffer result, int offset, int length) {
- result.clear();
- int currentChunk = offset / chunkSize;
- int currentOffset = offset % chunkSize;
- int currentLength = Math.min(length, chunkSize - currentOffset);
- while (length > 0) {
- result.put(data[currentChunk], currentOffset, currentLength);
- length -= currentLength;
- currentChunk += 1;
- currentOffset = 0;
- currentLength = Math.min(length, chunkSize - currentOffset);
- }
- }
-
- /**
- * Gets all the bytes of the array.
- *
- * @return Bytes of the array
- */
- public byte[] get() {
- byte[] result = null;
- if (length > 0) {
- int currentChunk = 0;
- int currentOffset = 0;
- int currentLength = Math.min(length, chunkSize);
- int destOffset = 0;
- result = new byte[length];
- int totalLength = length;
- while (totalLength > 0) {
- System.arraycopy(data[currentChunk], currentOffset, result, destOffset, currentLength);
- destOffset += currentLength;
- totalLength -= currentLength;
- currentChunk += 1;
- currentOffset = 0;
- currentLength = Math.min(totalLength, chunkSize - currentOffset);
- }
- }
- return result;
- }
-
- /**
- * Get the size of the buffers.
- */
- public long getSizeInBytes() {
- return initializedChunks * chunkSize;
- }
-}
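
The trick in the class above is the two-level mapping of a logical index
to data[index / chunkSize][index % chunkSize]: growing the array
allocates fresh chunks and, at worst, copies the small chunk-pointer
directory, never the bytes already stored. A self-contained sketch of
the same addressing scheme (ChunkedBytes is a hypothetical name, and it
allocates chunks lazily rather than eagerly like the code above):

    // Sketch of chunked addressing: only the chunk directory is ever
    // copied on growth; stored bytes stay where they were written.
    final class ChunkedBytes {
      private final int chunkSize;
      private byte[][] chunks;          // chunk directory, grown by doubling
      private int length;               // max set index + 1

      ChunkedBytes(int numChunks, int chunkSize) {
        this.chunkSize = chunkSize;
        this.chunks = new byte[numChunks][];
      }

      void set(int index, byte value) {
        int chunk = index / chunkSize;  // which chunk
        int offset = index % chunkSize; // position inside that chunk
        if (chunk >= chunks.length) {   // double the directory only
          byte[][] bigger = new byte[Math.max(chunk + 1, 2 * chunks.length)][];
          System.arraycopy(chunks, 0, bigger, 0, chunks.length);
          chunks = bigger;
        }
        if (chunks[chunk] == null) {
          chunks[chunk] = new byte[chunkSize];
        }
        chunks[chunk][offset] = value;
        length = Math.max(length, index + 1);
      }

      byte get(int index) {
        return chunks[index / chunkSize][index % chunkSize];
      }

      int size() { return length; }
    }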
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicIntArray.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicIntArray.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicIntArray.java
deleted file mode 100644
index a347706..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicIntArray.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-/**
- * Dynamic int array that uses primitive types and chunks to avoid copying
- * large numbers of integers when it resizes.
- *
- * The motivation for this class is memory optimization, i.e. space efficient
- * storage of potentially huge arrays without good a-priori size guesses.
- *
- * The API of this class is between a primitive array and an AbstractList. It's
- * not a Collection implementation because it handles primitive types, but the
- * API could be extended to support iterators and the like.
- *
- * NOTE: Like standard Collection implementations/arrays, this class is not
- * synchronized.
- */
-final class DynamicIntArray {
- static final int DEFAULT_CHUNKSIZE = 8 * 1024;
- static final int INIT_CHUNKS = 128;
-
- private final int chunkSize; // our allocation size
- private int[][] data; // the real data
- private int length; // max set element index +1
- private int initializedChunks = 0; // the number of created chunks
-
- public DynamicIntArray() {
- this(DEFAULT_CHUNKSIZE);
- }
-
- public DynamicIntArray(int chunkSize) {
- this.chunkSize = chunkSize;
-
- data = new int[INIT_CHUNKS][];
- }
-
- /**
- * Ensure that the given index is valid.
- */
- private void grow(int chunkIndex) {
- if (chunkIndex >= initializedChunks) {
- if (chunkIndex >= data.length) {
- int newSize = Math.max(chunkIndex + 1, 2 * data.length);
- int[][] newChunk = new int[newSize][];
- System.arraycopy(data, 0, newChunk, 0, data.length);
- data = newChunk;
- }
- for (int i=initializedChunks; i <= chunkIndex; ++i) {
- data[i] = new int[chunkSize];
- }
- initializedChunks = chunkIndex + 1;
- }
- }
-
- public int get(int index) {
- if (index >= length) {
- throw new IndexOutOfBoundsException("Index " + index +
- " is outside of 0.." +
- (length - 1));
- }
- int i = index / chunkSize;
- int j = index % chunkSize;
- return data[i][j];
- }
-
- public void set(int index, int value) {
- int i = index / chunkSize;
- int j = index % chunkSize;
- grow(i);
- if (index >= length) {
- length = index + 1;
- }
- data[i][j] = value;
- }
-
- public void increment(int index, int value) {
- int i = index / chunkSize;
- int j = index % chunkSize;
- grow(i);
- if (index >= length) {
- length = index + 1;
- }
- data[i][j] += value;
- }
-
- public void add(int value) {
- int i = length / chunkSize;
- int j = length % chunkSize;
- grow(i);
- data[i][j] = value;
- length += 1;
- }
-
- public int size() {
- return length;
- }
-
- public void clear() {
- length = 0;
- for(int i=0; i < data.length; ++i) {
- data[i] = null;
- }
- initializedChunks = 0;
- }
-
- public String toString() {
- int i;
- StringBuilder sb = new StringBuilder(length * 4);
-
- sb.append('{');
- int l = length - 1;
- for (i=0; i<l; i++) {
- sb.append(get(i));
- sb.append(',');
- }
- sb.append(get(i));
- sb.append('}');
-
- return sb.toString();
- }
-
- public int getSizeInBytes() {
- return 4 * initializedChunks * chunkSize;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java
deleted file mode 100644
index 5357f51..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java
+++ /dev/null
@@ -1,133 +0,0 @@
-
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-import com.facebook.presto.orc.DiskRange;
-import com.facebook.presto.orc.OrcDataSource;
-import com.google.common.collect.ImmutableMap;
-import io.airlift.slice.BasicSliceInput;
-import io.airlift.slice.FixedLengthSliceInput;
-import io.airlift.units.DataSize;
-import org.apache.hadoop.fs.FSDataInputStream;
-
-import java.io.IOException;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import static com.facebook.presto.orc.OrcDataSourceUtils.getDiskRangeSlice;
-import static com.facebook.presto.orc.OrcDataSourceUtils.mergeAdjacentDiskRanges;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * HDFS File data source class for Orc Reader
- *
- * Most of this code is from Presto.
- */
-public class HdfsOrcDataSource
- implements OrcDataSource
-{
- private final FSDataInputStream inputStream;
- private final String path;
- private final long size;
- private final DataSize maxMergeDistance;
- private final DataSize maxReadSize;
- private long readTimeNanos;
-
- public HdfsOrcDataSource(String path, FSDataInputStream inputStream, long size,
- DataSize maxMergeDistance, DataSize maxReadSize)
- {
- this.path = checkNotNull(path, "path is null");
- this.inputStream = checkNotNull(inputStream, "inputStream is null");
- this.size = size;
- checkArgument(size >= 0, "size is negative");
-
- this.maxMergeDistance = checkNotNull(maxMergeDistance, "maxMergeDistance is null");
- this.maxReadSize = checkNotNull(maxReadSize, "maxReadSize is null");
- }
-
- @Override
- public void close()
- throws IOException
- {
- inputStream.close();
- }
-
- @Override
- public long getReadTimeNanos()
- {
- return readTimeNanos;
- }
-
- @Override
- public long getSize()
- {
- return size;
- }
-
- @Override
- public void readFully(long position, byte[] buffer)
- throws IOException
- {
- readFully(position, buffer, 0, buffer.length);
- }
-
- @Override
- public void readFully(long position, byte[] buffer, int bufferOffset, int bufferLength)
- throws IOException
- {
- long start = System.nanoTime();
-
- inputStream.readFully(position, buffer, bufferOffset, bufferLength);
- readTimeNanos += System.nanoTime() - start;
- }
-
- @Override
- public <K> Map<K, FixedLengthSliceInput> readFully(Map<K, DiskRange> diskRanges)
- throws IOException
- {
- checkNotNull(diskRanges, "diskRanges is null");
-
- if (diskRanges.isEmpty()) {
- return ImmutableMap.of();
- }
-
- Iterable<DiskRange> mergedRanges = mergeAdjacentDiskRanges(diskRanges.values(), maxMergeDistance, maxReadSize);
-
- // read ranges
- Map<DiskRange, byte[]> buffers = new LinkedHashMap<>();
- for (DiskRange mergedRange : mergedRanges) {
- // read full range in one request
- byte[] buffer = new byte[mergedRange.getLength()];
- readFully(mergedRange.getOffset(), buffer);
- buffers.put(mergedRange, buffer);
- }
-
- ImmutableMap.Builder<K, FixedLengthSliceInput> slices = ImmutableMap.builder();
- diskRanges.forEach((K key, DiskRange range) ->
- slices.put(key, new BasicSliceInput(getDiskRangeSlice(range, buffers))));
-
- return slices.build();
- }
-
- @Override
- public String toString()
- {
- return path;
- }
-}
-
-
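
The readFully(Map) override above is where the HDFS seek cost is
amortized: ranges that sit within maxMergeDistance of each other are
merged (capped at maxReadSize) so one positioned read serves many small
column-stream requests, and each caller's slice is then cut back out of
the shared buffer. A simplified sketch of that merge step (Range and
RangeMerger are stand-ins for Presto's DiskRange utilities, not this
patch's code):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    // Sketch: merge nearby byte ranges so one read covers many requests.
    final class RangeMerger {
      static final class Range {
        final long offset; final int length;
        Range(long offset, int length) { this.offset = offset; this.length = length; }
        long end() { return offset + length; }
      }

      static List<Range> merge(List<Range> ranges, long maxMergeDistance, long maxReadSize) {
        List<Range> sorted = new ArrayList<>(ranges);
        sorted.sort(Comparator.comparingLong(r -> r.offset));
        List<Range> merged = new ArrayList<>();
        Range current = null;
        for (Range r : sorted) {
          if (current != null
              && r.offset - current.end() <= maxMergeDistance   // gap small enough
              && r.end() - current.offset <= maxReadSize) {     // merged read not too big
            long newEnd = Math.max(current.end(), r.end());
            current = new Range(current.offset, (int) (newEnd - current.offset));
          } else {
            if (current != null) merged.add(current);
            current = r;
          }
        }
        if (current != null) merged.add(current);
        return merged;
      }
    }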
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerColumnStatistics.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerColumnStatistics.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerColumnStatistics.java
deleted file mode 100644
index 208454f..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerColumnStatistics.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-/**
- * Statistics for all of the integer columns, such as byte, short, int, and
- * long.
- */
-public interface IntegerColumnStatistics extends ColumnStatistics {
- /**
- * Get the smallest value in the column. Only defined if getNumberOfValues
- * is non-zero.
- * @return the minimum
- */
- long getMinimum();
-
- /**
- * Get the largest value in the column. Only defined if getNumberOfValues
- * is non-zero.
- * @return the maximum
- */
- long getMaximum();
-
- /**
- * Is the sum defined? If the sum overflowed the counter this will be false.
- * @return is the sum available
- */
- boolean isSumDefined();
-
- /**
- * Get the sum of the column. Only valid if isSumDefined returns true.
- * @return the sum of the column
- */
- long getSum();
-}
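
isSumDefined() exists because the running long sum can overflow while
statistics are being gathered; the writer then marks the sum invalid
rather than publishing a wrapped value. One way to detect that, as a
sketch (not the writer's actual code):

    // Maintain a long sum but invalidate it on the first overflow,
    // mirroring the isSumDefined()/getSum() contract above.
    final class OverflowAwareSum {
      private long sum = 0;
      private boolean defined = true;

      void add(long value) {
        if (!defined) return;
        try {
          sum = Math.addExact(sum, value);  // throws on long overflow
        } catch (ArithmeticException e) {
          defined = false;                  // sum is no longer meaningful
        }
      }

      boolean isSumDefined() { return defined; }
      long getSum() { return sum; }         // only valid while isSumDefined()
    }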
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerWriter.java
deleted file mode 100644
index 6872882..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerWriter.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.orc;
-
-import java.io.IOException;
-
-/**
- * Interface for writing integers.
- */
-interface IntegerWriter {
-
- /**
- * Get position from the stream.
- * @param recorder
- * @throws IOException
- */
- void getPosition(PositionRecorder recorder) throws IOException;
-
- /**
- * Write the integer value
- * @param value
- * @throws IOException
- */
- void write(long value) throws IOException;
-
- /**
- * Flush the buffer
- * @throws IOException
- */
- void flush() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/MemoryManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/MemoryManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/MemoryManager.java
deleted file mode 100644
index 79af80f..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/MemoryManager.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.orc;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * Implements a memory manager that keeps a global context of how many ORC
- * writers there are and manages the memory between them. For use cases with
- * dynamic partitions, it is easy to end up with many writers in the same task.
- * By managing the size of each allocation, we try to cut down the size of each
- * allocation and keep the task from running out of memory.
- *
- * This class is not thread safe, but is re-entrant - ensure creation and all
- * invocations are triggered from the same thread.
- */
-class MemoryManager {
-
- private static final Log LOG = LogFactory.getLog(MemoryManager.class);
-
- /**
- * How often should we check the memory sizes? Measured in rows added
- * to all of the writers.
- */
- private static final int ROWS_BETWEEN_CHECKS = 5000;
- private final long totalMemoryPool;
- private final Map<Path, WriterInfo> writerList =
- new HashMap<>();
- private long totalAllocation = 0;
- private double currentScale = 1;
- private int rowsAddedSinceCheck = 0;
- private final OwnedLock ownerLock = new OwnedLock();
-
- @SuppressWarnings("serial")
- private static class OwnedLock extends ReentrantLock {
- public Thread getOwner() {
- return super.getOwner();
- }
- }
-
- private static class WriterInfo {
- long allocation;
- Callback callback;
- WriterInfo(long allocation, Callback callback) {
- this.allocation = allocation;
- this.callback = callback;
- }
- }
-
- public interface Callback {
- /**
- * The writer needs to check its memory usage
- * @param newScale the current scale factor for memory allocations
- * @return true if the writer was over the limit
- * @throws IOException
- */
- boolean checkMemory(double newScale) throws IOException;
- }
-
- /**
- * Create the memory manager.
- * @param conf use the configuration to find the maximum size of the memory
- * pool.
- */
- MemoryManager(Configuration conf) {
- HiveConf.ConfVars poolVar = HiveConf.ConfVars.HIVE_ORC_FILE_MEMORY_POOL;
- double maxLoad = conf.getFloat(poolVar.varname, poolVar.defaultFloatVal);
- totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean().
- getHeapMemoryUsage().getMax() * maxLoad);
- ownerLock.lock();
- }
-
- /**
- * Lightweight thread-safety check for multi-threaded access patterns
- */
- private void checkOwner() {
- Preconditions.checkArgument(ownerLock.isHeldByCurrentThread(),
- "Owner thread expected %s, got %s",
- ownerLock.getOwner(),
- Thread.currentThread());
- }
-
- /**
- * Add a new writer's memory allocation to the pool. We use the path
- * as a unique key to ensure that we don't get duplicates.
- * @param path the file that is being written
- * @param requestedAllocation the requested buffer size
- */
- void addWriter(Path path, long requestedAllocation,
- Callback callback) throws IOException {
- checkOwner();
- WriterInfo oldVal = writerList.get(path);
- // this should always be null, but we handle the case where the memory
- // manager wasn't told that a writer is no longer in use and the task
- // starts writing to the same path.
- if (oldVal == null) {
- oldVal = new WriterInfo(requestedAllocation, callback);
- writerList.put(path, oldVal);
- totalAllocation += requestedAllocation;
- } else {
- // handle a new writer that is writing to the same path
- totalAllocation += requestedAllocation - oldVal.allocation;
- oldVal.allocation = requestedAllocation;
- oldVal.callback = callback;
- }
- updateScale(true);
- }
-
- /**
- * Remove the given writer from the pool.
- * @param path the file that has been closed
- */
- void removeWriter(Path path) throws IOException {
- checkOwner();
- WriterInfo val = writerList.get(path);
- if (val != null) {
- writerList.remove(path);
- totalAllocation -= val.allocation;
- if (writerList.isEmpty()) {
- rowsAddedSinceCheck = 0;
- }
- updateScale(false);
- }
- if(writerList.isEmpty()) {
- rowsAddedSinceCheck = 0;
- }
- }
-
- /**
- * Get the total pool size that is available for ORC writers.
- * @return the number of bytes in the pool
- */
- long getTotalMemoryPool() {
- return totalMemoryPool;
- }
-
- /**
- * The scaling factor for each allocation to ensure that the pool isn't
- * oversubscribed.
- * @return a fraction between 0.0 and 1.0 of the requested size that is
- * available for each writer.
- */
- double getAllocationScale() {
- return currentScale;
- }
-
- /**
- * Give the memory manager an opportunity for doing a memory check.
- * @throws IOException
- */
- void addedRow() throws IOException {
- if (++rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) {
- notifyWriters();
- }
- }
-
- /**
- * Notify all of the writers that they should check their memory usage.
- * @throws IOException
- */
- void notifyWriters() throws IOException {
- checkOwner();
- LOG.debug("Notifying writers after " + rowsAddedSinceCheck);
- for(WriterInfo writer: writerList.values()) {
- boolean flushed = writer.callback.checkMemory(currentScale);
- if (LOG.isDebugEnabled() && flushed) {
- LOG.debug("flushed " + writer.toString());
- }
- }
- rowsAddedSinceCheck = 0;
- }
-
- /**
- * Update the currentScale based on the current allocation and pool size.
- * This also updates the notificationTrigger.
- * @param isAllocate is this an allocation?
- */
- private void updateScale(boolean isAllocate) throws IOException {
- if (totalAllocation <= totalMemoryPool) {
- currentScale = 1;
- } else {
- currentScale = (double) totalMemoryPool / totalAllocation;
- }
- }
-}
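
updateScale() above is the entire balancing policy: while the writers'
combined requests fit in the pool the scale stays 1.0, and beyond that
every writer is squeezed proportionally (scale = pool / totalAllocation).
A worked example under an assumed 1 GiB pool:

    // Worked example of the allocation scale, assuming a 1 GiB pool.
    public class ScaleExample {
      public static void main(String[] args) {
        long totalMemoryPool = 1L << 30;            // 1 GiB pool
        long totalAllocation = 3 * (512L << 20);    // three writers x 512 MiB each
        double currentScale = totalAllocation <= totalMemoryPool
            ? 1.0
            : (double) totalMemoryPool / totalAllocation;
        // 1.5 GiB requested against 1 GiB -> scale = 2/3; each writer is
        // expected to flush once it uses more than 512 MiB * 2/3, about 341 MiB.
        System.out.println(currentScale);           // prints 0.666...
      }
    }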
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/Metadata.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/Metadata.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/Metadata.java
deleted file mode 100644
index dfa4c36..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/Metadata.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.orc;
-
-import com.google.common.collect.Lists;
-
-import java.util.List;
-
-public class Metadata {
-
- private final OrcProto.Metadata metadata;
-
- Metadata(OrcProto.Metadata m) {
- this.metadata = m;
- }
-
- /**
- * Return list of stripe level column statistics
- *
- * @return list of stripe statistics
- */
- public List<StripeStatistics> getStripeStatistics() {
- List<StripeStatistics> result = Lists.newArrayList();
- for (OrcProto.StripeStatistics ss : metadata.getStripeStatsList()) {
- result.add(new StripeStatistics(ss.getColStatsList()));
- }
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcFile.java
index a291953..8f26d21 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcFile.java
@@ -21,11 +21,15 @@ package org.apache.tajo.storage.thirdparty.orc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-
-import static org.apache.tajo.storage.thirdparty.orc.OrcConf.ConfVars.*;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.FileMetaInfo;
+import org.apache.orc.FileMetadata;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.MemoryManager;
+import org.apache.tajo.storage.orc.ORCAppender;
import java.io.IOException;
+import java.util.Properties;
import java.util.TimeZone;
/**
@@ -50,9 +54,9 @@ public final class OrcFile {
* prevent the new reader from reading ORC files generated by any released
* version of Hive.
*/
- public static enum Version {
+ public enum Version {
V_0_11("0.11", 0, 11),
- V_0_12("0.12", 0, 12);
+ V_0_12("0.12", 0, 12);
public static final Version CURRENT = V_0_12;
@@ -102,9 +106,14 @@ public final class OrcFile {
* For bugs in the writer, but the old readers already read the new data
* correctly, bump this version instead of the Version.
*/
- public static enum WriterVersion {
+ public enum WriterVersion {
ORIGINAL(0),
- HIVE_8732(1); // corrupted stripe/file maximum column statistics
+ HIVE_8732(1), // corrupted stripe/file maximum column statistics
+ HIVE_4243(2), // use real column names from Hive tables
+ HIVE_12055(3), // vectorized writer
+
+ // Don't use any magic numbers here except for the below:
+ FUTURE(Integer.MAX_VALUE); // a version from a future writer
private final int id;
@@ -112,67 +121,111 @@ public final class OrcFile {
return id;
}
- private WriterVersion(int id) {
+ WriterVersion(int id) {
this.id = id;
}
+
+ private static final WriterVersion[] values;
+ static {
+ // Assumes few non-negative values close to zero.
+ int max = Integer.MIN_VALUE;
+ for (WriterVersion v : WriterVersion.values()) {
+ if (v.id < 0) throw new AssertionError();
+ if (v.id > max && FUTURE.id != v.id) {
+ max = v.id;
+ }
+ }
+ values = new WriterVersion[max + 1];
+ for (WriterVersion v : WriterVersion.values()) {
+ if (v.id < values.length) {
+ values[v.id] = v;
+ }
+ }
+ }
+
+ public static WriterVersion from(int val) {
+ if (val == FUTURE.id) return FUTURE; // Special handling for the magic value.
+ return values[val];
+ }
}
+ public static final WriterVersion CURRENT_WRITER = WriterVersion.HIVE_12055;
- public static enum EncodingStrategy {
+ public enum EncodingStrategy {
SPEED, COMPRESSION;
}
- public static enum CompressionStrategy {
+ public enum CompressionStrategy {
SPEED, COMPRESSION;
}
- // Note : these string definitions for table properties are deprecated,
- // and retained only for backward compatibility, please do not add to
- // them, add to OrcTableProperties below instead
- @Deprecated public static final String COMPRESSION = "orc.compress";
- @Deprecated public static final String COMPRESSION_BLOCK_SIZE = "orc.compress.size";
- @Deprecated public static final String STRIPE_SIZE = "orc.stripe.size";
- @Deprecated public static final String ROW_INDEX_STRIDE = "orc.row.index.stride";
- @Deprecated public static final String ENABLE_INDEXES = "orc.create.index";
- @Deprecated public static final String BLOCK_PADDING = "orc.block.padding";
+ // unused
+ private OrcFile() {}
- /**
- * Enum container for all orc table properties.
- * If introducing a new orc-specific table property,
- * add it here.
- */
- public static enum OrcTableProperties {
- COMPRESSION("orc.compress"),
- COMPRESSION_BLOCK_SIZE("orc.compress.size"),
- STRIPE_SIZE("orc.stripe.size"),
- BLOCK_SIZE("orc.block.size"),
- ROW_INDEX_STRIDE("orc.row.index.stride"),
- ENABLE_INDEXES("orc.create.index"),
- BLOCK_PADDING("orc.block.padding"),
- ENCODING_STRATEGY("orc.encoding.strategy"),
- BLOOM_FILTER_COLUMNS("orc.bloom.filter.columns"),
- BLOOM_FILTER_FPP("orc.bloom.filter.fpp");
+ public static class ReaderOptions {
+ private final Configuration conf;
+ private FileSystem filesystem;
+ private FileMetaInfo fileMetaInfo; // TODO: this comes from some place.
+ private long maxLength = Long.MAX_VALUE;
+ private FileMetadata fullFileMetadata; // Propagate from LLAP cache.
+
+ public ReaderOptions(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public ReaderOptions fileMetaInfo(FileMetaInfo info) {
+ fileMetaInfo = info;
+ return this;
+ }
+
+ public ReaderOptions filesystem(FileSystem fs) {
+ this.filesystem = fs;
+ return this;
+ }
+
+ public ReaderOptions maxLength(long val) {
+ maxLength = val;
+ return this;
+ }
- private final String propName;
+ public ReaderOptions fileMetadata(FileMetadata metadata) {
+ this.fullFileMetadata = metadata;
+ return this;
+ }
+
+ public Configuration getConfiguration() {
+ return conf;
+ }
- OrcTableProperties(String propName) {
- this.propName = propName;
+ public FileSystem getFilesystem() {
+ return filesystem;
}
- public String getPropName(){
- return this.propName;
+ public FileMetaInfo getFileMetaInfo() {
+ return fileMetaInfo;
+ }
+
+ public long getMaxLength() {
+ return maxLength;
+ }
+
+ public FileMetadata getFileMetadata() {
+ return fullFileMetadata;
}
}
- // unused
- private OrcFile() {}
+ public static ReaderOptions readerOptions(Configuration conf) {
+ return new ReaderOptions(conf);
+ }
+
+
- public static interface WriterContext {
+ public interface WriterContext {
Writer getWriter();
}
- public static interface WriterCallback {
- public void preStripeWrite(WriterContext context) throws IOException;
- public void preFooterWrite(WriterContext context) throws IOException;
+ public interface WriterCallback {
+ void preStripeWrite(WriterContext context) throws IOException;
+ void preFooterWrite(WriterContext context) throws IOException;
}
/**
@@ -181,7 +234,7 @@ public final class OrcFile {
public static class WriterOptions {
private final Configuration configuration;
private FileSystem fileSystemValue = null;
- private ObjectInspector inspectorValue = null;
+ private TypeDescription schema = null;
private long stripeSizeValue;
private long blockSizeValue;
private int rowIndexStrideValue;
@@ -193,45 +246,42 @@ public final class OrcFile {
private WriterCallback callback;
private EncodingStrategy encodingStrategy;
private CompressionStrategy compressionStrategy;
- private float paddingTolerance;
+ private double paddingTolerance;
private String bloomFilterColumns;
private double bloomFilterFpp;
- private TimeZone timezone;
- WriterOptions(Configuration conf) {
+ protected WriterOptions(Properties tableProperties, Configuration conf) {
configuration = conf;
- memoryManagerValue = getMemoryManager(conf);
- stripeSizeValue = OrcConf.getLongVar(conf, HIVE_ORC_DEFAULT_STRIPE_SIZE);
- blockSizeValue = OrcConf.getLongVar(conf, HIVE_ORC_DEFAULT_BLOCK_SIZE);
- rowIndexStrideValue = OrcConf.getIntVar(conf, HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE);
- bufferSizeValue = OrcConf.getIntVar(conf, HIVE_ORC_DEFAULT_BUFFER_SIZE);
- blockPaddingValue = OrcConf.getBoolVar(conf, HIVE_ORC_DEFAULT_BLOCK_PADDING);
- compressValue = CompressionKind.valueOf(OrcConf.getVar(conf, HIVE_ORC_DEFAULT_COMPRESS));
- String versionName = OrcConf.getVar(conf, HIVE_ORC_WRITE_FORMAT);
- if (versionName == null) {
- versionValue = Version.CURRENT;
- } else {
- versionValue = Version.byName(versionName);
- }
- String enString =
- conf.get(OrcConf.ConfVars.HIVE_ORC_ENCODING_STRATEGY.varname);
- if (enString == null) {
- encodingStrategy = EncodingStrategy.SPEED;
- } else {
- encodingStrategy = EncodingStrategy.valueOf(enString);
- }
-
- String compString = conf
- .get(OrcConf.ConfVars.HIVE_ORC_COMPRESSION_STRATEGY.varname);
- if (compString == null) {
- compressionStrategy = CompressionStrategy.SPEED;
- } else {
- compressionStrategy = CompressionStrategy.valueOf(compString);
- }
-
- paddingTolerance = conf.getFloat(OrcConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.varname,
- OrcConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.defaultFloatVal);
- bloomFilterFpp = BloomFilterIO.DEFAULT_FPP;
+ memoryManagerValue = getStaticMemoryManager(conf);
+ stripeSizeValue = org.apache.orc.OrcConf.STRIPE_SIZE.getLong(tableProperties, conf);
+ blockSizeValue = org.apache.orc.OrcConf.BLOCK_SIZE.getLong(tableProperties, conf);
+ rowIndexStrideValue =
+ (int) org.apache.orc.OrcConf.ROW_INDEX_STRIDE.getLong(tableProperties, conf);
+ bufferSizeValue = (int) org.apache.orc.OrcConf.BUFFER_SIZE.getLong(tableProperties,
+ conf);
+ blockPaddingValue =
+ org.apache.orc.OrcConf.BLOCK_PADDING.getBoolean(tableProperties, conf);
+ compressValue =
+ CompressionKind.valueOf(org.apache.orc.OrcConf.COMPRESS.getString(tableProperties,
+ conf));
+ String versionName = org.apache.orc.OrcConf.WRITE_FORMAT.getString(tableProperties,
+ conf);
+ versionValue = Version.byName(versionName);
+ String enString = org.apache.orc.OrcConf.ENCODING_STRATEGY.getString(tableProperties,
+ conf);
+ encodingStrategy = EncodingStrategy.valueOf(enString);
+
+ String compString =
+ org.apache.orc.OrcConf.COMPRESSION_STRATEGY.getString(tableProperties, conf);
+ compressionStrategy = CompressionStrategy.valueOf(compString);
+
+ paddingTolerance =
+ org.apache.orc.OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(tableProperties, conf);
+
+ bloomFilterColumns = org.apache.orc.OrcConf.BLOOM_FILTER_COLUMNS.getString(tableProperties,
+ conf);
+ bloomFilterFpp = org.apache.orc.OrcConf.BLOOM_FILTER_FPP.getDouble(tableProperties,
+ conf);
}
/**
@@ -302,7 +352,7 @@ public final class OrcFile {
/**
* Sets the tolerance for block padding as a percentage of stripe size.
*/
- public WriterOptions paddingTolerance(float value) {
+ public WriterOptions paddingTolerance(double value) {
paddingTolerance = value;
return this;
}
@@ -318,7 +368,7 @@ public final class OrcFile {
/**
* Specify the false positive probability for bloom filter.
* @param fpp - false positive probability
- * @return
+ * @return this
*/
public WriterOptions bloomFilterFpp(double fpp) {
bloomFilterFpp = fpp;
@@ -334,11 +384,12 @@ public final class OrcFile {
}
/**
- * A required option that sets the object inspector for the rows. Used
- * to determine the schema for the file.
+ * Set the schema for the file. This is a required parameter.
+ * @param schema the schema for the file.
+ * @return this
*/
- public WriterOptions inspector(ObjectInspector value) {
- inspectorValue = value;
+ public WriterOptions setSchema(TypeDescription schema) {
+ this.schema = schema;
return this;
}
@@ -353,7 +404,7 @@ public final class OrcFile {
/**
* Add a listener for when the stripe and file are about to be closed.
* @param callback the object to be called when the stripe is closed
- * @return
+ * @return this
*/
public WriterOptions callback(WriterCallback callback) {
this.callback = callback;
@@ -363,25 +414,112 @@ public final class OrcFile {
/**
* A package local option to set the memory manager.
*/
- WriterOptions memory(MemoryManager value) {
+ protected WriterOptions memory(MemoryManager value) {
memoryManagerValue = value;
return this;
}
- /**
- * Tajo-specific
- */
- WriterOptions timezone(TimeZone value) {
- timezone = value;
- return this;
+ public boolean getBlockPadding() {
+ return blockPaddingValue;
+ }
+
+ public long getBlockSize() {
+ return blockSizeValue;
+ }
+
+ public String getBloomFilterColumns() {
+ return bloomFilterColumns;
}
+
+ public FileSystem getFileSystem() {
+ return fileSystemValue;
+ }
+
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ public TypeDescription getSchema() {
+ return schema;
+ }
+
+ public long getStripeSize() {
+ return stripeSizeValue;
+ }
+
+ public CompressionKind getCompress() {
+ return compressValue;
+ }
+
+ public WriterCallback getCallback() {
+ return callback;
+ }
+
+ public Version getVersion() {
+ return versionValue;
+ }
+
+ public MemoryManager getMemoryManager() {
+ return memoryManagerValue;
+ }
+
+ public int getBufferSize() {
+ return bufferSizeValue;
+ }
+
+ public int getRowIndexStride() {
+ return rowIndexStrideValue;
+ }
+
+ public CompressionStrategy getCompressionStrategy() {
+ return compressionStrategy;
+ }
+
+ public EncodingStrategy getEncodingStrategy() {
+ return encodingStrategy;
+ }
+
+ public double getPaddingTolerance() {
+ return paddingTolerance;
+ }
+
+ public double getBloomFilterFpp() {
+ return bloomFilterFpp;
+ }
+ }
+
+ /**
+ * Create a set of writer options based on a configuration.
+ * @param conf the configuration to use for values
+ * @return A WriterOptions object that can be modified
+ */
+ public static ORCAppender.WriterOptions writerOptions(Configuration conf) {
+ return new ORCAppender.WriterOptions(null, conf);
}
/**
- * Create a default set of write options that can be modified.
+ * Create a set of write options based on a set of table properties and
+ * configuration.
+ * @param tableProperties the properties of the table
+ * @param conf the configuration of the query
+ * @return a WriterOptions object that can be modified
*/
- public static WriterOptions writerOptions(Configuration conf) {
- return new WriterOptions(conf);
+ public static WriterOptions writerOptions(Properties tableProperties,
+ Configuration conf) {
+ return new WriterOptions(tableProperties, conf);
+ }
+
+ private static synchronized MemoryManager getStaticMemoryManager(
+ final Configuration conf) {
+ if (memoryManager == null) {
+ memoryManager = new ThreadLocal<MemoryManager>() {
+ @Override
+ protected MemoryManager initialValue() {
+ return new MemoryManager(conf);
+ }
+ };
+ }
+ return memoryManager.get();
}
/**
@@ -393,54 +531,13 @@ public final class OrcFile {
* @throws IOException
*/
public static Writer createWriter(Path path,
- WriterOptions opts
- ) throws IOException {
- FileSystem fs = opts.fileSystemValue == null ?
- path.getFileSystem(opts.configuration) : opts.fileSystemValue;
-
- return new WriterImpl(fs, path, opts.configuration, opts.inspectorValue,
- opts.stripeSizeValue, opts.compressValue,
- opts.bufferSizeValue, opts.rowIndexStrideValue,
- opts.memoryManagerValue, opts.blockPaddingValue,
- opts.versionValue, opts.callback,
- opts.encodingStrategy, opts.compressionStrategy,
- opts.paddingTolerance, opts.blockSizeValue,
- opts.bloomFilterColumns, opts.bloomFilterFpp,
- opts.timezone);
- }
+ WriterOptions opts,
+ TimeZone timeZone
+ ) throws IOException {
+ FileSystem fs = opts.getFileSystem() == null ?
+ path.getFileSystem(opts.getConfiguration()) : opts.getFileSystem();
- /**
- * Create an ORC file writer. This method is provided for API backward
- * compatibility with Hive 0.11.
- * @param fs file system
- * @param path filename to write to
- * @param inspector the ObjectInspector that inspects the rows
- * @param stripeSize the number of bytes in a stripe
- * @param compress how to compress the file
- * @param bufferSize the number of bytes to compress at once
- * @param rowIndexStride the number of rows between row index entries or
- * 0 to suppress all indexes
- * @return a new ORC file writer
- * @throws IOException
- */
- public static Writer createWriter(FileSystem fs,
- Path path,
- Configuration conf,
- ObjectInspector inspector,
- long stripeSize,
- CompressionKind compress,
- int bufferSize,
- int rowIndexStride,
- TimeZone timeZone) throws IOException {
- return createWriter(path,
- writerOptions(conf)
- .fileSystem(fs)
- .inspector(inspector)
- .stripeSize(stripeSize)
- .compress(compress)
- .bufferSize(bufferSize)
- .rowIndexStride(rowIndexStride)
- .timezone(timeZone));
+ return new WriterImpl(fs, path, opts, timeZone);
}
private static ThreadLocal<MemoryManager> memoryManager = null;
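
The new WriterVersion.from(int) avoids scanning values() on every footer
read by building a dense id-indexed table once, with the FUTURE sentinel
(Integer.MAX_VALUE) special-cased so the table stays small. The same
pattern in isolation (a hypothetical enum mirroring the code above):

    // Dense id-to-constant lookup with a MAX_VALUE sentinel kept out of
    // the table; ids are assumed small and non-negative otherwise.
    enum FormatVersion {
      ORIGINAL(0), FIX_A(1), FIX_B(2), FUTURE(Integer.MAX_VALUE);

      private final int id;
      FormatVersion(int id) { this.id = id; }

      private static final FormatVersion[] BY_ID;
      static {
        int max = -1;
        for (FormatVersion v : values()) {
          if (v != FUTURE) max = Math.max(max, v.id);
        }
        BY_ID = new FormatVersion[max + 1];
        for (FormatVersion v : values()) {
          if (v.id < BY_ID.length) BY_ID[v.id] = v;  // FUTURE never fits
        }
      }

      static FormatVersion from(int id) {
        if (id == FUTURE.id) return FUTURE;          // sentinel first
        return BY_ID[id];                            // O(1) dense lookup
      }
    }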
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java
new file mode 100644
index 0000000..7194bf4
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.orc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.orc.*;
+import org.apache.orc.OrcProto;
+import org.apache.orc.impl.*;
+import org.apache.orc.impl.StreamName;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.thirdparty.orc.TreeReaderFactory.DatumTreeReader;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.*;
+
+public class OrcRecordReader implements Closeable {
+
+ private final Log LOG = LogFactory.getLog(OrcRecordReader.class);
+
+ private final Path path;
+ private final long firstRow;
+ private final List<StripeInformation> stripes = new ArrayList<>();
+ private OrcProto.StripeFooter stripeFooter;
+ private final long totalRowCount;
+ private final CompressionCodec codec;
+ private final List<OrcProto.Type> types;
+ private final int bufferSize;
+ private final boolean[] included;
+ private final long rowIndexStride;
+ private long rowInStripe = 0;
+ private int currentStripe = -1;
+ private long rowBaseInStripe = 0;
+ private long rowCountInStripe = 0;
+ private final Map<org.apache.orc.impl.StreamName, InStream> streams = new HashMap<>();
+ DiskRangeList bufferChunks = null;
+ private final TreeReaderFactory.DatumTreeReader[] reader;
+ private final OrcProto.RowIndex[] indexes;
+ private final OrcProto.BloomFilterIndex[] bloomFilterIndices;
+ private final Configuration conf;
+ private final MetadataReader metadata;
+ private final DataReader dataReader;
+ private final Tuple result;
+
+ public OrcRecordReader(List<StripeInformation> stripes,
+ FileSystem fileSystem,
+ Schema schema,
+ Column[] target,
+ FileFragment fragment,
+ List<OrcProto.Type> types,
+ CompressionCodec codec,
+ int bufferSize,
+ long strideRate,
+ Reader.Options options,
+ Configuration conf,
+ TimeZone timeZone) throws IOException {
+
+ result = new VTuple(target.length);
+
+ this.conf = conf;
+ this.path = fragment.getPath();
+ this.codec = codec;
+ this.types = types;
+ this.bufferSize = bufferSize;
+ this.included = new boolean[schema.size() + 1];
+ included[0] = target.length > 0; // always include root column except when target schema size is 0
+ Schema targetSchema = new Schema(target);
+ for (int i = 1; i < included.length; i++) {
+ included[i] = targetSchema.contains(schema.getColumn(i - 1));
+ }
+ this.rowIndexStride = strideRate;
+ this.metadata = new MetadataReaderImpl(fileSystem, path, codec, bufferSize, types.size());
+
+ long rows = 0;
+ long skippedRows = 0;
+ long offset = fragment.getStartKey();
+ long maxOffset = fragment.getStartKey() + fragment.getLength();
+ for(StripeInformation stripe: stripes) {
+ long stripeStart = stripe.getOffset();
+ if (offset > stripeStart) {
+ skippedRows += stripe.getNumberOfRows();
+ } else if (stripeStart < maxOffset) {
+ this.stripes.add(stripe);
+ rows += stripe.getNumberOfRows();
+ }
+ }
+
+ // TODO: we could change the ctor to pass this externally
+ this.dataReader = RecordReaderUtils.createDefaultDataReader(fileSystem, path, options.getUseZeroCopy(), codec);
+ this.dataReader.open();
+
+ firstRow = skippedRows;
+ totalRowCount = rows;
+
+ reader = new DatumTreeReader[target.length];
+ for (int i = 0; i < reader.length; i++) {
+ reader[i] = TreeReaderFactory.createTreeReader(timeZone, schema.getColumnId(target[i].getQualifiedName()), target[i],
+ options.getSkipCorruptRecords());
+ }
+
+ indexes = new OrcProto.RowIndex[types.size()];
+ bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()];
+ advanceToNextRow(reader, 0L, true);
+ }
+
+ /**
+ * Plan the ranges of the file that we need to read given the list of
+ * columns and row groups.
+ *
+ * @param streamList the list of streams available
+ * @param includedColumns which columns are needed
+ * @param doMergeBuffers whether adjacent disk ranges should be merged into single reads
+ * @return the list of disk ranges that will be loaded
+ */
+ static DiskRangeList planReadPartialDataStreams(List<OrcProto.Stream> streamList,
+ boolean[] includedColumns,
+ boolean doMergeBuffers) {
+ long offset = 0;
+ // figure out which columns have a present stream
+ DiskRangeList.CreateHelper list = new DiskRangeList.CreateHelper();
+ for (OrcProto.Stream stream : streamList) {
+ long length = stream.getLength();
+ int column = stream.getColumn();
+ OrcProto.Stream.Kind streamKind = stream.getKind();
+ // since stream kind is optional, first check if it exists
+ if (stream.hasKind() &&
+ (org.apache.orc.impl.StreamName.getArea(streamKind) == org.apache.orc.impl.StreamName.Area.DATA) &&
+ includedColumns[column]) {
+ RecordReaderUtils.addEntireStreamToRanges(offset, length, list, doMergeBuffers);
+ }
+ offset += length;
+ }
+ return list.extract();
+ }
+
+ void createStreams(List<OrcProto.Stream> streamDescriptions,
+ DiskRangeList ranges,
+ boolean[] includeColumn,
+ CompressionCodec codec,
+ int bufferSize,
+ Map<org.apache.orc.impl.StreamName, InStream> streams) throws IOException {
+ long streamOffset = 0;
+ for (OrcProto.Stream streamDesc : streamDescriptions) {
+ int column = streamDesc.getColumn();
+ if ((includeColumn != null && !includeColumn[column]) ||
+ streamDesc.hasKind() &&
+ (org.apache.orc.impl.StreamName.getArea(streamDesc.getKind()) != org.apache.orc.impl.StreamName.Area.DATA)) {
+ streamOffset += streamDesc.getLength();
+ continue;
+ }
+ List<DiskRange> buffers = RecordReaderUtils.getStreamBuffers(
+ ranges, streamOffset, streamDesc.getLength());
+ org.apache.orc.impl.StreamName name = new StreamName(column, streamDesc.getKind());
+ streams.put(name, InStream.create(name.toString(), buffers,
+ streamDesc.getLength(), codec, bufferSize));
+ streamOffset += streamDesc.getLength();
+ }
+ }
+
+ private void readPartialDataStreams(StripeInformation stripe) throws IOException {
+ List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
+ DiskRangeList toRead = planReadPartialDataStreams(streamList, included, true);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("chunks = " + RecordReaderUtils.stringifyDiskRanges(toRead));
+ }
+ bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("merge = " + RecordReaderUtils.stringifyDiskRanges(bufferChunks));
+ }
+
+ createStreams(streamList, bufferChunks, included, codec, bufferSize, streams);
+ }
+
+ /**
+ * Skip over rows that we aren't selecting, so that the next row is
+ * one that we will read.
+ *
+ * @param reader the column tree readers to advance
+ * @param nextRow the row we want to go to
+ * @param canAdvanceStripe whether we may move on to the next stripe
+ * @return false if the target row lies past the current stripe and
+ * advancing to the next stripe is not allowed; true otherwise
+ * @throws IOException
+ */
+ private boolean advanceToNextRow(
+ TreeReaderFactory.TreeReader[] reader, long nextRow, boolean canAdvanceStripe)
+ throws IOException {
+ long nextRowInStripe = nextRow - rowBaseInStripe;
+
+ if (nextRowInStripe >= rowCountInStripe) {
+ if (canAdvanceStripe) {
+ advanceStripe();
+ }
+ return canAdvanceStripe;
+ }
+ if (nextRowInStripe != rowInStripe) {
+ if (rowIndexStride != 0) {
+ int rowGroup = (int) (nextRowInStripe / rowIndexStride);
+ seekToRowEntry(reader, rowGroup);
+ for (TreeReaderFactory.TreeReader eachReader : reader) {
+ eachReader.skipRows(nextRowInStripe - rowGroup * rowIndexStride);
+ }
+ } else {
+ for (TreeReaderFactory.TreeReader eachReader : reader) {
+ eachReader.skipRows(nextRowInStripe - rowInStripe);
+ }
+ }
+ rowInStripe = nextRowInStripe;
+ }
+ return true;
+ }
+
+ public boolean hasNext() throws IOException {
+ return rowInStripe < rowCountInStripe;
+ }
+
+ public Tuple next() throws IOException {
+ if (hasNext()) {
+ try {
+ for (int i = 0; i < reader.length; i++) {
+ result.put(i, reader[i].next());
+ }
+ // find the next row
+ rowInStripe += 1;
+ advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
+ return result;
+ } catch (IOException e) {
+ // rethrow with the file path included in the exception message
+ throw new IOException("Error reading file: " + path, e);
+ }
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Read the next stripe until we find a row that we don't skip.
+ *
+ * @throws IOException
+ */
+ private void advanceStripe() throws IOException {
+ rowInStripe = rowCountInStripe;
+ while (rowInStripe >= rowCountInStripe &&
+ currentStripe < stripes.size() - 1) {
+ currentStripe += 1;
+ readStripe();
+ }
+ }
+
+ /**
+ * Read the current stripe into memory.
+ *
+ * @throws IOException
+ */
+ private void readStripe() throws IOException {
+ StripeInformation stripe = beginReadStripe();
+
+ // if we haven't skipped the whole stripe, read the data
+ if (rowInStripe < rowCountInStripe) {
+ // if we aren't projecting columns or filtering rows, just read it all
+ if (included == null) {
+ readAllDataStreams(stripe);
+ } else {
+ readPartialDataStreams(stripe);
+ }
+
+ for (TreeReaderFactory.TreeReader eachReader : reader) {
+ eachReader.startStripe(streams, stripeFooter);
+ }
+ // if we skipped the first row group, move the pointers forward
+ if (rowInStripe != 0) {
+ seekToRowEntry(reader, (int) (rowInStripe / rowIndexStride));
+ }
+ }
+ }
+
+ private void clearStreams() throws IOException {
+ // explicit close of all streams to de-ref ByteBuffers
+ for (InStream is : streams.values()) {
+ is.close();
+ }
+ if (bufferChunks != null) {
+ if (dataReader.isTrackingDiskRanges()) {
+ for (DiskRangeList range = bufferChunks; range != null; range = range.next) {
+ if (!(range instanceof BufferChunk)) {
+ continue;
+ }
+ dataReader.releaseBuffer(((BufferChunk) range).getChunk());
+ }
+ }
+ }
+ bufferChunks = null;
+ streams.clear();
+ }
+
+ OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
+ return metadata.readStripeFooter(stripe);
+ }
+
+ private StripeInformation beginReadStripe() throws IOException {
+ StripeInformation stripe = stripes.get(currentStripe);
+ stripeFooter = readStripeFooter(stripe);
+ clearStreams();
+ // setup the position in the stripe
+ rowCountInStripe = stripe.getNumberOfRows();
+ rowInStripe = 0;
+ rowBaseInStripe = 0;
+ for (int i = 0; i < currentStripe; ++i) {
+ rowBaseInStripe += stripes.get(i).getNumberOfRows();
+ }
+ // reset all of the indexes
+ for (int i = 0; i < indexes.length; ++i) {
+ indexes[i] = null;
+ }
+ return stripe;
+ }
+
+ private void readAllDataStreams(StripeInformation stripe) throws IOException {
+ long start = stripe.getIndexLength();
+ long end = start + stripe.getDataLength();
+ // explicitly trigger 1 big read
+ DiskRangeList toRead = new DiskRangeList(start, end);
+ bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false);
+ List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList();
+ createStreams(streamDescriptions, bufferChunks, included, codec, bufferSize, streams);
+ }
+
+ public long getRowNumber() {
+ return rowInStripe + rowBaseInStripe + firstRow;
+ }
+
+ public float getProgress() {
+ return ((float) rowBaseInStripe + rowInStripe) / totalRowCount;
+ }
+
+ private int findStripe(long rowNumber) {
+ for (int i = 0; i < stripes.size(); i++) {
+ StripeInformation stripe = stripes.get(i);
+ if (stripe.getNumberOfRows() > rowNumber) {
+ return i;
+ }
+ rowNumber -= stripe.getNumberOfRows();
+ }
+ throw new IllegalArgumentException("Seek after the end of reader range");
+ }
+
+ OrcIndex readRowIndex(
+ int stripeIndex, boolean[] included) throws IOException {
+ return readRowIndex(stripeIndex, included, null, null);
+ }
+
+ OrcIndex readRowIndex(int stripeIndex, boolean[] included, OrcProto.RowIndex[] indexes,
+ OrcProto.BloomFilterIndex[] bloomFilterIndex) throws IOException {
+ StripeInformation stripe = stripes.get(stripeIndex);
+ OrcProto.StripeFooter stripeFooter = null;
+ // if this is the current stripe, use the cached objects.
+ if (stripeIndex == currentStripe) {
+ stripeFooter = this.stripeFooter;
+ indexes = indexes == null ? this.indexes : indexes;
+ bloomFilterIndex = bloomFilterIndex == null ? this.bloomFilterIndices : bloomFilterIndex;
+ }
+ return metadata.readRowIndex(stripe, stripeFooter, included, indexes, null,
+ bloomFilterIndex);
+ }
+
+ private void seekToRowEntry(TreeReaderFactory.TreeReader[] reader, int rowEntry)
+ throws IOException {
+ PositionProvider[] index = new PositionProvider[indexes.length];
+ for (int i = 0; i < indexes.length; ++i) {
+ if (indexes[i] != null) {
+ index[i] = new PositionProviderImpl(indexes[i].getEntry(rowEntry));
+ }
+ }
+ for (TreeReaderFactory.TreeReader eachReader : reader) {
+ eachReader.seek(index);
+ }
+ }
+
+ public void seekToRow(long rowNumber) throws IOException {
+ if (rowNumber < 0) {
+ throw new IllegalArgumentException("Seek to a negative row number " +
+ rowNumber);
+ } else if (rowNumber < firstRow) {
+ throw new IllegalArgumentException("Seek before reader range " +
+ rowNumber);
+ }
+ // convert to our internal form (rows from the beginning of slice)
+ rowNumber -= firstRow;
+
+ // move to the right stripe
+ int rightStripe = findStripe(rowNumber);
+ if (rightStripe != currentStripe) {
+ currentStripe = rightStripe;
+ readStripe();
+ }
+ readRowIndex(currentStripe, included);
+
+ // if we aren't to the right row yet, advance in the stripe.
+ advanceToNextRow(reader, rowNumber, true);
+ }
+
+ public long getNumBytes() {
+ return ((RecordReaderUtils.DefaultDataReader) dataReader).getReadBytes();
+ }
+
+ @Override
+ public void close() throws IOException {
+ clearStreams();
+ dataReader.close();
+ }
+
+ public static final class PositionProviderImpl implements PositionProvider {
+ private final OrcProto.RowIndexEntry entry;
+ private int index;
+
+ public PositionProviderImpl(OrcProto.RowIndexEntry entry) {
+ this(entry, 0);
+ }
+
+ public PositionProviderImpl(OrcProto.RowIndexEntry entry, int startPos) {
+ this.entry = entry;
+ this.index = startPos;
+ }
+
+ @Override
+ public long getNext() {
+ return entry.getPositions(index++);
+ }
+ }
+}
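A sketch of the read loop the new reader supports, using only the methods defined above (hasNext(), next(), close()); the helper class itself is hypothetical. Note that next() fills and returns the same VTuple on every call, so rows must be copied if they are retained:

    import org.apache.tajo.storage.Tuple;
    import org.apache.tajo.storage.thirdparty.orc.OrcRecordReader;
    import java.io.IOException;

    final class OrcReadLoopSketch {
      // Drains an already-constructed OrcRecordReader and counts its rows.
      static long countRows(OrcRecordReader reader) throws IOException {
        long rows = 0;
        try {
          while (reader.hasNext()) {
            Tuple t = reader.next();  // the same Tuple instance is reused per call
            if (t != null) {
              rows++;
            }
          }
        } finally {
          reader.close();             // closes InStreams and releases disk-range buffers
        }
        return rows;
      }
    }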
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcUtils.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcUtils.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcUtils.java
index 3a474dd..b8d3f52 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcUtils.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcUtils.java
@@ -17,185 +17,101 @@
*/
package org.apache.tajo.storage.thirdparty.orc;
-import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.*;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.SnappyCodec;
+import org.apache.orc.impl.ZlibCodec;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TypeDesc;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedDataTypeException;
public class OrcUtils {
private static final Log LOG = LogFactory.getLog(OrcUtils.class);
- /**
- * Returns selected columns as a boolean array with true value set for specified column names.
- * The result will contain number of elements equal to flattened number of columns.
- * For example:
- * selectedColumns - a,b,c
- * allColumns - a,b,c,d
- * If column c is a complex type, say list<string> and other types are primitives then result will
- * be [false, true, true, true, true, true, false]
- * Index 0 is the root element of the struct which is set to false by default, index 1,2
- * corresponds to columns a and b. Index 3,4 correspond to column c which is list<string> and
- * index 5 correspond to column d. After flattening list<string> gets 2 columns.
- *
- * @param selectedColumns - comma separated list of selected column names
- * @param allColumns - comma separated list of all column names
- * @param inspector - object inspector
- * @return - boolean array with true value set for the specified column names
- */
- public static boolean[] includeColumns(String selectedColumns, String allColumns,
- ObjectInspector inspector) {
- int numFlattenedCols = getFlattenedColumnsCount(inspector);
- boolean[] results = new boolean[numFlattenedCols];
- if ("*".equals(selectedColumns)) {
- Arrays.fill(results, true);
- return results;
- }
- if (selectedColumns != null && !selectedColumns.isEmpty()) {
- includeColumnsImpl(results, selectedColumns.toLowerCase(), allColumns, inspector);
- }
- return results;
- }
-
- private static void includeColumnsImpl(boolean[] includeColumns, String selectedColumns,
- String allColumns,
- ObjectInspector inspector) {
- Map<String, List<Integer>> columnSpanMap = getColumnSpan(allColumns, inspector);
- LOG.info("columnSpanMap: " + columnSpanMap);
-
- String[] selCols = selectedColumns.split(",");
- for (String sc : selCols) {
- if (columnSpanMap.containsKey(sc)) {
- List<Integer> colSpan = columnSpanMap.get(sc);
- int start = colSpan.get(0);
- int end = colSpan.get(1);
- for (int i = start; i <= end; i++) {
- includeColumns[i] = true;
+ public static org.apache.orc.CompressionCodec createCodec(org.apache.orc.CompressionKind kind) {
+ switch (kind) {
+ case NONE:
+ return null;
+ case ZLIB:
+ return new ZlibCodec();
+ case SNAPPY:
+ return new SnappyCodec();
+ case LZO:
+ try {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ if (loader == null) {
+ throw new RuntimeException("error while getting a class loader");
}
+ @SuppressWarnings("unchecked")
+ Class<? extends org.apache.orc.CompressionCodec> lzo =
+ (Class<? extends CompressionCodec>)
+ loader.loadClass("org.apache.hadoop.hive.ql.io.orc.LzoCodec");
+ return lzo.newInstance();
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("LZO is not available.", e);
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException("Problem initializing LZO", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException("Insufficient access to LZO", e);
}
- }
-
- LOG.info("includeColumns: " + Arrays.toString(includeColumns));
+ default:
+ throw new IllegalArgumentException("Unknown compression codec: " +
+ kind);
}
+ }
- private static Map<String, List<Integer>> getColumnSpan(String allColumns,
- ObjectInspector inspector) {
- // map that contains the column span for each column. Column span is the number of columns
- // required after flattening. For a given object inspector this map contains the start column
- // id and end column id (both inclusive) after flattening.
- // EXAMPLE:
- // schema: struct<a:int, b:float, c:map<string,int>>
- // column span map for the above struct will be
- // a => [1,1], b => [2,2], c => [3,5]
- Map<String, List<Integer>> columnSpanMap = new HashMap<>();
- if (allColumns != null) {
- String[] columns = allColumns.split(",");
- int startIdx = 0;
- int endIdx = 0;
- if (inspector instanceof StructObjectInspector) {
- StructObjectInspector soi = (StructObjectInspector) inspector;
- List<? extends StructField> fields = soi.getAllStructFieldRefs();
- for (int i = 0; i < fields.size(); i++) {
- StructField sf = fields.get(i);
-
- // we get the type (category) from object inspector but column name from the argument.
- // The reason for this is hive (FileSinkOperator) does not pass the actual column names,
- // instead it passes the internal column names (_col1,_col2).
- ObjectInspector sfOI = sf.getFieldObjectInspector();
- String colName = columns[i];
+ public static TypeDescription convertSchema(Schema schema) {
+ TypeDescription description = TypeDescription.createStruct();
- startIdx = endIdx + 1;
- switch (sfOI.getCategory()) {
- case PRIMITIVE:
- endIdx += 1;
- break;
- case STRUCT:
- endIdx += 1;
- StructObjectInspector structInsp = (StructObjectInspector) sfOI;
- List<? extends StructField> structFields = structInsp.getAllStructFieldRefs();
- for (StructField structField : structFields) {
- endIdx += getFlattenedColumnsCount(structField.getFieldObjectInspector());
- }
- break;
- case MAP:
- endIdx += 1;
- MapObjectInspector mapInsp = (MapObjectInspector) sfOI;
- endIdx += getFlattenedColumnsCount(mapInsp.getMapKeyObjectInspector());
- endIdx += getFlattenedColumnsCount(mapInsp.getMapValueObjectInspector());
- break;
- case LIST:
- endIdx += 1;
- ListObjectInspector listInsp = (ListObjectInspector) sfOI;
- endIdx += getFlattenedColumnsCount(listInsp.getListElementObjectInspector());
- break;
- case UNION:
- endIdx += 1;
- UnionObjectInspector unionInsp = (UnionObjectInspector) sfOI;
- List<ObjectInspector> choices = unionInsp.getObjectInspectors();
- for (ObjectInspector choice : choices) {
- endIdx += getFlattenedColumnsCount(choice);
- }
- break;
- default:
- throw new IllegalArgumentException("Bad category: " +
- inspector.getCategory());
- }
-
- columnSpanMap.put(colName, Lists.newArrayList(startIdx, endIdx));
- }
- }
+ for (Column eachColumn : schema.getRootColumns()) {
+ description.addField(eachColumn.getQualifiedName(),
+ convertTypeInfo(eachColumn.getTypeDesc()));
}
- return columnSpanMap;
+ return description;
}
- /**
- * Returns the number of columns after flatting complex types.
- *
- * @param inspector - object inspector
- * @return
- */
- public static int getFlattenedColumnsCount(ObjectInspector inspector) {
- int numWriters = 0;
- switch (inspector.getCategory()) {
- case PRIMITIVE:
- numWriters += 1;
- break;
- case STRUCT:
- numWriters += 1;
- StructObjectInspector structInsp = (StructObjectInspector) inspector;
- List<? extends StructField> fields = structInsp.getAllStructFieldRefs();
- for (StructField field : fields) {
- numWriters += getFlattenedColumnsCount(field.getFieldObjectInspector());
- }
- break;
- case MAP:
- numWriters += 1;
- MapObjectInspector mapInsp = (MapObjectInspector) inspector;
- numWriters += getFlattenedColumnsCount(mapInsp.getMapKeyObjectInspector());
- numWriters += getFlattenedColumnsCount(mapInsp.getMapValueObjectInspector());
- break;
- case LIST:
- numWriters += 1;
- ListObjectInspector listInsp = (ListObjectInspector) inspector;
- numWriters += getFlattenedColumnsCount(listInsp.getListElementObjectInspector());
- break;
- case UNION:
- numWriters += 1;
- UnionObjectInspector unionInsp = (UnionObjectInspector) inspector;
- List<ObjectInspector> choices = unionInsp.getObjectInspectors();
- for (ObjectInspector choice : choices) {
- numWriters += getFlattenedColumnsCount(choice);
+ public static TypeDescription convertTypeInfo(TypeDesc desc) {
+ switch (desc.getDataType().getType()) {
+ case BOOLEAN:
+ return TypeDescription.createBoolean();
+ case BIT:
+ return TypeDescription.createByte();
+ case INT2:
+ return TypeDescription.createShort();
+ case INT4:
+ case INET4:
+ return TypeDescription.createInt();
+ case INT8:
+ return TypeDescription.createLong();
+ case FLOAT4:
+ return TypeDescription.createFloat();
+ case FLOAT8:
+ return TypeDescription.createDouble();
+ case TEXT:
+ return TypeDescription.createString();
+ case DATE:
+ return TypeDescription.createDate();
+ case TIMESTAMP:
+ return TypeDescription.createTimestamp();
+ case BLOB:
+ return TypeDescription.createBinary();
+ case CHAR:
+ return TypeDescription.createChar()
+ .withMaxLength(desc.getDataType().getLength());
+ case RECORD: {
+ TypeDescription result = TypeDescription.createStruct();
+ for (Column eachColumn : desc.getNestedSchema().getRootColumns()) {
+ result.addField(eachColumn.getQualifiedName(),
+ convertTypeInfo(eachColumn.getTypeDesc()));
}
- break;
+ return result;
+ }
default:
- throw new IllegalArgumentException("Bad category: " +
- inspector.getCategory());
+ throw new TajoRuntimeException(new UnsupportedDataTypeException(desc.getDataType().getType().name()));
}
- return numWriters;
}
-
}
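The new convertSchema/convertTypeInfo pair replaces the ObjectInspector walk with a direct mapping from Tajo's catalog types to Apache ORC's TypeDescription. A small, self-contained sketch of the conversion, assuming Tajo's Schema no-arg constructor and addColumn(name, type) catalog API of that era; the column names are made up for the example:

    import org.apache.orc.TypeDescription;
    import org.apache.tajo.catalog.Schema;
    import org.apache.tajo.common.TajoDataTypes.Type;
    import org.apache.tajo.storage.thirdparty.orc.OrcUtils;

    final class ConvertSchemaSketch {
      public static void main(String[] args) {
        Schema schema = new Schema();
        schema.addColumn("id", Type.INT8);    // maps to ORC bigint
        schema.addColumn("name", Type.TEXT);  // maps to ORC string
        TypeDescription orc = OrcUtils.convertSchema(schema);
        System.out.println(orc);              // expected: struct<id:bigint,name:string>
      }
    }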