You are viewing a plain text version of this content. The canonical link for it (originally a hyperlink on the word "here") is not preserved in this plain-text copy.
Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/03 06:30:38 UTC
[23/30] tajo git commit: TAJO-1122: Refactor the tajo-storage project
structure.
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
deleted file mode 100644
index f093f9d..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
+++ /dev/null
@@ -1,623 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.index.bst;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
-import org.apache.tajo.storage.index.IndexMethod;
-import org.apache.tajo.storage.index.IndexWriter;
-import org.apache.tajo.storage.index.OrderIndexReader;
-
-import java.io.Closeable;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.Set;
-import java.util.TreeMap;
-
-import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;
-
-/**
- * This is two-level binary search tree index. This is one of the value-list
- * index structure. Thus, it is inefficient in the case where
- * the many of the values are same. Also, the BST shows the fast performance
- * when the selectivity of rows to be retrieved is less than 5%.
- * BSTIndexWriter is not thread-safe, whereas BSTIndexReader is thread-safe.
- */
-public class BSTIndex implements IndexMethod {
- private static final Log LOG = LogFactory.getLog(BSTIndex.class);
-
- public static final int ONE_LEVEL_INDEX = 1;
- public static final int TWO_LEVEL_INDEX = 2;
-
- private final Configuration conf;
-
- public BSTIndex(final Configuration conf) {
- this.conf = conf;
- }
-
- @Override
- public BSTIndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema,
- TupleComparator comparator) throws IOException {
- return new BSTIndexWriter(fileName, level, keySchema, comparator);
- }
-
- @Override
- public BSTIndexReader getIndexReader(Path fileName, Schema keySchema, TupleComparator comparator) throws IOException {
- return new BSTIndexReader(fileName, keySchema, comparator);
- }
-
- public BSTIndexReader getIndexReader(Path fileName) throws IOException {
- return new BSTIndexReader(fileName);
- }
-
- public class BSTIndexWriter extends IndexWriter implements Closeable {
- private FSDataOutputStream out;
- private FileSystem fs;
- private int level;
- private int loadNum = 4096;
- private Path fileName;
-
- private final Schema keySchema;
- private final TupleComparator compartor;
- private final KeyOffsetCollector collector;
- private KeyOffsetCollector rootCollector;
-
- private Tuple firstKey;
- private Tuple lastKey;
-
- private RowStoreEncoder rowStoreEncoder;
-
- // private Tuple lastestKey = null;
-
- /**
- * constructor
- *
- * @param level
- * : IndexCreater.ONE_LEVEL_INDEX or IndexCreater.TWO_LEVEL_INDEX
- * @throws IOException
- */
- public BSTIndexWriter(final Path fileName, int level, Schema keySchema,
- TupleComparator comparator) throws IOException {
- this.fileName = fileName;
- this.level = level;
- this.keySchema = keySchema;
- this.compartor = comparator;
- this.collector = new KeyOffsetCollector(comparator);
- this.rowStoreEncoder = RowStoreUtil.createEncoder(keySchema);
- }
-
- public void setLoadNum(int loadNum) {
- this.loadNum = loadNum;
- }
-
- public void open() throws IOException {
- fs = fileName.getFileSystem(conf);
- if (fs.exists(fileName)) {
- throw new IOException("ERROR: index file (" + fileName + " already exists");
- }
- out = fs.create(fileName);
- }
-
- @Override
- public void write(Tuple key, long offset) throws IOException {
- if (firstKey == null || compartor.compare(key, firstKey) < 0) {
- firstKey = key;
- }
- if (lastKey == null || compartor.compare(lastKey, key) < 0) {
- lastKey = key;
- }
-
- collector.put(key, offset);
- }
-
- public TupleComparator getComparator() {
- return this.compartor;
- }
-
- public void flush() throws IOException {
- out.flush();
- }
-
- public void writeHeader(int entryNum) throws IOException {
- // schema
- byte [] schemaBytes = keySchema.getProto().toByteArray();
- out.writeInt(schemaBytes.length);
- out.write(schemaBytes);
-
- // comparator
- byte [] comparatorBytes = compartor.getProto().toByteArray();
- out.writeInt(comparatorBytes.length);
- out.write(comparatorBytes);
-
- // level
- out.writeInt(this.level);
- // entry
- out.writeInt(entryNum);
- if (entryNum > 0) {
- byte [] minBytes = rowStoreEncoder.toBytes(firstKey);
- out.writeInt(minBytes.length);
- out.write(minBytes);
- byte [] maxBytes = rowStoreEncoder.toBytes(lastKey);
- out.writeInt(maxBytes.length);
- out.write(maxBytes);
- }
- out.flush();
- }
-
- public void close() throws IOException {
- /* two level initialize */
- if (this.level == TWO_LEVEL_INDEX) {
- rootCollector = new KeyOffsetCollector(this.compartor);
- }
-
- /* data writing phase */
- TreeMap<Tuple, LinkedList<Long>> keyOffsetMap = collector.getMap();
- Set<Tuple> keySet = keyOffsetMap.keySet();
-
- int entryNum = keySet.size();
- writeHeader(entryNum);
-
- int loadCount = this.loadNum - 1;
- for (Tuple key : keySet) {
-
- if (this.level == TWO_LEVEL_INDEX) {
- loadCount++;
- if (loadCount == this.loadNum) {
- rootCollector.put(key, out.getPos());
- loadCount = 0;
- }
- }
- /* key writing */
- byte[] buf = rowStoreEncoder.toBytes(key);
- out.writeInt(buf.length);
- out.write(buf);
-
- /**/
- LinkedList<Long> offsetList = keyOffsetMap.get(key);
- /* offset num writing */
- int offsetSize = offsetList.size();
- out.writeInt(offsetSize);
- /* offset writing */
- for (Long offset : offsetList) {
- out.writeLong(offset);
- }
- }
-
- out.flush();
- out.close();
- keySet.clear();
- collector.clear();
-
- FSDataOutputStream rootOut = null;
- /* root index creating phase */
- if (this.level == TWO_LEVEL_INDEX) {
- TreeMap<Tuple, LinkedList<Long>> rootMap = rootCollector.getMap();
- keySet = rootMap.keySet();
-
- rootOut = fs.create(new Path(fileName + ".root"));
- rootOut.writeInt(this.loadNum);
- rootOut.writeInt(keySet.size());
-
- /* root key writing */
- for (Tuple key : keySet) {
- byte[] buf = rowStoreEncoder.toBytes(key);
- rootOut.writeInt(buf.length);
- rootOut.write(buf);
-
- LinkedList<Long> offsetList = rootMap.get(key);
- if (offsetList.size() > 1 || offsetList.size() == 0) {
- throw new IOException("Why root index doen't have one offset?");
- }
- rootOut.writeLong(offsetList.getFirst());
-
- }
- rootOut.flush();
- rootOut.close();
-
- keySet.clear();
- rootCollector.clear();
- }
- }
-
- private class KeyOffsetCollector {
- private TreeMap<Tuple, LinkedList<Long>> map;
-
- public KeyOffsetCollector(TupleComparator comparator) {
- map = new TreeMap<Tuple, LinkedList<Long>>(comparator);
- }
-
- public void put(Tuple key, long offset) {
- if (map.containsKey(key)) {
- map.get(key).add(offset);
- } else {
- LinkedList<Long> list = new LinkedList<Long>();
- list.add(offset);
- map.put(key, list);
- }
- }
-
- public TreeMap<Tuple, LinkedList<Long>> getMap() {
- return this.map;
- }
-
- public void clear() {
- this.map.clear();
- }
- }
- }
-
- /**
- * BSTIndexReader is thread-safe.
- */
- public class BSTIndexReader implements OrderIndexReader , Closeable{
- private Path fileName;
- private Schema keySchema;
- private TupleComparator comparator;
-
- private FileSystem fs;
- private FSDataInputStream indexIn;
- private FSDataInputStream subIn;
-
- private int level;
- private int entryNum;
- private int loadNum = -1;
- private Tuple firstKey;
- private Tuple lastKey;
-
- // the cursors of BST
- private int rootCursor;
- private int keyCursor;
- private int offsetCursor;
-
- // mutex
- private final Object mutex = new Object();
-
- private RowStoreDecoder rowStoreDecoder;
-
- /**
- *
- * @param fileName
- * @param keySchema
- * @param comparator
- * @throws IOException
- */
- public BSTIndexReader(final Path fileName, Schema keySchema, TupleComparator comparator) throws IOException {
- this.fileName = fileName;
- this.keySchema = keySchema;
- this.comparator = comparator;
- this.rowStoreDecoder = RowStoreUtil.createDecoder(keySchema);
- }
-
- public BSTIndexReader(final Path fileName) throws IOException {
- this.fileName = fileName;
- }
-
- public Schema getKeySchema() {
- return this.keySchema;
- }
-
- public TupleComparator getComparator() {
- return this.comparator;
- }
-
- private void readHeader() throws IOException {
- // schema
- int schemaByteSize = indexIn.readInt();
- byte [] schemaBytes = new byte[schemaByteSize];
- StorageUtil.readFully(indexIn, schemaBytes, 0, schemaByteSize);
-
- SchemaProto.Builder builder = SchemaProto.newBuilder();
- builder.mergeFrom(schemaBytes);
- SchemaProto proto = builder.build();
- this.keySchema = new Schema(proto);
- this.rowStoreDecoder = RowStoreUtil.createDecoder(keySchema);
-
- // comparator
- int compByteSize = indexIn.readInt();
- byte [] compBytes = new byte[compByteSize];
- StorageUtil.readFully(indexIn, compBytes, 0, compByteSize);
-
- TupleComparatorProto.Builder compProto = TupleComparatorProto.newBuilder();
- compProto.mergeFrom(compBytes);
- this.comparator = new BaseTupleComparator(compProto.build());
-
- // level
- this.level = indexIn.readInt();
- // entry
- this.entryNum = indexIn.readInt();
- if (entryNum > 0) { // if there is no any entry, do not read firstKey/lastKey values
- byte [] minBytes = new byte[indexIn.readInt()];
- StorageUtil.readFully(indexIn, minBytes, 0, minBytes.length);
- this.firstKey = rowStoreDecoder.toTuple(minBytes);
-
- byte [] maxBytes = new byte[indexIn.readInt()];
- StorageUtil.readFully(indexIn, maxBytes, 0, maxBytes.length);
- this.lastKey = rowStoreDecoder.toTuple(maxBytes);
- }
- }
-
- public void open()
- throws IOException {
- /* init the index file */
- fs = fileName.getFileSystem(conf);
- if (!fs.exists(fileName)) {
- throw new FileNotFoundException("ERROR: does not exist " + fileName.toString());
- }
-
- indexIn = fs.open(this.fileName);
- readHeader();
- fillData();
- }
-
- private void fillData() throws IOException {
- /* load on memory */
- if (this.level == TWO_LEVEL_INDEX) {
-
- Path rootPath = new Path(this.fileName + ".root");
- if (!fs.exists(rootPath)) {
- throw new FileNotFoundException("root index did not created");
- }
-
- subIn = indexIn;
- indexIn = fs.open(rootPath);
- /* root index header reading : type => loadNum => indexSize */
- this.loadNum = indexIn.readInt();
- this.entryNum = indexIn.readInt();
- /**/
- fillRootIndex(entryNum, indexIn);
-
- } else {
- fillLeafIndex(entryNum, indexIn, -1);
- }
- }
-
- /**
- *
- * @return
- * @throws IOException
- */
- public long find(Tuple key) throws IOException {
- return find(key, false);
- }
-
- @Override
- public long find(Tuple key, boolean nextKey) throws IOException {
- synchronized (mutex) {
- int pos = -1;
- if (this.level == ONE_LEVEL_INDEX) {
- pos = oneLevBS(key);
- } else if (this.level == TWO_LEVEL_INDEX) {
- pos = twoLevBS(key, this.loadNum + 1);
- } else {
- throw new IOException("More than TWL_LEVEL_INDEX is not supported.");
- }
-
- if (nextKey) {
- if (pos + 1 >= this.offsetSubIndex.length) {
- return -1;
- }
- keyCursor = pos + 1;
- offsetCursor = 0;
- } else {
- if (correctable) {
- keyCursor = pos;
- offsetCursor = 0;
- } else {
- return -1;
- }
- }
-
- return this.offsetSubIndex[keyCursor][offsetCursor];
- }
- }
-
- public long next() throws IOException {
- synchronized (mutex) {
- if (offsetSubIndex[keyCursor].length - 1 > offsetCursor) {
- offsetCursor++;
- } else {
- if (offsetSubIndex.length - 1 > keyCursor) {
- keyCursor++;
- offsetCursor = 0;
- } else {
- if (offsetIndex.length -1 > rootCursor) {
- rootCursor++;
- fillLeafIndex(loadNum + 1, subIn, this.offsetIndex[rootCursor]);
- keyCursor = 1;
- offsetCursor = 0;
- } else {
- return -1;
- }
- }
- }
-
- return this.offsetSubIndex[keyCursor][offsetCursor];
- }
- }
-
- public boolean isCurInMemory() {
- return (offsetSubIndex[keyCursor].length - 1 >= offsetCursor);
- }
-
- private void fillLeafIndex(int entryNum, FSDataInputStream in, long pos)
- throws IOException {
- int counter = 0;
- try {
- if (pos != -1) {
- in.seek(pos);
- }
- this.dataSubIndex = new Tuple[entryNum];
- this.offsetSubIndex = new long[entryNum][];
-
- byte[] buf;
- for (int i = 0; i < entryNum; i++) {
- counter++;
- buf = new byte[in.readInt()];
- StorageUtil.readFully(in, buf, 0, buf.length);
- dataSubIndex[i] = rowStoreDecoder.toTuple(buf);
-
- int offsetNum = in.readInt();
- this.offsetSubIndex[i] = new long[offsetNum];
- for (int j = 0; j < offsetNum; j++) {
- this.offsetSubIndex[i][j] = in.readLong();
- }
-
- }
-
- } catch (IOException e) {
- counter--;
- if (pos != -1) {
- in.seek(pos);
- }
- this.dataSubIndex = new Tuple[counter];
- this.offsetSubIndex = new long[counter][];
-
- byte[] buf;
- for (int i = 0; i < counter; i++) {
- buf = new byte[in.readInt()];
- StorageUtil.readFully(in, buf, 0, buf.length);
- dataSubIndex[i] = rowStoreDecoder.toTuple(buf);
-
- int offsetNum = in.readInt();
- this.offsetSubIndex[i] = new long[offsetNum];
- for (int j = 0; j < offsetNum; j++) {
- this.offsetSubIndex[i][j] = in.readLong();
- }
-
- }
- }
- }
-
- public Tuple getFirstKey() {
- return this.firstKey;
- }
-
- public Tuple getLastKey() {
- return this.lastKey;
- }
-
- private void fillRootIndex(int entryNum, FSDataInputStream in)
- throws IOException {
- this.dataIndex = new Tuple[entryNum];
- this.offsetIndex = new long[entryNum];
- Tuple keyTuple;
- byte[] buf;
- for (int i = 0; i < entryNum; i++) {
- buf = new byte[in.readInt()];
- StorageUtil.readFully(in, buf, 0, buf.length);
- keyTuple = rowStoreDecoder.toTuple(buf);
- dataIndex[i] = keyTuple;
- this.offsetIndex[i] = in.readLong();
- }
- }
-
- /* memory index, only one is used. */
- private Tuple[] dataIndex = null;
- private Tuple[] dataSubIndex = null;
-
- /* offset index */
- private long[] offsetIndex = null;
- private long[][] offsetSubIndex = null;
-
- private boolean correctable = true;
-
- private int oneLevBS(Tuple key) throws IOException {
- correctable = true;
- int pos = binarySearch(this.dataSubIndex, key, 0, this.dataSubIndex.length);
- return pos;
- }
-
- private int twoLevBS(Tuple key, int loadNum) throws IOException {
- int pos = binarySearch(this.dataIndex, key, 0, this.dataIndex.length);
- if(pos > 0) {
- rootCursor = pos;
- } else {
- rootCursor = 0;
- }
- fillLeafIndex(loadNum, subIn, this.offsetIndex[rootCursor]);
- pos = binarySearch(this.dataSubIndex, key, 0, this.dataSubIndex.length);
-
- return pos;
- }
-
- private int binarySearch(Tuple[] arr, Tuple key, int startPos, int endPos) {
- int offset = -1;
- int start = startPos;
- int end = endPos;
-
- //http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6412541
- int centerPos = (start + end) >>> 1;
- while (true) {
- if (comparator.compare(arr[centerPos], key) > 0) {
- if (centerPos == 0) {
- correctable = false;
- break;
- } else if (comparator.compare(arr[centerPos - 1], key) < 0) {
- correctable = false;
- offset = centerPos - 1;
- break;
- } else {
- end = centerPos;
- centerPos = (start + end) / 2;
- }
- } else if (comparator.compare(arr[centerPos], key) < 0) {
- if (centerPos == arr.length - 1) {
- correctable = false;
- offset = centerPos;
- break;
- } else if (comparator.compare(arr[centerPos + 1], key) > 0) {
- correctable = false;
- offset = centerPos;
- break;
- } else {
- start = centerPos + 1;
- centerPos = (start + end) / 2;
- }
- } else {
- correctable = true;
- offset = centerPos;
- break;
- }
- }
- return offset;
- }
-
- @Override
- public void close() throws IOException {
- this.indexIn.close();
- this.subIn.close();
- }
-
- @Override
- public String toString() {
- return "BSTIndex (" + firstKey + ", " + lastKey + ") " + fileName;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
deleted file mode 100644
index b10d423..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.storage.StorageConstants;
-import parquet.hadoop.ParquetOutputFormat;
-import parquet.hadoop.metadata.CompressionCodecName;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.storage.FileAppender;
-import org.apache.tajo.storage.TableStatistics;
-import org.apache.tajo.storage.Tuple;
-
-import java.io.IOException;
-
-/**
- * FileAppender for writing to Parquet files.
- */
-public class ParquetAppender extends FileAppender {
- private TajoParquetWriter writer;
- private int blockSize;
- private int pageSize;
- private CompressionCodecName compressionCodecName;
- private boolean enableDictionary;
- private boolean validating;
- private TableStatistics stats;
-
- /**
- * Creates a new ParquetAppender.
- *
- * @param conf Configuration properties.
- * @param schema The table schema.
- * @param meta The table metadata.
- * @param workDir The path of the Parquet file to write to.
- */
- public ParquetAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, Schema schema, TableMeta meta,
- Path workDir) throws IOException {
- super(conf, taskAttemptId, schema, meta, workDir);
- this.blockSize = Integer.parseInt(
- meta.getOption(ParquetOutputFormat.BLOCK_SIZE, StorageConstants.PARQUET_DEFAULT_BLOCK_SIZE));
- this.pageSize = Integer.parseInt(
- meta.getOption(ParquetOutputFormat.PAGE_SIZE, StorageConstants.PARQUET_DEFAULT_PAGE_SIZE));
- this.compressionCodecName = CompressionCodecName.fromConf(
- meta.getOption(ParquetOutputFormat.COMPRESSION, StorageConstants.PARQUET_DEFAULT_COMPRESSION_CODEC_NAME));
- this.enableDictionary = Boolean.parseBoolean(
- meta.getOption(ParquetOutputFormat.ENABLE_DICTIONARY, StorageConstants.PARQUET_DEFAULT_IS_DICTIONARY_ENABLED));
- this.validating = Boolean.parseBoolean(
- meta.getOption(ParquetOutputFormat.VALIDATION, StorageConstants.PARQUET_DEFAULT_IS_VALIDATION_ENABLED));
- }
-
- /**
- * Initializes the Appender. This method creates a new TajoParquetWriter
- * and initializes the table statistics if enabled.
- */
- public void init() throws IOException {
- writer = new TajoParquetWriter(path,
- schema,
- compressionCodecName,
- blockSize,
- pageSize,
- enableDictionary,
- validating);
- if (enabledStats) {
- this.stats = new TableStatistics(schema);
- }
- super.init();
- }
-
- /**
- * Gets the current offset. Tracking offsets is currenly not implemented, so
- * this method always returns 0.
- *
- * @return 0
- */
- @Override
- public long getOffset() throws IOException {
- return 0;
- }
-
- /**
- * Write a Tuple to the Parquet file.
- *
- * @param tuple The Tuple to write.
- */
- @Override
- public void addTuple(Tuple tuple) throws IOException {
- if (enabledStats) {
- for (int i = 0; i < schema.size(); ++i) {
- stats.analyzeField(i, tuple.get(i));
- }
- }
- writer.write(tuple);
- if (enabledStats) {
- stats.incrementRow();
- }
- }
-
- /**
- * The ParquetWriter does not need to be flushed, so this is a no-op.
- */
- @Override
- public void flush() throws IOException {
- }
-
- /**
- * Closes the Appender.
- */
- @Override
- public void close() throws IOException {
- writer.close();
- }
-
- public long getEstimatedOutputSize() throws IOException {
- return writer.getEstimatedWrittenSize();
- }
-
- /**
- * If table statistics is enabled, retrieve the table statistics.
- *
- * @return Table statistics if enabled or null otherwise.
- */
- @Override
- public TableStats getStats() {
- if (enabledStats) {
- return stats.getTableStat();
- } else {
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
deleted file mode 100644
index 2f8efcf..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.storage.FileScanner;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.fragment.Fragment;
-
-import java.io.IOException;
-
-/**
- * FileScanner for reading Parquet files
- */
-public class ParquetScanner extends FileScanner {
- private TajoParquetReader reader;
-
- /**
- * Creates a new ParquetScanner.
- *
- * @param conf
- * @param schema
- * @param meta
- * @param fragment
- */
- public ParquetScanner(Configuration conf, final Schema schema,
- final TableMeta meta, final Fragment fragment) {
- super(conf, schema, meta, fragment);
- }
-
- /**
- * Initializes the ParquetScanner. This method initializes the
- * TajoParquetReader.
- */
- @Override
- public void init() throws IOException {
- if (targets == null) {
- targets = schema.toArray();
- }
- reader = new TajoParquetReader(fragment.getPath(), schema, new Schema(targets));
- super.init();
- }
-
- /**
- * Reads the next Tuple from the Parquet file.
- *
- * @return The next Tuple from the Parquet file or null if end of file is
- * reached.
- */
- @Override
- public Tuple next() throws IOException {
- return reader.read();
- }
-
- /**
- * Resets the scanner
- */
- @Override
- public void reset() throws IOException {
- }
-
- /**
- * Closes the scanner.
- */
- @Override
- public void close() throws IOException {
- if (reader != null) {
- reader.close();
- }
- }
-
- /**
- * Returns whether this scanner is projectable.
- *
- * @return true
- */
- @Override
- public boolean isProjectable() {
- return true;
- }
-
- /**
- * Returns whether this scanner is selectable.
- *
- * @return false
- */
- @Override
- public boolean isSelectable() {
- return false;
- }
-
- /**
- * Returns whether this scanner is splittable.
- *
- * @return false
- */
- @Override
- public boolean isSplittable() {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
deleted file mode 100644
index a765f48..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.thirdparty.parquet.ParquetReader;
-import parquet.filter.UnboundRecordFilter;
-
-import java.io.IOException;
-
-/**
- * Tajo implementation of {@link ParquetReader} to read Tajo records from a
- * Parquet file. Users should use {@link ParquetScanner} and not this class
- * directly.
- */
-public class TajoParquetReader extends ParquetReader<Tuple> {
- /**
- * Creates a new TajoParquetReader.
- *
- * @param file The file to read from.
- * @param readSchema Tajo schema of the table.
- */
- public TajoParquetReader(Path file, Schema readSchema) throws IOException {
- super(file, new TajoReadSupport(readSchema));
- }
-
- /**
- * Creates a new TajoParquetReader.
- *
- * @param file The file to read from.
- * @param readSchema Tajo schema of the table.
- * @param requestedSchema Tajo schema of the projection.
- */
- public TajoParquetReader(Path file, Schema readSchema,
- Schema requestedSchema) throws IOException {
- super(file, new TajoReadSupport(readSchema, requestedSchema));
- }
-
- /**
- * Creates a new TajoParquetReader.
- *
- * @param file The file to read from.
- * @param readSchema Tajo schema of the table.
- * @param recordFilter Record filter.
- */
- public TajoParquetReader(Path file, Schema readSchema,
- UnboundRecordFilter recordFilter)
- throws IOException {
- super(file, new TajoReadSupport(readSchema), recordFilter);
- }
-
- /**
- * Creates a new TajoParquetReader.
- *
- * @param file The file to read from.
- * @param readSchema Tajo schema of the table.
- * @param requestedSchema Tajo schema of the projection.
- * @param recordFilter Record filter.
- */
- public TajoParquetReader(Path file, Schema readSchema,
- Schema requestedSchema,
- UnboundRecordFilter recordFilter)
- throws IOException {
- super(file, new TajoReadSupport(readSchema, requestedSchema),
- recordFilter);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
deleted file mode 100644
index 69b76c4..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.thirdparty.parquet.ParquetWriter;
-import parquet.hadoop.metadata.CompressionCodecName;
-
-import java.io.IOException;
-
-/**
- * Tajo implementation of {@link ParquetWriter} to write Tajo records to a
- * Parquet file. Users should use {@link ParquetAppender} and not this class
- * directly.
- */
-public class TajoParquetWriter extends ParquetWriter<Tuple> {
- /**
- * Create a new TajoParquetWriter
- *
- * @param file The file name to write to.
- * @param schema The Tajo schema of the table.
- * @param compressionCodecName Compression codec to use, or
- * CompressionCodecName.UNCOMPRESSED.
- * @param blockSize The block size threshold.
- * @param pageSize See parquet write up. Blocks are subdivided into pages
- * for alignment.
- * @throws IOException
- */
- public TajoParquetWriter(Path file,
- Schema schema,
- CompressionCodecName compressionCodecName,
- int blockSize,
- int pageSize) throws IOException {
- super(file,
- new TajoWriteSupport(schema),
- compressionCodecName,
- blockSize,
- pageSize);
- }
-
- /**
- * Create a new TajoParquetWriter.
- *
- * @param file The file name to write to.
- * @param schema The Tajo schema of the table.
- * @param compressionCodecName Compression codec to use, or
- * CompressionCodecName.UNCOMPRESSED.
- * @param blockSize The block size threshold.
- * @param pageSize See parquet write up. Blocks are subdivided into pages
- * for alignment.
- * @param enableDictionary Whether to use a dictionary to compress columns.
- * @param validating Whether to turn on validation.
- * @throws IOException
- */
- public TajoParquetWriter(Path file,
- Schema schema,
- CompressionCodecName compressionCodecName,
- int blockSize,
- int pageSize,
- boolean enableDictionary,
- boolean validating) throws IOException {
- super(file,
- new TajoWriteSupport(schema),
- compressionCodecName,
- blockSize,
- pageSize,
- enableDictionary,
- validating);
- }
-
- /**
- * Creates a new TajoParquetWriter. The default block size is 128 MB.
- * The default page size is 1 MB. Default compression is no compression.
- *
- * @param file The Path of the file to write to.
- * @param schema The Tajo schema of the table.
- * @throws IOException
- */
- public TajoParquetWriter(Path file, Schema schema) throws IOException {
- this(file,
- schema,
- CompressionCodecName.UNCOMPRESSED,
- DEFAULT_BLOCK_SIZE,
- DEFAULT_PAGE_SIZE);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java
deleted file mode 100644
index 269f782..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import java.util.Map;
-
-import parquet.Log;
-import parquet.hadoop.api.InitContext;
-import parquet.hadoop.api.ReadSupport;
-import parquet.io.api.RecordMaterializer;
-import parquet.schema.MessageType;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.json.CatalogGsonHelper;
-import org.apache.tajo.storage.Tuple;
-
-/**
- * Tajo implementation of {@link ReadSupport} for {@link Tuple}s.
- * Users should use {@link ParquetScanner} and not this class directly.
- */
-public class TajoReadSupport extends ReadSupport<Tuple> {
-  private static final Log LOG = Log.getLog(TajoReadSupport.class);
-
-  /** The Tajo schema of the whole table. */
-  private final Schema readSchema;
-  /** The Tajo schema of the columns to read; same as readSchema when no projection is given. */
-  private final Schema requestedSchema;
-
-  /**
-   * Creates a new TajoReadSupport that reads a projection of the table.
-   *
-   * @param readSchema The Tajo schema of the table.
-   * @param requestedSchema The Tajo schema of the requested projection passed
-   *                        down by ParquetScanner.
-   */
-  public TajoReadSupport(Schema readSchema, Schema requestedSchema) {
-    super();
-    this.readSchema = readSchema;
-    this.requestedSchema = requestedSchema;
-  }
-
-  /**
-   * Creates a new TajoReadSupport that reads every column of the table.
-   *
-   * @param readSchema The schema of the table; also used as the projection.
-   */
-  public TajoReadSupport(Schema readSchema) {
-    this(readSchema, readSchema);
-  }
-
-  /**
-   * Initializes the ReadSupport.
-   *
-   * @param context The InitContext.
-   * @return A ReadContext whose requested schema is the Parquet form of the
-   *         requested Tajo projection.
-   * @throws RuntimeException if either schema is null.
-   */
-  @Override
-  public ReadContext init(InitContext context) {
-    if (requestedSchema == null) {
-      throw new RuntimeException("requestedSchema is null.");
-    }
-    // readSchema is dereferenced by TajoRecordMaterializer in prepareForRead();
-    // fail here with a clear message instead of a later NullPointerException.
-    if (readSchema == null) {
-      throw new RuntimeException("readSchema is null.");
-    }
-    MessageType requestedParquetSchema =
-        new TajoSchemaConverter().convert(requestedSchema);
-    // Only build the schema string when debug logging is enabled.
-    if (Log.DEBUG) {
-      LOG.debug("Reading data with projection:\n" + requestedParquetSchema);
-    }
-    return new ReadContext(requestedParquetSchema);
-  }
-
-  /**
-   * Prepares for read.
-   *
-   * @param configuration The job configuration.
-   * @param keyValueMetaData App-specific metadata from the file.
-   * @param fileSchema The schema of the Parquet file (unused; the requested
-   *                   schema from readContext is used instead).
-   * @param readContext Returned by the init method.
-   * @return A materializer producing Tuples laid out by the table schema.
-   */
-  @Override
-  public RecordMaterializer<Tuple> prepareForRead(
-      Configuration configuration,
-      Map<String, String> keyValueMetaData,
-      MessageType fileSchema,
-      ReadContext readContext) {
-    MessageType parquetRequestedSchema = readContext.getRequestedSchema();
-    return new TajoRecordMaterializer(parquetRequestedSchema, requestedSchema, readSchema);
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
deleted file mode 100644
index 7c3d79d..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
+++ /dev/null
@@ -1,386 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import com.google.protobuf.Message;
-import com.google.protobuf.InvalidProtocolBufferException;
-
-import java.nio.ByteBuffer;
-
-import parquet.io.api.GroupConverter;
-import parquet.io.api.Converter;
-import parquet.io.api.PrimitiveConverter;
-import parquet.io.api.Binary;
-import parquet.schema.Type;
-import parquet.schema.GroupType;
-
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.common.TajoDataTypes.DataType;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.BlobDatum;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.datum.ProtobufDatumFactory;
-
-/**
- * Converts a Parquet record into a Tajo Tuple.
- *
- * <p>The tuple has one slot per column of the table schema, and only the
- * slots listed in the projection map are written. Projected NULL_TYPE columns
- * have no Parquet field, so they get no converter. They are set to
- * {@link NullDatum} in {@link #end()}, together with any projected slot that
- * is still null once the record has been read.</p>
- */
-public class TajoRecordConverter extends GroupConverter {
-  private final GroupType parquetSchema;
-  private final Schema tajoReadSchema;
-  private final int[] projectionMap;
-  private final int tupleSize;
-
-  /** One converter per Parquet field of the projection, in field order. */
-  private final Converter[] converters;
-
-  /** The tuple being assembled; replaced in start() for every record. */
-  private Tuple currentTuple;
-
-  /**
-   * Creates a new TajoRecordConverter.
-   *
-   * @param parquetSchema The Parquet schema of the projection.
-   * @param tajoReadSchema The Tajo schema of the table.
-   * @param projectionMap An array mapping the projection column to the column
-   *                      index in the table.
-   */
-  public TajoRecordConverter(GroupType parquetSchema, Schema tajoReadSchema,
-                             int[] projectionMap) {
-    this.parquetSchema = parquetSchema;
-    this.tajoReadSchema = tajoReadSchema;
-    this.projectionMap = projectionMap;
-    this.tupleSize = tajoReadSchema.size();
-
-    // The projectionMap.length does not match parquetSchema.getFieldCount()
-    // when the projection contains NULL_TYPE columns. We skip over the
-    // NULL_TYPE columns when we construct the converters and populate the
-    // NULL_TYPE columns with NullDatums in end().
-    int index = 0;
-    this.converters = new Converter[parquetSchema.getFieldCount()];
-    for (final int projectionIndex : projectionMap) {
-      Column column = tajoReadSchema.getColumn(projectionIndex);
-      if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) {
-        continue;
-      }
-      Type type = parquetSchema.getType(index);
-      converters[index] = newConverter(column, type, new ParentValueContainer() {
-        @Override
-        void add(Object value) {
-          TajoRecordConverter.this.set(projectionIndex, value);
-        }
-      });
-      ++index;
-    }
-  }
-
-  /** Stores a converted value into the current tuple at the table index. */
-  private void set(int index, Object value) {
-    currentTuple.put(index, (Datum)value);
-  }
-
-  /**
-   * Creates the primitive converter matching a Tajo column type.
-   *
-   * @throws RuntimeException for INET6, NULL_TYPE and any other type without
-   *         a converter.
-   */
-  private Converter newConverter(Column column, Type type,
-                                 ParentValueContainer parent) {
-    DataType dataType = column.getDataType();
-    switch (dataType.getType()) {
-      case BOOLEAN:
-        return new FieldBooleanConverter(parent);
-      case BIT:
-        return new FieldBitConverter(parent);
-      case CHAR:
-        return new FieldCharConverter(parent);
-      case INT2:
-        return new FieldInt2Converter(parent);
-      case INT4:
-        return new FieldInt4Converter(parent);
-      case INT8:
-        return new FieldInt8Converter(parent);
-      case FLOAT4:
-        return new FieldFloat4Converter(parent);
-      case FLOAT8:
-        return new FieldFloat8Converter(parent);
-      case INET4:
-        return new FieldInet4Converter(parent);
-      case INET6:
-        throw new RuntimeException("No converter for INET6");
-      case TEXT:
-        return new FieldTextConverter(parent);
-      case PROTOBUF:
-        return new FieldProtobufConverter(parent, dataType);
-      case BLOB:
-        return new FieldBlobConverter(parent);
-      case NULL_TYPE:
-        throw new RuntimeException("No converter for NULL_TYPE.");
-      default:
-        // Name the offending type so the failure can be diagnosed.
-        throw new RuntimeException("Unsupported data type: " + dataType.getType());
-    }
-  }
-
-  /**
-   * Gets the converter for a specific field.
-   *
-   * @param fieldIndex Index of the field in the projection.
-   * @return The converter for the field.
-   */
-  @Override
-  public Converter getConverter(int fieldIndex) {
-    return converters[fieldIndex];
-  }
-
-  /**
-   * Called before processing fields. Starts a fresh, empty tuple sized for
-   * the table schema.
-   */
-  @Override
-  public void start() {
-    currentTuple = new VTuple(tupleSize);
-  }
-
-  /**
-   * Called after all fields have been processed. Fills every projected
-   * column that is of type NULL_TYPE, or that received no value, with a
-   * NullDatum.
-   */
-  @Override
-  public void end() {
-    for (int i = 0; i < projectionMap.length; ++i) {
-      final int projectionIndex = projectionMap[i];
-      Column column = tajoReadSchema.getColumn(projectionIndex);
-      if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE
-          || currentTuple.get(projectionIndex) == null) {
-        set(projectionIndex, NullDatum.get());
-      }
-    }
-  }
-
-  /**
-   * Returns the current record converted by this converter.
-   *
-   * @return The current record.
-   */
-  public Tuple getCurrentRecord() {
-    return currentTuple;
-  }
-
-  /** Receives the Datum produced by a field converter. */
-  static abstract class ParentValueContainer {
-    /**
-     * Adds the value to the parent.
-     *
-     * @param value The value to add.
-     */
-    abstract void add(Object value);
-  }
-
-  /** Parquet BOOLEAN to Tajo BOOLEAN. */
-  static final class FieldBooleanConverter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldBooleanConverter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addBoolean(boolean value) {
-      parent.add(DatumFactory.createBool(value));
-    }
-  }
-
-  /** Parquet INT32 to Tajo BIT, keeping only the low 8 bits. */
-  static final class FieldBitConverter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldBitConverter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addInt(int value) {
-      parent.add(DatumFactory.createBit((byte)(value & 0xff)));
-    }
-  }
-
-  /** UTF-8 Parquet BINARY to Tajo CHAR. */
-  static final class FieldCharConverter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldCharConverter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addBinary(Binary value) {
-      parent.add(DatumFactory.createChar(value.toStringUsingUTF8()));
-    }
-  }
-
-  /** Parquet INT32 to Tajo INT2 (narrowing cast). */
-  static final class FieldInt2Converter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldInt2Converter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addInt(int value) {
-      parent.add(DatumFactory.createInt2((short)value));
-    }
-  }
-
-  /** Parquet INT32 to Tajo INT4. */
-  static final class FieldInt4Converter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldInt4Converter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addInt(int value) {
-      parent.add(DatumFactory.createInt4(value));
-    }
-  }
-
-  /** Parquet INT64 (or INT32) to Tajo INT8. */
-  static final class FieldInt8Converter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldInt8Converter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addLong(long value) {
-      parent.add(DatumFactory.createInt8(value));
-    }
-
-    @Override
-    final public void addInt(int value) {
-      parent.add(DatumFactory.createInt8((long) value));
-    }
-  }
-
-  /** Parquet FLOAT (or INT32/INT64) to Tajo FLOAT4. */
-  static final class FieldFloat4Converter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldFloat4Converter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addInt(int value) {
-      parent.add(DatumFactory.createFloat4((float) value));
-    }
-
-    @Override
-    final public void addLong(long value) {
-      parent.add(DatumFactory.createFloat4((float) value));
-    }
-
-    @Override
-    final public void addFloat(float value) {
-      parent.add(DatumFactory.createFloat4(value));
-    }
-  }
-
-  /** Parquet DOUBLE (or INT32/INT64/FLOAT) to Tajo FLOAT8. */
-  static final class FieldFloat8Converter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldFloat8Converter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addInt(int value) {
-      parent.add(DatumFactory.createFloat8((double) value));
-    }
-
-    @Override
-    final public void addLong(long value) {
-      parent.add(DatumFactory.createFloat8((double) value));
-    }
-
-    @Override
-    final public void addFloat(float value) {
-      parent.add(DatumFactory.createFloat8((double) value));
-    }
-
-    @Override
-    final public void addDouble(double value) {
-      parent.add(DatumFactory.createFloat8(value));
-    }
-  }
-
-  /** Parquet BINARY to Tajo INET4. */
-  static final class FieldInet4Converter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldInet4Converter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addBinary(Binary value) {
-      parent.add(DatumFactory.createInet4(value.getBytes()));
-    }
-  }
-
-  /** UTF-8 Parquet BINARY to Tajo TEXT. */
-  static final class FieldTextConverter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldTextConverter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addBinary(Binary value) {
-      parent.add(DatumFactory.createText(value.toStringUsingUTF8()));
-    }
-  }
-
-  /** Parquet BINARY to Tajo BLOB. */
-  static final class FieldBlobConverter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldBlobConverter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addBinary(Binary value) {
-      parent.add(new BlobDatum(ByteBuffer.wrap(value.getBytes())));
-    }
-  }
-
-  /** Parquet BINARY holding a serialized protobuf message to Tajo PROTOBUF. */
-  static final class FieldProtobufConverter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-    private final DataType dataType;
-
-    public FieldProtobufConverter(ParentValueContainer parent,
-                                  DataType dataType) {
-      this.parent = parent;
-      this.dataType = dataType;
-    }
-
-    @Override
-    final public void addBinary(Binary value) {
-      try {
-        ProtobufDatumFactory factory =
-            ProtobufDatumFactory.get(dataType.getCode());
-        Message.Builder builder = factory.newBuilder();
-        builder.mergeFrom(value.getBytes());
-        parent.add(factory.createDatum(builder));
-      } catch (InvalidProtocolBufferException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
deleted file mode 100644
index e31828c..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import parquet.io.api.GroupConverter;
-import parquet.io.api.RecordMaterializer;
-import parquet.schema.MessageType;
-
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.storage.Tuple;
-
-/**
- * Builds Tajo Tuples from a stream of Parquet data by delegating each record
- * to a single root {@link TajoRecordConverter}.
- */
-class TajoRecordMaterializer extends RecordMaterializer<Tuple> {
-  private final TajoRecordConverter root;
-
-  /**
-   * Creates a new TajoRecordMaterializer.
-   *
-   * @param parquetSchema The Parquet schema of the projection.
-   * @param tajoSchema The Tajo schema of the projection.
-   * @param tajoReadSchema The Tajo schema of the table.
-   */
-  public TajoRecordMaterializer(MessageType parquetSchema, Schema tajoSchema,
-                                Schema tajoReadSchema) {
-    this.root = new TajoRecordConverter(parquetSchema, tajoReadSchema,
-        getProjectionMap(tajoReadSchema, tajoSchema));
-  }
-
-  /**
-   * Looks up where each projected column lives in the table schema.
-   *
-   * @param schema The Tajo schema of the table.
-   * @param projection The Tajo schema of the projection.
-   * @return For each projection column, in order, the id that
-   *         {@code schema.getColumnId} reports for its qualified name.
-   */
-  private int[] getProjectionMap(Schema schema, Schema projection) {
-    Column[] projected = projection.toArray();
-    int[] ids = new int[projected.length];
-    int slot = 0;
-    // NOTE(review): a projected column missing from the table schema is not
-    // detected here; confirm what getColumnId returns in that case.
-    for (Column column : projected) {
-      ids[slot++] = schema.getColumnId(column.getQualifiedName());
-    }
-    return ids;
-  }
-
-  /**
-   * Returns the tuple most recently assembled by the root converter.
-   *
-   * @return The record being materialized.
-   */
-  @Override
-  public Tuple getCurrentRecord() {
-    return root.getCurrentRecord();
-  }
-
-  /**
-   * Returns the converter that Parquet drives for every record.
-   *
-   * @return The root converter.
-   */
-  @Override
-  public GroupConverter getRootConverter() {
-    return root;
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
deleted file mode 100644
index 2592231..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.common.TajoDataTypes;
-import parquet.schema.MessageType;
-import parquet.schema.OriginalType;
-import parquet.schema.PrimitiveType;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-import parquet.schema.Type;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Converts between Parquet and Tajo schemas. See package documentation for
- * details on the mapping.
- */
-public class TajoSchemaConverter {
-  /** Name given to the Parquet message type produced by convert(Schema). */
-  private static final String TABLE_SCHEMA = "table_schema";
-
-  /**
-   * Creates a new TajoSchemaConverter. The converter holds no state.
-   */
-  public TajoSchemaConverter() {
-  }
-
-  /**
-   * Converts a Parquet schema to a Tajo schema.
-   *
-   * @param parquetSchema The Parquet schema to convert.
-   * @return The resulting Tajo schema.
-   * @throws RuntimeException for REPEATED, complex or INT96 fields.
-   */
-  public Schema convert(MessageType parquetSchema) {
-    return convertFields(parquetSchema.getFields());
-  }
-
-  /** Converts each top-level Parquet field into a Tajo column, in order. */
-  private Schema convertFields(List<Type> parquetFields) {
-    List<Column> columns = new ArrayList<Column>(parquetFields.size());
-    for (Type field : parquetFields) {
-      if (field.isRepetition(Type.Repetition.REPEATED)) {
-        throw new RuntimeException("REPEATED not supported outside LIST or" +
-            " MAP. Type: " + field);
-      }
-      columns.add(convertField(field));
-    }
-    return new Schema(columns.toArray(new Column[columns.size()]));
-  }
-
-  /** Dispatches on whether the Parquet field is primitive or a group. */
-  private Column convertField(final Type fieldType) {
-    return fieldType.isPrimitive()
-        ? convertPrimitiveField(fieldType)
-        : convertComplexField(fieldType);
-  }
-
-  /** Maps a primitive Parquet field to the Tajo column type of the same name. */
-  private Column convertPrimitiveField(final Type fieldType) {
-    final String name = fieldType.getName();
-    final PrimitiveTypeName typeName =
-        fieldType.asPrimitiveType().getPrimitiveTypeName();
-    final OriginalType original = fieldType.getOriginalType();
-    return typeName.convert(
-        new PrimitiveType.PrimitiveTypeNameConverter<Column, RuntimeException>() {
-          @Override
-          public Column convertBOOLEAN(PrimitiveTypeName primitiveTypeName) {
-            return new Column(name, TajoDataTypes.Type.BOOLEAN);
-          }
-
-          @Override
-          public Column convertINT32(PrimitiveTypeName primitiveTypeName) {
-            return new Column(name, TajoDataTypes.Type.INT4);
-          }
-
-          @Override
-          public Column convertINT64(PrimitiveTypeName primitiveTypeName) {
-            return new Column(name, TajoDataTypes.Type.INT8);
-          }
-
-          @Override
-          public Column convertFLOAT(PrimitiveTypeName primitiveTypeName) {
-            return new Column(name, TajoDataTypes.Type.FLOAT4);
-          }
-
-          @Override
-          public Column convertDOUBLE(PrimitiveTypeName primitiveTypeName) {
-            return new Column(name, TajoDataTypes.Type.FLOAT8);
-          }
-
-          @Override
-          public Column convertFIXED_LEN_BYTE_ARRAY(
-              PrimitiveTypeName primitiveTypeName) {
-            return new Column(name, TajoDataTypes.Type.BLOB);
-          }
-
-          @Override
-          public Column convertBINARY(PrimitiveTypeName primitiveTypeName) {
-            // Only UTF8-annotated binaries are text; everything else is a blob.
-            TajoDataTypes.Type tajoType = (original == OriginalType.UTF8)
-                ? TajoDataTypes.Type.TEXT
-                : TajoDataTypes.Type.BLOB;
-            return new Column(name, tajoType);
-          }
-
-          @Override
-          public Column convertINT96(PrimitiveTypeName primitiveTypeName) {
-            throw new RuntimeException("Converting from INT96 not supported.");
-          }
-        });
-  }
-
-  /** Group (nested) Parquet types have no Tajo equivalent here. */
-  private Column convertComplexField(final Type fieldType) {
-    throw new RuntimeException("Complex types not supported.");
-  }
-
-  /**
-   * Converts a Tajo schema to a Parquet schema. NULL_TYPE columns are
-   * omitted, and every emitted field is OPTIONAL.
-   *
-   * @param tajoSchema The Tajo schema to convert.
-   * @return The resulting Parquet schema.
-   * @throws RuntimeException for a column type with no Parquet mapping.
-   */
-  public MessageType convert(Schema tajoSchema) {
-    List<Type> parquetFields = new ArrayList<Type>();
-    int columnCount = tajoSchema.size();
-    for (int i = 0; i < columnCount; ++i) {
-      Column column = tajoSchema.getColumn(i);
-      if (column.getDataType().getType() != TajoDataTypes.Type.NULL_TYPE) {
-        parquetFields.add(convertColumn(column));
-      }
-    }
-    return new MessageType(TABLE_SCHEMA, parquetFields);
-  }
-
-  /** Maps one Tajo column to an OPTIONAL Parquet primitive field. */
-  private Type convertColumn(Column column) {
-    TajoDataTypes.Type type = column.getDataType().getType();
-    String name = column.getSimpleName();
-    switch (type) {
-      case BOOLEAN:
-        return primitive(name, PrimitiveTypeName.BOOLEAN);
-      case BIT:
-      case INT2:
-      case INT4:
-        return primitive(name, PrimitiveTypeName.INT32);
-      case INT8:
-        return primitive(name, PrimitiveTypeName.INT64);
-      case FLOAT4:
-        return primitive(name, PrimitiveTypeName.FLOAT);
-      case FLOAT8:
-        return primitive(name, PrimitiveTypeName.DOUBLE);
-      case CHAR:
-      case TEXT:
-        return primitive(name, PrimitiveTypeName.BINARY, OriginalType.UTF8);
-      case PROTOBUF:
-      case BLOB:
-      case INET4:
-      case INET6:
-        return primitive(name, PrimitiveTypeName.BINARY);
-      default:
-        throw new RuntimeException("Cannot convert Tajo type: " + type);
-    }
-  }
-
-  /** Builds an OPTIONAL primitive field with no original-type annotation. */
-  private PrimitiveType primitive(String name,
-                                  PrimitiveType.PrimitiveTypeName primitive) {
-    return primitive(name, primitive, null);
-  }
-
-  /** Builds an OPTIONAL primitive field with the given original type. */
-  private PrimitiveType primitive(String name,
-                                  PrimitiveType.PrimitiveTypeName primitive,
-                                  OriginalType originalType) {
-    return new PrimitiveType(Type.Repetition.OPTIONAL, primitive, name,
-        originalType);
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
deleted file mode 100644
index 35165de..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import java.util.Map;
-import java.util.HashMap;
-import java.util.List;
-
-import parquet.hadoop.api.WriteSupport;
-import parquet.io.api.Binary;
-import parquet.io.api.RecordConsumer;
-import parquet.schema.GroupType;
-import parquet.schema.MessageType;
-import parquet.schema.Type;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.datum.Datum;
-
-/**
- * Tajo implementation of {@link WriteSupport} for {@link Tuple}s.
- * Users should use {@link ParquetAppender} and not this class directly.
- */
-public class TajoWriteSupport extends WriteSupport<Tuple> {
-  /** Consumer for the current row group; set in prepareForWrite(). */
-  private RecordConsumer recordConsumer;
-  /** Parquet schema derived from rootTajoSchema (NULL_TYPE columns dropped). */
-  private final MessageType rootSchema;
-  /** Tajo schema of the table being written. */
-  private final Schema rootTajoSchema;
-
-  /**
-   * Creates a new TajoWriteSupport.
-   *
-   * @param tajoSchema The Tajo schema for the table.
-   * @throws RuntimeException if a column type cannot be mapped to Parquet.
-   */
-  public TajoWriteSupport(Schema tajoSchema) {
-    this.rootSchema = new TajoSchemaConverter().convert(tajoSchema);
-    this.rootTajoSchema = tajoSchema;
-  }
-
-  /**
-   * Initializes the WriteSupport.
-   *
-   * @param configuration The job's configuration.
-   * @return A WriteContext carrying the Parquet schema and empty extra
-   *         metadata.
-   */
-  @Override
-  public WriteContext init(Configuration configuration) {
-    Map<String, String> extraMetaData = new HashMap<String, String>();
-    return new WriteContext(rootSchema, extraMetaData);
-  }
-
-  /**
-   * Called once per row group.
-   *
-   * @param recordConsumer The {@link RecordConsumer} to write to.
-   */
-  @Override
-  public void prepareForWrite(RecordConsumer recordConsumer) {
-    this.recordConsumer = recordConsumer;
-  }
-
-  /**
-   * Writes a Tuple to the file.
-   *
-   * @param tuple The Tuple to write to the file.
-   */
-  @Override
-  public void write(Tuple tuple) {
-    recordConsumer.startMessage();
-    writeRecordFields(rootSchema, rootTajoSchema, tuple);
-    recordConsumer.endMessage();
-  }
-
-  /**
-   * Writes every non-null, non-NULL_TYPE column of the tuple as a Parquet
-   * field. Null values are omitted; a null in a REQUIRED field is an error.
-   */
-  private void writeRecordFields(GroupType schema, Schema tajoSchema,
-                                 Tuple tuple) {
-    List<Type> fields = schema.getFields();
-    // Parquet ignores Tajo NULL_TYPE columns, so the Parquet field index can
-    // lag behind the Tajo column index.
-    int index = 0;
-    for (int tajoIndex = 0; tajoIndex < tajoSchema.size(); ++tajoIndex) {
-      Column column = tajoSchema.getColumn(tajoIndex);
-      if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) {
-        continue;
-      }
-      Type fieldType = fields.get(index);
-      if (!tuple.isNull(tajoIndex)) {
-        recordConsumer.startField(fieldType.getName(), index);
-        writeValue(fieldType, column, tuple.get(tajoIndex));
-        recordConsumer.endField(fieldType.getName(), index);
-      } else if (fieldType.isRepetition(Type.Repetition.REQUIRED)) {
-        throw new RuntimeException("Null-value for required field: " +
-            column.getSimpleName());
-      }
-      ++index;
-    }
-  }
-
-  /**
-   * Emits a single value to the record consumer using the Parquet primitive
-   * chosen for the column's Tajo type by TajoSchemaConverter.
-   *
-   * @throws RuntimeException for a type with no Parquet mapping. This should
-   *         not happen in practice, because TajoSchemaConverter already
-   *         rejects such columns when this WriteSupport is constructed. It
-   *         replaces silently writing an empty field.
-   */
-  private void writeValue(Type fieldType, Column column, Datum datum) {
-    TajoDataTypes.Type type = column.getDataType().getType();
-    switch (type) {
-      case BOOLEAN:
-        recordConsumer.addBoolean(datum.asBool());
-        break;
-      case BIT:
-      case INT2:
-      case INT4:
-        recordConsumer.addInteger(datum.asInt4());
-        break;
-      case INT8:
-        recordConsumer.addLong(datum.asInt8());
-        break;
-      case FLOAT4:
-        recordConsumer.addFloat(datum.asFloat4());
-        break;
-      case FLOAT8:
-        recordConsumer.addDouble(datum.asFloat8());
-        break;
-      case CHAR:
-      case TEXT:
-        recordConsumer.addBinary(Binary.fromString(datum.asChars()));
-        break;
-      case PROTOBUF:
-      case BLOB:
-      case INET4:
-      case INET6:
-        recordConsumer.addBinary(Binary.fromByteArray(datum.asByteArray()));
-        break;
-      default:
-        throw new RuntimeException("Cannot write Tajo type: " + type);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/package-info.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/package-info.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/package-info.java
deleted file mode 100644
index d7d16b7..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/package-info.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * <p>
- * Provides read and write support for Parquet files. Tajo schemas are
- * converted to Parquet schemas according to the following mapping of Tajo
- * and Parquet types:
- * </p>
- *
- * <table>
- * <tr>
- * <th>Tajo type</th>
- * <th>Parquet type</th>
- * </tr>
- * <tr>
- * <td>NULL_TYPE</td>
- * <td>No type. The field is not encoded in Parquet.</td>
- * </tr>
- * <tr>
- * <td>BOOLEAN</td>
- * <td>BOOLEAN</td>
- * </tr>
- * <tr>
- * <td>BIT</td>
- * <td>INT32</td>
- * </tr>
- * <tr>
- * <td>INT2</td>
- * <td>INT32</td>
- * </tr>
- * <tr>
- * <td>INT4</td>
- * <td>INT32</td>
- * </tr>
- * <tr>
- * <td>INT8</td>
- * <td>INT64</td>
- * </tr>
- * <tr>
- * <td>FLOAT4</td>
- * <td>FLOAT</td>
- * </tr>
- * <tr>
- * <td>FLOAT8</td>
- * <td>DOUBLE</td>
- * </tr>
- * <tr>
- * <td>CHAR</td>
- * <td>BINARY (with OriginalType UTF8)</td>
- * </tr>
- * <tr>
- * <td>TEXT</td>
- * <td>BINARY (with OriginalType UTF8)</td>
- * </tr>
- * <tr>
- * <td>PROTOBUF</td>
- * <td>BINARY</td>
- * </tr>
- * <tr>
- * <td>BLOB</td>
- * <td>BINARY</td>
- * </tr>
- * <tr>
- * <td>INET4</td>
- * <td>BINARY</td>
- * </tr>
- * </table>
- *
- * <p>
- * Because Tajo fields can be NULL, all Parquet fields are marked as optional.
- * </p>
- *
- * <p>
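- * The mapping from Tajo types to Parquet types is lossy: several Tajo types
- * share one Parquet type (for example, BIT, INT2 and INT4 are all stored as
- * INT32), so the original Tajo schema cannot be recovered from a Parquet
- * schema alone. As a result, Parquet files are read using the Tajo schema
- * saved in the Tajo catalog for the table the Parquet files belong to, which
- * was defined when the table was created.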
- * </p>
- */
-
-package org.apache.tajo.storage.parquet;
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java
deleted file mode 100644
index 5e200a0..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.rcfile;
-
-import com.google.common.base.Objects;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.io.WritableFactory;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Arrays;
-
-/**
- * <tt>BytesRefArrayWritable</tt> holds an array reference to BytesRefWritable,
- * and is able to resize without recreating new array if not necessary.
- * <p>
- *
- * Each <tt>BytesRefArrayWritable holds</tt> instance has a <i>valid</i> field,
- * which is the desired valid number of <tt>BytesRefWritable</tt> it holds.
- * <tt>resetValid</tt> can reset the valid, but it will not care the underlying
- * BytesRefWritable.
- */
-
-public class BytesRefArrayWritable implements Writable,
- Comparable<BytesRefArrayWritable> {
-
- private BytesRefWritable[] bytesRefWritables = null;
-
- private int valid = 0;
-
- /**
- * Constructs an empty array with the specified capacity.
- *
- * @param capacity
- * initial capacity
- * @exception IllegalArgumentException
- * if the specified initial capacity is negative
- */
- public BytesRefArrayWritable(int capacity) {
- if (capacity < 0) {
- throw new IllegalArgumentException("Capacity can not be negative.");
- }
- bytesRefWritables = new BytesRefWritable[0];
- ensureCapacity(capacity);
- }
-
- /**
- * Constructs an empty array with a capacity of ten.
- */
- public BytesRefArrayWritable() {
- this(10);
- }
-
- /**
- * Returns the number of valid elements.
- *
- * @return the number of valid elements
- */
- public int size() {
- return valid;
- }
-
- /**
- * Gets the BytesRefWritable at the specified position. Make sure the position
- * is valid by first call resetValid.
- *
- * @param index
- * the position index, starting from zero
- * @throws IndexOutOfBoundsException
- */
- public BytesRefWritable get(int index) {
- if (index >= valid) {
- throw new IndexOutOfBoundsException(
- "This BytesRefArrayWritable only has " + valid + " valid values.");
- }
- return bytesRefWritables[index];
- }
-
- /**
- * Gets the BytesRefWritable at the specified position without checking.
- *
- * @param index
- * the position index, starting from zero
- * @throws IndexOutOfBoundsException
- */
- public BytesRefWritable unCheckedGet(int index) {
- return bytesRefWritables[index];
- }
-
- /**
- * Set the BytesRefWritable at the specified position with the specified
- * BytesRefWritable.
- *
- * @param index
- * index position
- * @param bytesRefWritable
- * the new element
- * @throws IllegalArgumentException
- * if the specified new element is null
- */
- public void set(int index, BytesRefWritable bytesRefWritable) {
- if (bytesRefWritable == null) {
- throw new IllegalArgumentException("Can not assign null.");
- }
- ensureCapacity(index + 1);
- bytesRefWritables[index] = bytesRefWritable;
- if (valid <= index) {
- valid = index + 1;
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public int compareTo(BytesRefArrayWritable other) {
- if (other == null) {
- throw new IllegalArgumentException("Argument can not be null.");
- }
- if (this == other) {
- return 0;
- }
- int sizeDiff = valid - other.valid;
- if (sizeDiff != 0) {
- return sizeDiff;
- }
- for (int i = 0; i < valid; i++) {
- if (other.contains(bytesRefWritables[i])) {
- continue;
- } else {
- return 1;
- }
- }
- return 0;
- }
-
- @Override
- public int hashCode(){
- return Objects.hashCode(bytesRefWritables);
- }
- /**
- * Returns <tt>true</tt> if this instance contains one or more the specified
- * BytesRefWritable.
- *
- * @param bytesRefWritable
- * BytesRefWritable element to be tested
- * @return <tt>true</tt> if contains the specified element
- * @throws IllegalArgumentException
- * if the specified element is null
- */
- public boolean contains(BytesRefWritable bytesRefWritable) {
- if (bytesRefWritable == null) {
- throw new IllegalArgumentException("Argument can not be null.");
- }
- for (int i = 0; i < valid; i++) {
- if (bytesRefWritables[i].equals(bytesRefWritable)) {
- return true;
- }
- }
- return false;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean equals(Object o) {
- if (o == null || !(o instanceof BytesRefArrayWritable)) {
- return false;
- }
- return compareTo((BytesRefArrayWritable) o) == 0;
- }
-
- /**
- * Removes all elements.
- */
- public void clear() {
- valid = 0;
- }
-
- /**
- * enlarge the capacity if necessary, to ensure that it can hold the number of
- * elements specified by newValidCapacity argument. It will also narrow the
- * valid capacity when needed. Notice: it only enlarge or narrow the valid
- * capacity with no care of the already stored invalid BytesRefWritable.
- *
- * @param newValidCapacity
- * the desired capacity
- */
- public void resetValid(int newValidCapacity) {
- ensureCapacity(newValidCapacity);
- valid = newValidCapacity;
- }
-
- protected void ensureCapacity(int newCapacity) {
- int size = bytesRefWritables.length;
- if (size < newCapacity) {
- bytesRefWritables = Arrays.copyOf(bytesRefWritables, newCapacity);
- while (size < newCapacity) {
- bytesRefWritables[size] = new BytesRefWritable();
- size++;
- }
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void readFields(DataInput in) throws IOException {
- int count = in.readInt();
- ensureCapacity(count);
- for (int i = 0; i < count; i++) {
- bytesRefWritables[i].readFields(in);
- }
- valid = count;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(valid);
-
- for (int i = 0; i < valid; i++) {
- BytesRefWritable cu = bytesRefWritables[i];
- cu.write(out);
- }
- }
-
- static {
- WritableFactories.setFactory(BytesRefArrayWritable.class,
- new WritableFactory() {
-
- @Override
- public Writable newInstance() {
- return new BytesRefArrayWritable();
- }
-
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java
deleted file mode 100644
index c83b505..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.rcfile;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.io.WritableFactory;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * <tt>BytesRefWritable</tt> referenced a section of byte array. It can be used
- * to avoid unnecessary byte copy.
- */
-public class BytesRefWritable implements Writable, Comparable<BytesRefWritable> {
-
- private static final byte[] EMPTY_BYTES = new byte[0];
- public static final BytesRefWritable ZeroBytesRefWritable = new BytesRefWritable();
-
- int start = 0;
- int length = 0;
- byte[] bytes = null;
-
- LazyDecompressionCallback lazyDecompressObj;
-
- /**
- * Create a zero-size bytes.
- */
- public BytesRefWritable() {
- this(EMPTY_BYTES);
- }
-
- /**
- * Create a BytesRefWritable with <tt>length</tt> bytes.
- */
- public BytesRefWritable(int length) {
- assert length > 0;
- this.length = length;
- bytes = new byte[this.length];
- start = 0;
- }
-
- /**
- * Create a BytesRefWritable referenced to the given bytes.
- */
- public BytesRefWritable(byte[] bytes) {
- this.bytes = bytes;
- length = bytes.length;
- start = 0;
- }
-
- /**
- * Create a BytesRefWritable referenced to one section of the given bytes. The
- * section is determined by argument <tt>offset</tt> and <tt>len</tt>.
- */
- public BytesRefWritable(byte[] data, int offset, int len) {
- bytes = data;
- start = offset;
- length = len;
- }
-
- /**
- * Create a BytesRefWritable referenced to one section of the given bytes. The
- * argument <tt>lazyDecompressData</tt> refers to a LazyDecompressionCallback
- * object. The arguments <tt>offset</tt> and <tt>len</tt> are referred to
- * uncompressed bytes of <tt>lazyDecompressData</tt>. Use <tt>offset</tt> and
- * <tt>len</tt> after uncompressing the data.
- */
- public BytesRefWritable(LazyDecompressionCallback lazyDecompressData,
- int offset, int len) {
- lazyDecompressObj = lazyDecompressData;
- start = offset;
- length = len;
- }
-
- private void lazyDecompress() throws IOException {
- if (bytes == null && lazyDecompressObj != null) {
- bytes = lazyDecompressObj.decompress();
- }
- }
-
- /**
- * Returns a copy of the underlying bytes referenced by this instance.
- *
- * @return a new copied byte array
- * @throws IOException
- */
- public byte[] getBytesCopy() throws IOException {
- lazyDecompress();
- byte[] bb = new byte[length];
- System.arraycopy(bytes, start, bb, 0, length);
- return bb;
- }
-
- /**
- * Returns the underlying bytes.
- *
- * @throws IOException
- */
- public byte[] getData() throws IOException {
- lazyDecompress();
- return bytes;
- }
-
- /**
- * readFields() will corrupt the array. So use the set method whenever
- * possible.
- *
- * @see #readFields(DataInput)
- */
- public void set(byte[] newData, int offset, int len) {
- bytes = newData;
- start = offset;
- length = len;
- lazyDecompressObj = null;
- }
-
- /**
- * readFields() will corrupt the array. So use the set method whenever
- * possible.
- *
- * @see #readFields(DataInput)
- */
- public void set(LazyDecompressionCallback newData, int offset, int len) {
- bytes = null;
- start = offset;
- length = len;
- lazyDecompressObj = newData;
- }
-
- public void writeDataTo(DataOutput out) throws IOException {
- lazyDecompress();
- out.write(bytes, start, length);
- }
-
- /**
- * Always reuse the bytes array if length of bytes array is equal or greater
- * to the current record, otherwise create a new one. readFields will corrupt
- * the array. Please use set() whenever possible.
- *
- * @see #set(byte[], int, int)
- */
- public void readFields(DataInput in) throws IOException {
- int len = in.readInt();
- if (len > bytes.length) {
- bytes = new byte[len];
- }
- start = 0;
- length = len;
- in.readFully(bytes, start, length);
- }
-
- /** {@inheritDoc} */
- public void write(DataOutput out) throws IOException {
- lazyDecompress();
- out.writeInt(length);
- out.write(bytes, start, length);
- }
-
- /** {@inheritDoc} */
- @Override
- public int hashCode() {
- return super.hashCode();
- }
-
- /** {@inheritDoc} */
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder(3 * length);
- for (int idx = start; idx < length; idx++) {
- // if not the first, put a blank separator in
- if (idx != 0) {
- sb.append(' ');
- }
- String num = Integer.toHexString(0xff & bytes[idx]);
- // if it is only one digit, add a leading 0.
- if (num.length() < 2) {
- sb.append('0');
- }
- sb.append(num);
- }
- return sb.toString();
- }
-
- /** {@inheritDoc} */
- @Override
- public int compareTo(BytesRefWritable other) {
- if (other == null) {
- throw new IllegalArgumentException("Argument can not be null.");
- }
- if (this == other) {
- return 0;
- }
- try {
- return WritableComparator.compareBytes(getData(), start, getLength(),
- other.getData(), other.start, other.getLength());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean equals(Object right_obj) {
- if (right_obj == null || !(right_obj instanceof BytesRefWritable)) {
- return false;
- }
- return compareTo((BytesRefWritable) right_obj) == 0;
- }
-
- static {
- WritableFactories.setFactory(BytesRefWritable.class, new WritableFactory() {
-
- @Override
- public Writable newInstance() {
- return new BytesRefWritable();
- }
-
- });
- }
-
- public int getLength() {
- return length;
- }
-
- public int getStart() {
- return start;
- }
-}