Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/12 09:22:40 UTC
[26/45] tajo git commit: TAJO-1233: Merge hbase_storage branch to the master branch. (Hyoungjun Kim via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java
deleted file mode 100644
index 73ce7c2..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java
+++ /dev/null
@@ -1,504 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import static parquet.Log.DEBUG;
-import static parquet.format.Util.writeFileMetaData;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import parquet.Log;
-import parquet.Version;
-import parquet.bytes.BytesInput;
-import parquet.bytes.BytesUtils;
-import parquet.column.ColumnDescriptor;
-import parquet.column.page.DictionaryPage;
-import parquet.column.statistics.Statistics;
-import parquet.format.converter.ParquetMetadataConverter;
-import parquet.hadoop.Footer;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.ColumnPath;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.hadoop.metadata.FileMetaData;
-import parquet.hadoop.metadata.GlobalMetaData;
-import parquet.hadoop.metadata.ParquetMetadata;
-import parquet.io.ParquetEncodingException;
-import parquet.schema.MessageType;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-
-/**
- * Internal implementation of the Parquet file writer as a block container
- *
- * @author Julien Le Dem
- *
- */
-public class ParquetFileWriter {
- private static final Log LOG = Log.getLog(ParquetFileWriter.class);
-
- public static final String PARQUET_METADATA_FILE = "_metadata";
- public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII"));
- public static final int CURRENT_VERSION = 1;
-
- private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
-
- private final MessageType schema;
- private final FSDataOutputStream out;
- private BlockMetaData currentBlock;
- private ColumnChunkMetaData currentColumn;
- private long currentRecordCount;
- private List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
- private long uncompressedLength;
- private long compressedLength;
- private Set<parquet.column.Encoding> currentEncodings;
-
- private CompressionCodecName currentChunkCodec;
- private ColumnPath currentChunkPath;
- private PrimitiveTypeName currentChunkType;
- private long currentChunkFirstDataPage;
- private long currentChunkDictionaryPageOffset;
- private long currentChunkValueCount;
-
- private Statistics currentStatistics;
-
- /**
- * Captures the order in which methods should be called
- *
- * @author Julien Le Dem
- *
- */
- private enum STATE {
- NOT_STARTED {
- STATE start() {
- return STARTED;
- }
- },
- STARTED {
- STATE startBlock() {
- return BLOCK;
- }
- STATE end() {
- return ENDED;
- }
- },
- BLOCK {
- STATE startColumn() {
- return COLUMN;
- }
- STATE endBlock() {
- return STARTED;
- }
- },
- COLUMN {
- STATE endColumn() {
- return BLOCK;
- };
- STATE write() {
- return this;
- }
- },
- ENDED;
-
- STATE start() throws IOException { return error(); }
- STATE startBlock() throws IOException { return error(); }
- STATE startColumn() throws IOException { return error(); }
- STATE write() throws IOException { return error(); }
- STATE endColumn() throws IOException { return error(); }
- STATE endBlock() throws IOException { return error(); }
- STATE end() throws IOException { return error(); }
-
- private final STATE error() throws IOException {
- throw new IOException("The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: " + this.name());
- }
- }
-
- private STATE state = STATE.NOT_STARTED;
-
- /**
- *
- * @param configuration the configuration used to access the file system
- * @param schema the schema of the data
- * @param file the file to write to
- * @throws IOException if the file can not be created
- */
- public ParquetFileWriter(Configuration configuration, MessageType schema, Path file) throws IOException {
- super();
- this.schema = schema;
- FileSystem fs = file.getFileSystem(configuration);
- this.out = fs.create(file, false);
- }
-
- /**
- * start the file
- * @throws IOException
- */
- public void start() throws IOException {
- state = state.start();
- if (DEBUG) LOG.debug(out.getPos() + ": start");
- out.write(MAGIC);
- }
-
- /**
- * start a block
- * @param recordCount the record count in this block
- * @throws IOException
- */
- public void startBlock(long recordCount) throws IOException {
- state = state.startBlock();
- if (DEBUG) LOG.debug(out.getPos() + ": start block");
-// out.write(MAGIC); // TODO: add a magic delimiter
- currentBlock = new BlockMetaData();
- currentRecordCount = recordCount;
- }
-
- /**
- * start a column inside a block
- * @param descriptor the column descriptor
- * @param valueCount the value count in this column
- * @param compressionCodecName the codec used to compress this column chunk
- * @throws IOException
- */
- public void startColumn(ColumnDescriptor descriptor,
- long valueCount,
- CompressionCodecName compressionCodecName) throws IOException {
- state = state.startColumn();
- if (DEBUG) LOG.debug(out.getPos() + ": start column: " + descriptor + " count=" + valueCount);
- currentEncodings = new HashSet<parquet.column.Encoding>();
- currentChunkPath = ColumnPath.get(descriptor.getPath());
- currentChunkType = descriptor.getType();
- currentChunkCodec = compressionCodecName;
- currentChunkValueCount = valueCount;
- currentChunkFirstDataPage = out.getPos();
- compressedLength = 0;
- uncompressedLength = 0;
- // need to know what type of stats to initialize to
- // better way to do this?
- currentStatistics = Statistics.getStatsBasedOnType(currentChunkType);
- }
-
- /**
- * writes a dictionary page
- * @param dictionaryPage the dictionary page
- */
- public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
- state = state.write();
- if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page: " + dictionaryPage.getDictionarySize() + " values");
- currentChunkDictionaryPageOffset = out.getPos();
- int uncompressedSize = dictionaryPage.getUncompressedSize();
- int compressedPageSize = (int)dictionaryPage.getBytes().size(); // TODO: fix casts
- metadataConverter.writeDictionaryPageHeader(
- uncompressedSize,
- compressedPageSize,
- dictionaryPage.getDictionarySize(),
- dictionaryPage.getEncoding(),
- out);
- long headerSize = out.getPos() - currentChunkDictionaryPageOffset;
- this.uncompressedLength += uncompressedSize + headerSize;
- this.compressedLength += compressedPageSize + headerSize;
- if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page content " + compressedPageSize);
- dictionaryPage.getBytes().writeAllTo(out);
- currentEncodings.add(dictionaryPage.getEncoding());
- }
-
-
- /**
- * writes a single page
- * @param valueCount count of values
- * @param uncompressedPageSize the size of the data once uncompressed
- * @param bytes the compressed data for the page without header
- * @param rlEncoding encoding of the repetition level
- * @param dlEncoding encoding of the definition level
- * @param valuesEncoding encoding of values
- */
- @Deprecated
- public void writeDataPage(
- int valueCount, int uncompressedPageSize,
- BytesInput bytes,
- parquet.column.Encoding rlEncoding,
- parquet.column.Encoding dlEncoding,
- parquet.column.Encoding valuesEncoding) throws IOException {
- state = state.write();
- long beforeHeader = out.getPos();
- if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values");
- int compressedPageSize = (int)bytes.size();
- metadataConverter.writeDataPageHeader(
- uncompressedPageSize, compressedPageSize,
- valueCount,
- rlEncoding,
- dlEncoding,
- valuesEncoding,
- out);
- long headerSize = out.getPos() - beforeHeader;
- this.uncompressedLength += uncompressedPageSize + headerSize;
- this.compressedLength += compressedPageSize + headerSize;
- if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize);
- bytes.writeAllTo(out);
- currentEncodings.add(rlEncoding);
- currentEncodings.add(dlEncoding);
- currentEncodings.add(valuesEncoding);
- }
-
- /**
- * writes a single page
- * @param valueCount count of values
- * @param uncompressedPageSize the size of the data once uncompressed
- * @param bytes the compressed data for the page without header
- * @param statistics the statistics of the page
- * @param rlEncoding encoding of the repetition level
- * @param dlEncoding encoding of the definition level
- * @param valuesEncoding encoding of values
- */
- public void writeDataPage(
- int valueCount, int uncompressedPageSize,
- BytesInput bytes,
- Statistics statistics,
- parquet.column.Encoding rlEncoding,
- parquet.column.Encoding dlEncoding,
- parquet.column.Encoding valuesEncoding) throws IOException {
- state = state.write();
- long beforeHeader = out.getPos();
- if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values");
- int compressedPageSize = (int)bytes.size();
- metadataConverter.writeDataPageHeader(
- uncompressedPageSize, compressedPageSize,
- valueCount,
- statistics,
- rlEncoding,
- dlEncoding,
- valuesEncoding,
- out);
- long headerSize = out.getPos() - beforeHeader;
- this.uncompressedLength += uncompressedPageSize + headerSize;
- this.compressedLength += compressedPageSize + headerSize;
- if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize);
- bytes.writeAllTo(out);
- currentStatistics.mergeStatistics(statistics);
- currentEncodings.add(rlEncoding);
- currentEncodings.add(dlEncoding);
- currentEncodings.add(valuesEncoding);
- }
-
- /**
- * writes a number of pages at once
- * @param bytes bytes to be written including page headers
- * @param uncompressedTotalPageSize total uncompressed size (without page headers)
- * @param compressedTotalPageSize total compressed size (without page headers)
- * @param totalStats accumulated statistics for the written pages
- * @param encodings the encodings used by the written pages
- * @throws IOException
- */
- void writeDataPages(BytesInput bytes,
- long uncompressedTotalPageSize,
- long compressedTotalPageSize,
- Statistics totalStats,
- List<parquet.column.Encoding> encodings) throws IOException {
- state = state.write();
- if (DEBUG) LOG.debug(out.getPos() + ": write data pages");
- long headersSize = bytes.size() - compressedTotalPageSize;
- this.uncompressedLength += uncompressedTotalPageSize + headersSize;
- this.compressedLength += compressedTotalPageSize + headersSize;
- if (DEBUG) LOG.debug(out.getPos() + ": write data pages content");
- bytes.writeAllTo(out);
- currentEncodings.addAll(encodings);
- currentStatistics = totalStats;
- }
-
- /**
- * end a column (once all rep, def and data have been written)
- * @throws IOException
- */
- public void endColumn() throws IOException {
- state = state.endColumn();
- if (DEBUG) LOG.debug(out.getPos() + ": end column");
- currentBlock.addColumn(ColumnChunkMetaData.get(
- currentChunkPath,
- currentChunkType,
- currentChunkCodec,
- currentEncodings,
- currentStatistics,
- currentChunkFirstDataPage,
- currentChunkDictionaryPageOffset,
- currentChunkValueCount,
- compressedLength,
- uncompressedLength));
- if (DEBUG) LOG.info("ended column chunk: " + currentColumn);
- currentColumn = null;
- this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength);
- this.uncompressedLength = 0;
- this.compressedLength = 0;
- }
-
- /**
- * ends a block once all column chunks have been written
- * @throws IOException
- */
- public void endBlock() throws IOException {
- state = state.endBlock();
- if (DEBUG) LOG.debug(out.getPos() + ": end block");
- currentBlock.setRowCount(currentRecordCount);
- blocks.add(currentBlock);
- currentBlock = null;
- }
-
- /**
- * ends a file once all blocks have been written.
- * closes the file.
- * @param extraMetaData the extra meta data to write in the footer
- * @throws IOException
- */
- public void end(Map<String, String> extraMetaData) throws IOException {
- state = state.end();
- if (DEBUG) LOG.debug(out.getPos() + ": end");
- ParquetMetadata footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
- serializeFooter(footer, out);
- out.close();
- }
-
- private static void serializeFooter(ParquetMetadata footer, FSDataOutputStream out) throws IOException {
- long footerIndex = out.getPos();
- parquet.format.FileMetaData parquetMetadata = new ParquetMetadataConverter().toParquetMetadata(CURRENT_VERSION, footer);
- writeFileMetaData(parquetMetadata, out);
- if (DEBUG) LOG.debug(out.getPos() + ": footer length = " + (out.getPos() - footerIndex));
- BytesUtils.writeIntLittleEndian(out, (int)(out.getPos() - footerIndex));
- out.write(MAGIC);
- }
-
- /**
- * writes a _metadata file
- * @param configuration the configuration to use to get the FileSystem
- * @param outputPath the directory to write the _metadata file to
- * @param footers the list of footers to merge
- * @throws IOException
- */
- public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers) throws IOException {
- Path metaDataPath = new Path(outputPath, PARQUET_METADATA_FILE);
- FileSystem fs = outputPath.getFileSystem(configuration);
- outputPath = outputPath.makeQualified(fs);
- FSDataOutputStream metadata = fs.create(metaDataPath);
- metadata.write(MAGIC);
- ParquetMetadata metadataFooter = mergeFooters(outputPath, footers);
- serializeFooter(metadataFooter, metadata);
- metadata.close();
- }
-
- private static ParquetMetadata mergeFooters(Path root, List<Footer> footers) {
- String rootPath = root.toString();
- GlobalMetaData fileMetaData = null;
- List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
- for (Footer footer : footers) {
- String path = footer.getFile().toString();
- if (!path.startsWith(rootPath)) {
- throw new ParquetEncodingException(path + " invalid: all the files must be contained in the root " + root);
- }
- path = path.substring(rootPath.length());
- while (path.startsWith("/")) {
- path = path.substring(1);
- }
- fileMetaData = mergeInto(footer.getParquetMetadata().getFileMetaData(), fileMetaData);
- for (BlockMetaData block : footer.getParquetMetadata().getBlocks()) {
- block.setPath(path);
- blocks.add(block);
- }
- }
- return new ParquetMetadata(fileMetaData.merge(), blocks);
- }
-
- /**
- * @return the current position in the underlying file
- * @throws IOException
- */
- public long getPos() throws IOException {
- return out.getPos();
- }
-
- /**
- * Will merge the metadata of all the footers together
- * @param footers the list of file footers to merge
- * @return the global meta data for all the footers
- */
- static GlobalMetaData getGlobalMetaData(List<Footer> footers) {
- GlobalMetaData fileMetaData = null;
- for (Footer footer : footers) {
- ParquetMetadata currentMetadata = footer.getParquetMetadata();
- fileMetaData = mergeInto(currentMetadata.getFileMetaData(), fileMetaData);
- }
- return fileMetaData;
- }
-
- /**
- * Will return the result of merging toMerge into mergedMetadata
- * @param toMerge the metadata to merge
- * @param mergedMetadata the reference metadata to merge into
- * @return the result of the merge
- */
- static GlobalMetaData mergeInto(
- FileMetaData toMerge,
- GlobalMetaData mergedMetadata) {
- MessageType schema = null;
- Map<String, Set<String>> newKeyValues = new HashMap<String, Set<String>>();
- Set<String> createdBy = new HashSet<String>();
- if (mergedMetadata != null) {
- schema = mergedMetadata.getSchema();
- newKeyValues.putAll(mergedMetadata.getKeyValueMetaData());
- createdBy.addAll(mergedMetadata.getCreatedBy());
- }
- if ((schema == null && toMerge.getSchema() != null)
- || (schema != null && !schema.equals(toMerge.getSchema()))) {
- schema = mergeInto(toMerge.getSchema(), schema);
- }
- for (Entry<String, String> entry : toMerge.getKeyValueMetaData().entrySet()) {
- Set<String> values = newKeyValues.get(entry.getKey());
- if (values == null) {
- values = new HashSet<String>();
- newKeyValues.put(entry.getKey(), values);
- }
- values.add(entry.getValue());
- }
- createdBy.add(toMerge.getCreatedBy());
- return new GlobalMetaData(
- schema,
- newKeyValues,
- createdBy);
- }
-
- /**
- * Will return the result of merging toMerge into mergedSchema
- * @param toMerge the schema to merge into mergedSchema
- * @param mergedSchema the schema to append the fields to
- * @return the resulting schema
- */
- static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema) {
- if (mergedSchema == null) {
- return toMerge;
- }
- return mergedSchema.union(toMerge);
- }
-
-}
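For context, the STATE machine above admits exactly one happy path: start(), then per row group startBlock/startColumn/write.../endColumn/endBlock, then end(). The following sketch shows one legal call sequence against the API deleted here; it is a hypothetical illustration, the schema and output path are made up, and the four-byte payload is a placeholder rather than a valid PLAIN-encoded page.

import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.storage.thirdparty.parquet.ParquetFileWriter;

import parquet.bytes.BytesInput;
import parquet.column.ColumnDescriptor;
import parquet.column.Encoding;
import parquet.column.statistics.Statistics;
import parquet.hadoop.metadata.CompressionCodecName;
import parquet.schema.MessageType;
import parquet.schema.MessageTypeParser;

public class FileWriterCallOrderSketch {
  public static void main(String[] args) throws Exception {
    MessageType schema = MessageTypeParser.parseMessageType(
        "message m { required int32 id; }");
    ParquetFileWriter writer = new ParquetFileWriter(
        new Configuration(), schema, new Path("/tmp/sketch.parquet")); // hypothetical path
    ColumnDescriptor column = schema.getColumns().get(0);

    writer.start();                                   // NOT_STARTED -> STARTED
    writer.startBlock(1);                             // STARTED -> BLOCK
    writer.startColumn(column, 1, CompressionCodecName.UNCOMPRESSED); // BLOCK -> COLUMN
    byte[] page = new byte[] {1, 0, 0, 0};            // placeholder, not a real PLAIN page
    writer.writeDataPage(1, page.length, BytesInput.from(page),
        Statistics.getStatsBasedOnType(column.getType()),
        Encoding.BIT_PACKED, Encoding.BIT_PACKED, Encoding.PLAIN);
    writer.endColumn();                               // COLUMN -> BLOCK
    writer.endBlock();                                // BLOCK -> STARTED
    writer.end(new HashMap<String, String>());        // STARTED -> ENDED; closes the stream
  }
}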
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
deleted file mode 100644
index 0fb2c3a..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import parquet.filter.UnboundRecordFilter;
-import parquet.hadoop.*;
-import parquet.hadoop.api.InitContext;
-import parquet.hadoop.api.ReadSupport;
-import parquet.hadoop.api.ReadSupport.ReadContext;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.GlobalMetaData;
-import parquet.schema.MessageType;
-
-/**
- * Read records from a Parquet file.
- */
-public class ParquetReader<T> implements Closeable {
-
- private ReadSupport<T> readSupport;
- private UnboundRecordFilter filter;
- private Configuration conf;
- private ReadContext readContext;
- private Iterator<Footer> footersIterator;
- private InternalParquetRecordReader<T> reader;
- private GlobalMetaData globalMetaData;
-
- /**
- * @param file the file to read
- * @param readSupport to materialize records
- * @throws IOException
- */
- public ParquetReader(Path file, ReadSupport<T> readSupport) throws IOException {
- this(file, readSupport, null);
- }
-
- /**
- * @param conf the configuration
- * @param file the file to read
- * @param readSupport to materialize records
- * @throws IOException
- */
- public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport) throws IOException {
- this(conf, file, readSupport, null);
- }
-
- /**
- * @param file the file to read
- * @param readSupport to materialize records
- * @param filter the filter to use to filter records
- * @throws IOException
- */
- public ParquetReader(Path file, ReadSupport<T> readSupport, UnboundRecordFilter filter) throws IOException {
- this(new Configuration(), file, readSupport, filter);
- }
-
- /**
- * @param conf the configuration
- * @param file the file to read
- * @param readSupport to materialize records
- * @param filter the filter to use to filter records
- * @throws IOException
- */
- public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport, UnboundRecordFilter filter) throws IOException {
- this.readSupport = readSupport;
- this.filter = filter;
- this.conf = conf;
-
- FileSystem fs = file.getFileSystem(conf);
- List<FileStatus> statuses = Arrays.asList(fs.listStatus(file));
- List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses);
- this.footersIterator = footers.iterator();
- globalMetaData = ParquetFileWriter.getGlobalMetaData(footers);
-
- List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
- for (Footer footer : footers) {
- blocks.addAll(footer.getParquetMetadata().getBlocks());
- }
-
- MessageType schema = globalMetaData.getSchema();
- Map<String, Set<String>> extraMetadata = globalMetaData.getKeyValueMetaData();
- readContext = readSupport.init(new InitContext(conf, extraMetadata, schema));
- }
-
- /**
- * @return the next record or null if finished
- * @throws IOException
- */
- public T read() throws IOException {
- try {
- if (reader != null && reader.nextKeyValue()) {
- return reader.getCurrentValue();
- } else {
- initReader();
- return reader == null ? null : read();
- }
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
-
- private void initReader() throws IOException {
- if (reader != null) {
- reader.close();
- reader = null;
- }
- if (footersIterator.hasNext()) {
- Footer footer = footersIterator.next();
- reader = new InternalParquetRecordReader<T>(readSupport, filter);
- reader.initialize(
- readContext.getRequestedSchema(), globalMetaData.getSchema(), footer.getParquetMetadata().getFileMetaData().getKeyValueMetaData(),
- readContext.getReadSupportMetadata(), footer.getFile(), footer.getParquetMetadata().getBlocks(), conf);
- }
- }
-
- @Override
- public void close() throws IOException {
- if (reader != null) {
- reader.close();
- }
- }
-}
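The read() contract above returns records until it yields null, transparently advancing across footers via initReader(). A minimal drain loop follows, assuming parquet-hadoop's example GroupReadSupport is on the classpath (a hypothetical choice; any ReadSupport implementation works).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.storage.thirdparty.parquet.ParquetReader;

import parquet.example.data.Group;
import parquet.hadoop.example.GroupReadSupport;

public class ReadLoopSketch {
  public static void main(String[] args) throws Exception {
    ParquetReader<Group> reader = new ParquetReader<Group>(
        new Configuration(), new Path("/tmp/sketch.parquet"), new GroupReadSupport());
    try {
      Group record;
      while ((record = reader.read()) != null) {   // null signals end of input
        System.out.println(record);
      }
    } finally {
      reader.close();   // closes the current InternalParquetRecordReader
    }
  }
}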
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
deleted file mode 100644
index 0447a47..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import parquet.column.ParquetProperties;
-import parquet.hadoop.api.WriteSupport;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.schema.MessageType;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-public class ParquetWriter<T> implements Closeable {
-
- public static final int DEFAULT_BLOCK_SIZE = 128 * 1024 * 1024;
- public static final int DEFAULT_PAGE_SIZE = 1 * 1024 * 1024;
- public static final CompressionCodecName DEFAULT_COMPRESSION_CODEC_NAME =
- CompressionCodecName.UNCOMPRESSED;
- public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true;
- public static final boolean DEFAULT_IS_VALIDATING_ENABLED = false;
- public static final ParquetProperties.WriterVersion DEFAULT_WRITER_VERSION =
- ParquetProperties.WriterVersion.PARQUET_1_0;
-
- private final InternalParquetRecordWriter<T> writer;
-
- /**
- * Create a new ParquetWriter.
- * (with dictionary encoding enabled and validation off)
- *
- * @param file the file to create
- * @param writeSupport the implementation to write a record to a RecordConsumer
- * @param compressionCodecName the compression codec to use
- * @param blockSize the block size threshold
- * @param pageSize the page size threshold
- * @throws java.io.IOException
- * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, CompressionCodecName, int, int, boolean, boolean)
- */
- public ParquetWriter(Path file, WriteSupport<T> writeSupport, CompressionCodecName compressionCodecName, int blockSize, int pageSize) throws IOException {
- this(file, writeSupport, compressionCodecName, blockSize, pageSize,
- DEFAULT_IS_DICTIONARY_ENABLED, DEFAULT_IS_VALIDATING_ENABLED);
- }
-
- /**
- * Create a new ParquetWriter.
- *
- * @param file the file to create
- * @param writeSupport the implementation to write a record to a RecordConsumer
- * @param compressionCodecName the compression codec to use
- * @param blockSize the block size threshold
- * @param pageSize the page size threshold (both data and dictionary)
- * @param enableDictionary to turn dictionary encoding on
- * @param validating to turn on validation using the schema
- * @throws IOException
- * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean)
- */
- public ParquetWriter(
- Path file,
- WriteSupport<T> writeSupport,
- CompressionCodecName compressionCodecName,
- int blockSize,
- int pageSize,
- boolean enableDictionary,
- boolean validating) throws IOException {
- this(file, writeSupport, compressionCodecName, blockSize, pageSize, pageSize, enableDictionary, validating);
- }
-
- /**
- * Create a new ParquetWriter.
- *
- * @param file the file to create
- * @param writeSupport the implementation to write a record to a RecordConsumer
- * @param compressionCodecName the compression codec to use
- * @param blockSize the block size threshold
- * @param pageSize the page size threshold
- * @param dictionaryPageSize the page size threshold for the dictionary pages
- * @param enableDictionary to turn dictionary encoding on
- * @param validating to turn on validation using the schema
- * @throws IOException
- * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion)
- */
- public ParquetWriter(
- Path file,
- WriteSupport<T> writeSupport,
- CompressionCodecName compressionCodecName,
- int blockSize,
- int pageSize,
- int dictionaryPageSize,
- boolean enableDictionary,
- boolean validating) throws IOException {
- this(file, writeSupport, compressionCodecName, blockSize, pageSize,
- dictionaryPageSize, enableDictionary, validating,
- DEFAULT_WRITER_VERSION);
- }
-
- /**
- * Create a new ParquetWriter.
- *
- * Directly instantiates a Hadoop {@link org.apache.hadoop.conf.Configuration} which reads
- * configuration from the classpath.
- *
- * @param file the file to create
- * @param writeSupport the implementation to write a record to a RecordConsumer
- * @param compressionCodecName the compression codec to use
- * @param blockSize the block size threshold
- * @param pageSize the page size threshold
- * @param dictionaryPageSize the page size threshold for the dictionary pages
- * @param enableDictionary to turn dictionary encoding on
- * @param validating to turn on validation using the schema
- * @param writerVersion version of parquetWriter from {@link ParquetProperties.WriterVersion}
- * @throws IOException
- * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion, org.apache.hadoop.conf.Configuration)
- */
- public ParquetWriter(
- Path file,
- WriteSupport<T> writeSupport,
- CompressionCodecName compressionCodecName,
- int blockSize,
- int pageSize,
- int dictionaryPageSize,
- boolean enableDictionary,
- boolean validating,
- ParquetProperties.WriterVersion writerVersion) throws IOException {
- this(file, writeSupport, compressionCodecName, blockSize, pageSize, dictionaryPageSize, enableDictionary, validating, writerVersion, new Configuration());
- }
-
- /**
- * Create a new ParquetWriter.
- *
- * @param file the file to create
- * @param writeSupport the implementation to write a record to a RecordConsumer
- * @param compressionCodecName the compression codec to use
- * @param blockSize the block size threshold
- * @param pageSize the page size threshold
- * @param dictionaryPageSize the page size threshold for the dictionary pages
- * @param enableDictionary to turn dictionary encoding on
- * @param validating to turn on validation using the schema
- * @param writerVersion version of parquetWriter from {@link ParquetProperties.WriterVersion}
- * @param conf Hadoop configuration to use while accessing the filesystem
- * @throws IOException
- */
- public ParquetWriter(
- Path file,
- WriteSupport<T> writeSupport,
- CompressionCodecName compressionCodecName,
- int blockSize,
- int pageSize,
- int dictionaryPageSize,
- boolean enableDictionary,
- boolean validating,
- ParquetProperties.WriterVersion writerVersion,
- Configuration conf) throws IOException {
-
- WriteSupport.WriteContext writeContext = writeSupport.init(conf);
- MessageType schema = writeContext.getSchema();
-
- ParquetFileWriter fileWriter = new ParquetFileWriter(conf, schema, file);
- fileWriter.start();
-
- CodecFactory codecFactory = new CodecFactory(conf);
- CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compressionCodecName, 0);
- this.writer = new InternalParquetRecordWriter<T>(
- fileWriter,
- writeSupport,
- schema,
- writeContext.getExtraMetaData(),
- blockSize,
- pageSize,
- compressor,
- dictionaryPageSize,
- enableDictionary,
- validating,
- writerVersion);
- }
-
- /**
- * Create a new ParquetWriter. The default block size is 128 MB. The default
- * page size is 1 MB. Default compression is no compression. Dictionary encoding is enabled.
- *
- * @param file the file to create
- * @param writeSupport the implementation to write a record to a RecordConsumer
- * @throws IOException
- */
- public ParquetWriter(Path file, WriteSupport<T> writeSupport) throws IOException {
- this(file, writeSupport, DEFAULT_COMPRESSION_CODEC_NAME, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
- }
-
- public void write(T object) throws IOException {
- try {
- writer.write(object);
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
-
- public long getEstimatedWrittenSize() throws IOException {
- return this.writer.getEstimatedWrittenSize();
- }
-
- @Override
- public void close() throws IOException {
- try {
- writer.close();
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
-}
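A hedged usage sketch for this writer: it assumes parquet-hadoop's example GroupWriteSupport, whose init(conf) reads the schema back out of the Configuration. That is why the Configuration-taking constructor is used here rather than the two-argument one, which builds a fresh Configuration internally.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.storage.thirdparty.parquet.ParquetWriter;

import parquet.example.data.Group;
import parquet.example.data.simple.SimpleGroupFactory;
import parquet.hadoop.example.GroupWriteSupport;
import parquet.hadoop.metadata.CompressionCodecName;
import parquet.schema.MessageType;
import parquet.schema.MessageTypeParser;

public class WriteSketch {
  public static void main(String[] args) throws Exception {
    MessageType schema = MessageTypeParser.parseMessageType(
        "message m { required int32 id; }");
    Configuration conf = new Configuration();
    GroupWriteSupport.setSchema(schema, conf);      // GroupWriteSupport.init(conf) reads this back

    ParquetWriter<Group> writer = new ParquetWriter<Group>(
        new Path("/tmp/sketch.parquet"),            // hypothetical output path
        new GroupWriteSupport(),
        CompressionCodecName.UNCOMPRESSED,
        ParquetWriter.DEFAULT_BLOCK_SIZE,
        ParquetWriter.DEFAULT_PAGE_SIZE,
        ParquetWriter.DEFAULT_PAGE_SIZE,            // dictionary page size
        ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
        ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
        ParquetWriter.DEFAULT_WRITER_VERSION,
        conf);
    try {
      writer.write(new SimpleGroupFactory(schema).newGroup().append("id", 1));
    } finally {
      writer.close();                               // flushes the row group and footer
    }
  }
}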
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
deleted file mode 100644
index c1835df..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaUtil;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.tuple.offheap.HeapTuple;
-import org.apache.tajo.tuple.offheap.OffHeapRowWriter;
-import org.apache.tajo.tuple.offheap.ZeroCopyTuple;
-import org.apache.tajo.unit.StorageUnit;
-import org.apache.tajo.util.Deallocatable;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.UnsafeUtil;
-import sun.misc.Unsafe;
-import sun.nio.ch.DirectBuffer;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-public class BaseTupleBuilder extends OffHeapRowWriter implements TupleBuilder, Deallocatable {
- private static final Log LOG = LogFactory.getLog(BaseTupleBuilder.class);
-
- private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
-
- // buffer
- private ByteBuffer buffer;
- private long address;
-
- public BaseTupleBuilder(Schema schema) {
- super(SchemaUtil.toDataTypes(schema));
- buffer = ByteBuffer.allocateDirect(64 * StorageUnit.KB).order(ByteOrder.nativeOrder());
- address = UnsafeUtil.getAddress(buffer);
- }
-
- @Override
- public long address() {
- return address;
- }
-
- public void ensureSize(int size) {
- if (buffer.remaining() - size < 0) { // check the remaining size
- // allocate a larger buffer and copy the data written so far
- int newBlockSize = UnsafeUtil.alignedSize(buffer.capacity() * 2);
- ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize);
- long newAddress = ((DirectBuffer)newByteBuf).address();
- UNSAFE.copyMemory(this.address, newAddress, buffer.limit());
- LOG.debug("Increase the buffer size to " + FileUtil.humanReadableByteCount(newBlockSize, false));
-
- // release existing buffer and replace variables
- UnsafeUtil.free(buffer);
- buffer = newByteBuf;
- address = newAddress;
- }
- }
-
- @Override
- public int position() {
- return 0;
- }
-
- @Override
- public void forward(int length) {
- }
-
- @Override
- public void endRow() {
- super.endRow();
- buffer.position(0).limit(offset());
- }
-
- @Override
- public Tuple build() {
- return buildToHeapTuple();
- }
-
- public HeapTuple buildToHeapTuple() {
- byte [] bytes = new byte[buffer.limit()];
- UNSAFE.copyMemory(null, address, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, buffer.limit());
- return new HeapTuple(bytes, dataTypes());
- }
-
- public ZeroCopyTuple buildToZeroCopyTuple() {
- ZeroCopyTuple zcTuple = new ZeroCopyTuple();
- zcTuple.set(buffer, 0, buffer.limit(), dataTypes());
- return zcTuple;
- }
-
- public void release() {
- UnsafeUtil.free(buffer);
- buffer = null;
- address = 0;
- }
-}
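A sketch of how this builder is typically driven. The startRow()/putInt4() calls are assumed from the RowWriter contract that OffHeapRowWriter implements; they are not shown in this commit, so treat them as illustrative.

import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.tuple.BaseTupleBuilder;

public class TupleBuildSketch {
  public static void main(String[] args) {
    Schema schema = new Schema();
    schema.addColumn("id", Type.INT4);

    BaseTupleBuilder builder = new BaseTupleBuilder(schema);
    builder.startRow();            // assumed RowWriter method: begin a row
    builder.putInt4(42);           // assumed RowWriter method: write the INT4 column
    builder.endRow();              // flips the buffer: position 0, limit = row bytes

    Tuple tuple = builder.build(); // copies the off-heap row into a HeapTuple
    System.out.println(tuple.getInt4(0));
    builder.release();             // direct memory is not GC-managed; free it explicitly
  }
}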
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
deleted file mode 100644
index be734e1..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple;
-
-import org.apache.tajo.storage.Tuple;
-
-public interface RowBlockReader<T extends Tuple> {
-
- /**
- * Fills the given tuple with the next row of the block.
- *
- * @return true if the next row was read into the tuple; false if no rows remain.
- */
- public boolean next(T tuple);
-
- public void reset();
-}
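A short usage sketch for this interface, assuming the reader is obtained from a row block (for example an OffHeapRowBlock's reader) and that the reusable target is a ZeroCopyTuple created with the no-argument constructor seen elsewhere in this commit.

import org.apache.tajo.tuple.RowBlockReader;
import org.apache.tajo.tuple.offheap.ZeroCopyTuple;

public class RowBlockScanSketch {
  static long sumFirstColumn(RowBlockReader<ZeroCopyTuple> reader) {
    ZeroCopyTuple tuple = new ZeroCopyTuple(); // reused; next() repoints it at each row
    long sum = 0;
    while (reader.next(tuple)) {               // false once the block is exhausted
      sum += tuple.getInt4(0);
    }
    reader.reset();                            // rewind for a second pass
    return sum;
  }
}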
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
deleted file mode 100644
index c43c018..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple;
-
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.tuple.offheap.RowWriter;
-
-public interface TupleBuilder extends RowWriter {
- public Tuple build();
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
deleted file mode 100644
index 9662d5a..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.util.Deallocatable;
-import org.apache.tajo.util.UnsafeUtil;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
-public class DirectBufTuple extends UnSafeTuple implements Deallocatable {
- private ByteBuffer bb;
-
- public DirectBufTuple(int length, DataType[] types) {
- bb = ByteBuffer.allocateDirect(length).order(ByteOrder.nativeOrder());
- set(bb, 0, length, types);
- }
-
- @Override
- public void release() {
- UnsafeUtil.free(bb);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
deleted file mode 100644
index a327123..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-/**
- * Fixed size limit specification
- */
-public class FixedSizeLimitSpec extends ResizableLimitSpec {
- public FixedSizeLimitSpec(long size) {
- super(size, size);
- }
-
- public FixedSizeLimitSpec(long size, float allowedOverflowRatio) {
- super(size, size, allowedOverflowRatio);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
deleted file mode 100644
index 33f9f1c..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-
-import org.apache.tajo.datum.*;
-import org.apache.tajo.exception.UnsupportedException;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.util.SizeOf;
-import org.apache.tajo.util.StringUtils;
-import org.apache.tajo.util.UnsafeUtil;
-
-import sun.misc.Unsafe;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
-public class HeapTuple implements Tuple {
- private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
- private static final long BASE_OFFSET = UnsafeUtil.ARRAY_BYTE_BASE_OFFSET;
-
- private final byte [] data;
- private final DataType [] types;
-
- public HeapTuple(final byte [] bytes, final DataType [] types) {
- this.data = bytes;
- this.types = types;
- }
-
- @Override
- public int size() {
- return types.length; // field count, as getValues() below assumes
- }
-
- public ByteBuffer nioBuffer() {
- return ByteBuffer.wrap(data);
- }
-
- private int getFieldOffset(int fieldId) {
- return UNSAFE.getInt(data, BASE_OFFSET + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT));
- }
-
- private int checkNullAndGetOffset(int fieldId) {
- int offset = getFieldOffset(fieldId);
- if (offset == OffHeapRowBlock.NULL_FIELD_OFFSET) {
- throw new RuntimeException("Invalid Field Access: " + fieldId);
- }
- return offset;
- }
-
- @Override
- public boolean contains(int fieldid) {
- return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
- }
-
- @Override
- public boolean isNull(int fieldid) {
- return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
- }
-
- @Override
- public boolean isNotNull(int fieldid) {
- return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
- }
-
- @Override
- public void clear() {
- // nothing to do
- }
-
- @Override
- public void put(int fieldId, Datum value) {
- throw new UnsupportedException("UnSafeTuple does not support put(int, Datum).");
- }
-
- @Override
- public void put(int fieldId, Datum[] values) {
- throw new UnsupportedException("UnSafeTuple does not support put(int, Datum []).");
- }
-
- @Override
- public void put(int fieldId, Tuple tuple) {
- throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple).");
- }
-
- @Override
- public void put(Datum[] values) {
- throw new UnsupportedException("UnSafeTuple does not support put(Datum []).");
- }
-
- @Override
- public Datum get(int fieldId) {
- if (isNull(fieldId)) {
- return NullDatum.get();
- }
-
- switch (types[fieldId].getType()) {
- case BOOLEAN:
- return DatumFactory.createBool(getBool(fieldId));
- case INT1:
- case INT2:
- return DatumFactory.createInt2(getInt2(fieldId));
- case INT4:
- return DatumFactory.createInt4(getInt4(fieldId));
- case INT8:
- return DatumFactory.createInt8(getInt8(fieldId));
- case FLOAT4:
- return DatumFactory.createFloat4(getFloat4(fieldId));
- case FLOAT8:
- return DatumFactory.createFloat8(getFloat8(fieldId));
- case TEXT:
- return DatumFactory.createText(getText(fieldId));
- case TIMESTAMP:
- return DatumFactory.createTimestamp(getInt8(fieldId));
- case DATE:
- return DatumFactory.createDate(getInt4(fieldId));
- case TIME:
- return DatumFactory.createTime(getInt8(fieldId));
- case INTERVAL:
- return getInterval(fieldId);
- case INET4:
- return DatumFactory.createInet4(getInt4(fieldId));
- case PROTOBUF:
- return getProtobufDatum(fieldId);
- default:
- throw new UnsupportedException("Unknown type: " + types[fieldId]);
- }
- }
-
- @Override
- public void setOffset(long offset) {
- }
-
- @Override
- public long getOffset() {
- return 0;
- }
-
- @Override
- public boolean getBool(int fieldId) {
- return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)) == 0x01;
- }
-
- @Override
- public byte getByte(int fieldId) {
- return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
- }
-
- @Override
- public char getChar(int fieldId) {
- return UNSAFE.getChar(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
- }
-
- @Override
- public byte[] getBytes(int fieldId) {
- long pos = checkNullAndGetOffset(fieldId);
- int len = UNSAFE.getInt(data, BASE_OFFSET + pos);
- pos += SizeOf.SIZE_OF_INT;
-
- byte [] bytes = new byte[len];
- UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
- return bytes;
- }
-
- @Override
- public short getInt2(int fieldId) {
- return UNSAFE.getShort(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
- }
-
- @Override
- public int getInt4(int fieldId) {
- return UNSAFE.getInt(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
- }
-
- @Override
- public long getInt8(int fieldId) {
- return UNSAFE.getLong(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
- }
-
- @Override
- public float getFloat4(int fieldId) {
- return UNSAFE.getFloat(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
- }
-
- @Override
- public double getFloat8(int fieldId) {
- return UNSAFE.getDouble(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
- }
-
- @Override
- public String getText(int fieldId) {
- return new String(getBytes(fieldId));
- }
-
- public IntervalDatum getInterval(int fieldId) {
- long pos = checkNullAndGetOffset(fieldId);
- int months = UNSAFE.getInt(data, BASE_OFFSET + pos);
- pos += SizeOf.SIZE_OF_INT;
- long millisecs = UNSAFE.getLong(data, BASE_OFFSET + pos);
- return new IntervalDatum(months, millisecs);
- }
-
- @Override
- public Datum getProtobufDatum(int fieldId) {
- byte [] bytes = getBytes(fieldId);
-
- ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode());
- Message.Builder builder = factory.newBuilder();
- try {
- builder.mergeFrom(bytes);
- } catch (InvalidProtocolBufferException e) {
- return NullDatum.get();
- }
-
- return new ProtobufDatum(builder.build());
- }
-
- @Override
- public char[] getUnicodeChars(int fieldId) {
- long pos = checkNullAndGetOffset(fieldId);
- int len = UNSAFE.getInt(data, BASE_OFFSET + pos);
- pos += SizeOf.SIZE_OF_INT;
-
- byte [] bytes = new byte[len];
- UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
- return StringUtils.convertBytesToChars(bytes, Charset.forName("UTF-8"));
- }
-
- @Override
- public Tuple clone() throws CloneNotSupportedException {
- return this;
- }
-
- @Override
- public Datum[] getValues() {
- Datum [] datums = new Datum[size()];
- for (int i = 0; i < size(); i++) {
- if (contains(i)) {
- datums[i] = get(i);
- } else {
- datums[i] = NullDatum.get();
- }
- }
- return datums;
- }
-
- @Override
- public String toString() {
- return VTuple.toDisplayString(getValues());
- }
-}
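For orientation, getFieldOffset() above implies this record layout: a 4-byte record length, then one 4-byte offset per field (NULL_FIELD_OFFSET, -1, marks a null), then the field payload, all in native byte order since access goes through Unsafe. A hand-built single-INT4 record, as a sketch:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.tuple.offheap.HeapTuple;

public class HeapTupleLayoutSketch {
  public static void main(String[] args) {
    // [int32 length][int32 offset of field 0][int32 payload] = 12 bytes
    ByteBuffer buf = ByteBuffer.allocate(12).order(ByteOrder.nativeOrder());
    buf.putInt(12);  // record length
    buf.putInt(8);   // field 0 starts at byte 8
    buf.putInt(42);  // field 0 payload

    TajoDataTypes.DataType[] types = new TajoDataTypes.DataType[] {
        TajoDataTypes.DataType.newBuilder().setType(TajoDataTypes.Type.INT4).build()
    };
    HeapTuple tuple = new HeapTuple(buf.array(), types);
    System.out.println(tuple.isNull(0));   // false: stored offset 8 > NULL_FIELD_OFFSET
    System.out.println(tuple.getInt4(0));  // 42
  }
}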
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
deleted file mode 100644
index 2f8e349..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.util.Deallocatable;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.UnsafeUtil;
-import sun.misc.Unsafe;
-import sun.nio.ch.DirectBuffer;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-public class OffHeapMemory implements Deallocatable {
- private static final Log LOG = LogFactory.getLog(OffHeapMemory.class);
-
- protected static final Unsafe UNSAFE = UnsafeUtil.unsafe;
-
- protected ByteBuffer buffer;
- protected int memorySize;
- protected ResizableLimitSpec limitSpec;
- protected long address;
-
- @VisibleForTesting
- protected OffHeapMemory(ByteBuffer buffer, ResizableLimitSpec limitSpec) {
- this.buffer = buffer;
- this.address = ((DirectBuffer) buffer).address();
- this.memorySize = buffer.limit();
- this.limitSpec = limitSpec;
- }
-
- public OffHeapMemory(ResizableLimitSpec limitSpec) {
- this(ByteBuffer.allocateDirect((int) limitSpec.initialSize()).order(ByteOrder.nativeOrder()), limitSpec);
- }
-
- public long address() {
- return address;
- }
-
- public long size() {
- return memorySize;
- }
-
- public void resize(int newSize) {
- Preconditions.checkArgument(newSize > 0, "Size must be greater than 0 bytes");
-
- if (newSize > limitSpec.limit()) {
- throw new RuntimeException("Resize cannot exceed the size limit");
- }
-
- if (newSize < memorySize) {
- LOG.warn("The size reduction is ignored.");
- return; // honor the message: shrinking would copy memorySize bytes past the new buffer
- }
-
- int newBlockSize = UnsafeUtil.alignedSize(newSize);
- ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize);
- long newAddress = ((DirectBuffer)newByteBuf).address();
-
- UNSAFE.copyMemory(this.address, newAddress, memorySize);
-
- UnsafeUtil.free(buffer);
- this.memorySize = newSize;
- this.buffer = newByteBuf;
- this.address = newAddress;
- }
-
- public java.nio.Buffer nioBuffer() {
- return (ByteBuffer) buffer.position(0).limit(memorySize);
- }
-
- @Override
- public void release() {
- UnsafeUtil.free(this.buffer);
- this.buffer = null;
- this.address = 0;
- this.memorySize = 0;
- }
-
- public String toString() {
- return "memory=" + FileUtil.humanReadableByteCount(memorySize, false) + "," + limitSpec;
- }
-}
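A small sketch of the growth path above, assuming ResizableLimitSpec(initialSize, limit) behaves as FixedSizeLimitSpec's super calls suggest, with initialSize() returning the first argument unchanged.

import org.apache.tajo.tuple.offheap.OffHeapMemory;
import org.apache.tajo.tuple.offheap.ResizableLimitSpec;

public class OffHeapResizeSketch {
  public static void main(String[] args) {
    // start with 4 KB of direct memory, allowed to grow up to 64 KB
    OffHeapMemory memory = new OffHeapMemory(new ResizableLimitSpec(4 * 1024, 64 * 1024));
    System.out.println(memory.size());  // 4096, under the assumption above

    memory.resize(16 * 1024);           // copies the old region into a larger buffer
    System.out.println(memory.size());  // 16384

    // memory.resize(128 * 1024) would throw: it exceeds limitSpec.limit()
    memory.release();                   // direct memory must be freed explicitly
  }
}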
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
deleted file mode 100644
index 689efb7..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaUtil;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.util.Deallocatable;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.SizeOf;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
-public class OffHeapRowBlock extends OffHeapMemory implements Deallocatable {
- private static final Log LOG = LogFactory.getLog(OffHeapRowBlock.class);
-
- public static final int NULL_FIELD_OFFSET = -1;
-
- DataType [] dataTypes;
-
- // Basic States
- private int maxRowNum = Integer.MAX_VALUE; // optional
- private int rowNum;
- protected int position = 0;
-
- private OffHeapRowBlockWriter builder;
-
- private OffHeapRowBlock(ByteBuffer buffer, Schema schema, ResizableLimitSpec limitSpec) {
- super(buffer, limitSpec);
- initialize(schema);
- }
-
- public OffHeapRowBlock(Schema schema, ResizableLimitSpec limitSpec) {
- super(limitSpec);
- initialize(schema);
- }
-
- private void initialize(Schema schema) {
- dataTypes = SchemaUtil.toDataTypes(schema);
-
- this.builder = new OffHeapRowBlockWriter(this);
- }
-
- @VisibleForTesting
- public OffHeapRowBlock(Schema schema, int bytes) {
- this(schema, new ResizableLimitSpec(bytes));
- }
-
- @VisibleForTesting
- public OffHeapRowBlock(Schema schema, ByteBuffer buffer) {
- this(buffer, schema, ResizableLimitSpec.DEFAULT_LIMIT);
- }
-
- public void position(int pos) {
- this.position = pos;
- }
-
- public void clear() {
- this.position = 0;
- this.rowNum = 0;
-
- builder.clear();
- }
-
- @Override
- public ByteBuffer nioBuffer() {
- return (ByteBuffer) buffer.position(0).limit(position);
- }
-
- public int position() {
- return position;
- }
-
- public long usedMem() {
- return position;
- }
-
- /**
- * Ensures that this buffer has at least {@code size} bytes of remaining space.
- * If it does not, a larger buffer is allocated and the current contents are copied into it.
- *
- * @param size Number of bytes that are about to be written
- */
- public void ensureSize(int size) {
- if (remain() - size < 0) {
- if (!limitSpec.canIncrease(memorySize)) {
- throw new RuntimeException("Cannot increase RowBlock anymore.");
- }
-
- int newBlockSize = limitSpec.increasedSize(memorySize);
- resize(newBlockSize);
- LOG.info("Increase DirectRowBlock to " + FileUtil.humanReadableByteCount(newBlockSize, false));
- }
- }
-
- public long remain() {
- return memorySize - position - builder.offset();
- }
-
- public int maxRowNum() {
- return maxRowNum;
- }
- public int rows() {
- return rowNum;
- }
-
- public void setRows(int rowNum) {
- this.rowNum = rowNum;
- }
-
- public boolean copyFromChannel(FileChannel channel, TableStats stats) throws IOException {
- if (channel.position() < channel.size()) {
- clear();
-
- buffer.clear();
- channel.read(buffer);
- memorySize = buffer.position();
-
- while (position < memorySize) {
- long recordPtr = address + position;
-
- if (remain() < SizeOf.SIZE_OF_INT) {
- channel.position(channel.position() - remain());
- memorySize = (int) (memorySize - remain());
- return true;
- }
-
- int recordSize = UNSAFE.getInt(recordPtr);
-
- if (remain() < recordSize) {
- channel.position(channel.position() - remain());
- memorySize = (int) (memorySize - remain());
- return true;
- }
-
- position += recordSize;
- rowNum++;
- }
-
- return true;
- } else {
- return false;
- }
- }
-
- public RowWriter getWriter() {
- return builder;
- }
-
- public OffHeapRowBlockReader getReader() {
- return new OffHeapRowBlockReader(this);
- }
-}
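
A write-then-read round trip over a single block, as a sketch; the schema is assumed to be (id INT4, name TEXT) and is built elsewhere, and the class name is illustrative:

import org.apache.tajo.catalog.Schema;
import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
import org.apache.tajo.tuple.offheap.OffHeapRowBlockReader;
import org.apache.tajo.tuple.offheap.ResizableLimitSpec;
import org.apache.tajo.tuple.offheap.RowWriter;
import org.apache.tajo.tuple.offheap.ZeroCopyTuple;

public class RowBlockRoundTrip {
  /** Writes one row and scans it back; schema is assumed to be (id INT4, name TEXT). */
  public static void writeAndRead(Schema schema) {
    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, new ResizableLimitSpec(64 * 1024));
    try {
      RowWriter writer = rowBlock.getWriter();
      writer.startRow();
      writer.putInt4(1);        // id
      writer.putText("tajo");   // name
      writer.endRow();

      ZeroCopyTuple tuple = new ZeroCopyTuple();
      OffHeapRowBlockReader reader = rowBlock.getReader();
      while (reader.next(tuple)) {
        // tuple points directly into the block's memory; no per-row copy is made
      }
    } finally {
      rowBlock.release();       // off-heap memory must be freed explicitly
    }
  }
}
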
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
deleted file mode 100644
index 4a9313f..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.tuple.RowBlockReader;
-import org.apache.tajo.util.UnsafeUtil;
-import sun.misc.Unsafe;
-
-public class OffHeapRowBlockReader implements RowBlockReader<ZeroCopyTuple> {
- private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
- OffHeapRowBlock rowBlock;
-
- // Read States
- private int curRowIdxForRead;
- private int curPosForRead;
-
- public OffHeapRowBlockReader(OffHeapRowBlock rowBlock) {
- this.rowBlock = rowBlock;
- }
-
- public long remainForRead() {
- return rowBlock.memorySize - curPosForRead;
- }
-
- @Override
- public boolean next(ZeroCopyTuple tuple) {
- if (curRowIdxForRead < rowBlock.rows()) {
-
- long recordStartPtr = rowBlock.address() + curPosForRead;
- int recordLen = UNSAFE.getInt(recordStartPtr);
- tuple.set(rowBlock.buffer, curPosForRead, recordLen, rowBlock.dataTypes);
-
- curPosForRead += recordLen;
- curRowIdxForRead++;
-
- return true;
- } else {
- return false;
- }
- }
-
- @Override
- public void reset() {
- curPosForRead = 0;
- curRowIdxForRead = 0;
- }
-}
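
Since the reader keeps its cursor in its own fields, reset() allows repeated scans of the same block without touching the underlying memory. A small sketch, with an illustrative helper name:

import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
import org.apache.tajo.tuple.offheap.OffHeapRowBlockReader;
import org.apache.tajo.tuple.offheap.ZeroCopyTuple;

public class ReaderResetExample {
  /** Scans a block twice; both passes visit the same rows. */
  public static int scanTwice(OffHeapRowBlock rowBlock) {
    OffHeapRowBlockReader reader = rowBlock.getReader();
    ZeroCopyTuple tuple = new ZeroCopyTuple();

    int count = 0;
    while (reader.next(tuple)) {
      count++;                 // first pass
    }

    reader.reset();            // rewind the cursor; the data itself is untouched
    while (reader.next(tuple)) {
      count++;                 // second pass revisits every row
    }
    return count;              // 2 * rowBlock.rows()
  }
}
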
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
deleted file mode 100644
index dbc3188..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.collect.Lists;
-import org.apache.tajo.storage.Tuple;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-
-public class OffHeapRowBlockUtils {
-
- public static List<Tuple> sort(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) {
- List<Tuple> tupleList = Lists.newArrayList();
- ZeroCopyTuple zcTuple = new ZeroCopyTuple();
- OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
- while(reader.next(zcTuple)) {
- tupleList.add(zcTuple);
- zcTuple = new ZeroCopyTuple();
- }
- Collections.sort(tupleList, comparator);
- return tupleList;
- }
-
- public static Tuple[] sortToArray(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) {
- Tuple[] tuples = new Tuple[rowBlock.rows()];
- ZeroCopyTuple zcTuple = new ZeroCopyTuple();
- OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
- for (int i = 0; i < rowBlock.rows() && reader.next(zcTuple); i++) {
- tuples[i] = zcTuple;
- zcTuple = new ZeroCopyTuple();
- }
- Arrays.sort(tuples, comparator);
- return tuples;
- }
-}
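
Both helpers allocate a fresh ZeroCopyTuple per row because each tuple is only a view into the block's memory; reusing one wrapper would make every list element point at the last row. A sketch of sorting by an INT4 value in column 0, assuming Tajo's Tuple exposes getInt4; the class and comparator are illustrative:

import java.util.Comparator;
import java.util.List;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
import org.apache.tajo.tuple.offheap.OffHeapRowBlockUtils;

public class SortExample {
  public static List<Tuple> sortByFirstColumn(OffHeapRowBlock rowBlock) {
    // Compare rows by the INT4 value in column 0.
    Comparator<Tuple> byFirstColumn = new Comparator<Tuple>() {
      @Override
      public int compare(Tuple a, Tuple b) {
        return Integer.compare(a.getInt4(0), b.getInt4(0));
      }
    };
    return OffHeapRowBlockUtils.sort(rowBlock, byFirstColumn);
  }
}
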
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
deleted file mode 100644
index d177e0c..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.common.TajoDataTypes;
-
-public class OffHeapRowBlockWriter extends OffHeapRowWriter {
- OffHeapRowBlock rowBlock;
-
- OffHeapRowBlockWriter(OffHeapRowBlock rowBlock) {
- super(rowBlock.dataTypes);
- this.rowBlock = rowBlock;
- }
-
- public long address() {
- return rowBlock.address();
- }
-
- public int position() {
- return rowBlock.position();
- }
-
- @Override
- public void forward(int length) {
- rowBlock.position(position() + length);
- }
-
- public void ensureSize(int size) {
- rowBlock.ensureSize(size);
- }
-
- @Override
- public void endRow() {
- super.endRow();
- rowBlock.setRows(rowBlock.rows() + 1);
- }
-
- @Override
- public TajoDataTypes.DataType[] dataTypes() {
- return rowBlock.dataTypes;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
deleted file mode 100644
index 85c7e0b..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.IntervalDatum;
-import org.apache.tajo.datum.ProtobufDatum;
-import org.apache.tajo.datum.TextDatum;
-import org.apache.tajo.util.SizeOf;
-import org.apache.tajo.util.UnsafeUtil;
-
-/**
- * Row Record Structure
- *
- * | row length (4 bytes) | field 1 offset (4 bytes) | ... | field N offset (4 bytes) | field 1 | ... | field N |
- *
- * Each field offset is relative to the start of the record; a null field is
- * marked with OffHeapRowBlock.NULL_FIELD_OFFSET (-1).
- */
-public abstract class OffHeapRowWriter implements RowWriter {
- /** record size + offset list */
- private final int headerSize;
- /** field offsets */
- private final int [] fieldOffsets;
- private final TajoDataTypes.DataType [] dataTypes;
-
- private int curFieldIdx;
- private int curOffset;
-
- public OffHeapRowWriter(final TajoDataTypes.DataType [] dataTypes) {
- this.dataTypes = dataTypes;
- fieldOffsets = new int[dataTypes.length];
- headerSize = SizeOf.SIZE_OF_INT * (dataTypes.length + 1);
- }
-
- public void clear() {
- curOffset = 0;
- curFieldIdx = 0;
- }
-
- public long recordStartAddr() {
- return address() + position();
- }
-
- public abstract long address();
-
- public abstract void ensureSize(int size);
-
- public int offset() {
- return curOffset;
- }
-
- /**
- * Current position
- *
- * @return The position
- */
- public abstract int position();
-
- /**
- * Forwards the write position by the given number of bytes.
- *
- * @param length Number of bytes to forward
- */
- public abstract void forward(int length);
-
- @Override
- public TajoDataTypes.DataType[] dataTypes() {
- return dataTypes;
- }
-
- public boolean startRow() {
- curOffset = headerSize;
- curFieldIdx = 0;
- return true;
- }
-
- public void endRow() {
- long rowHeaderPos = address() + position();
- OffHeapMemory.UNSAFE.putInt(rowHeaderPos, curOffset);
- rowHeaderPos += SizeOf.SIZE_OF_INT;
-
- for (int i = 0; i < curFieldIdx; i++) {
- OffHeapMemory.UNSAFE.putInt(rowHeaderPos, fieldOffsets[i]);
- rowHeaderPos += SizeOf.SIZE_OF_INT;
- }
- for (int i = curFieldIdx; i < dataTypes.length; i++) {
- OffHeapMemory.UNSAFE.putInt(rowHeaderPos, OffHeapRowBlock.NULL_FIELD_OFFSET);
- rowHeaderPos += SizeOf.SIZE_OF_INT;
- }
-
- // curOffset now equals the byte length of this row.
- forward(curOffset);
- }
-
- public void skipField() {
- fieldOffsets[curFieldIdx++] = OffHeapRowBlock.NULL_FIELD_OFFSET;
- }
-
- private void forwardField() {
- fieldOffsets[curFieldIdx++] = curOffset;
- }
-
- public void putBool(boolean val) {
- ensureSize(SizeOf.SIZE_OF_BOOL);
- forwardField();
-
- OffHeapMemory.UNSAFE.putByte(recordStartAddr() + curOffset, (byte) (val ? 0x01 : 0x00));
-
- curOffset += SizeOf.SIZE_OF_BOOL;
- }
-
- public void putInt2(short val) {
- ensureSize(SizeOf.SIZE_OF_SHORT);
- forwardField();
-
- OffHeapMemory.UNSAFE.putShort(recordStartAddr() + curOffset, val);
- curOffset += SizeOf.SIZE_OF_SHORT;
- }
-
- public void putInt4(int val) {
- ensureSize(SizeOf.SIZE_OF_INT);
- forwardField();
-
- OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, val);
- curOffset += SizeOf.SIZE_OF_INT;
- }
-
- public void putInt8(long val) {
- ensureSize(SizeOf.SIZE_OF_LONG);
- forwardField();
-
- OffHeapMemory.UNSAFE.putLong(recordStartAddr() + curOffset, val);
- curOffset += SizeOf.SIZE_OF_LONG;
- }
-
- public void putFloat4(float val) {
- ensureSize(SizeOf.SIZE_OF_FLOAT);
- forwardField();
-
- OffHeapMemory.UNSAFE.putFloat(recordStartAddr() + curOffset, val);
- curOffset += SizeOf.SIZE_OF_FLOAT;
- }
-
- public void putFloat8(double val) {
- ensureSize(SizeOf.SIZE_OF_DOUBLE);
- forwardField();
-
- OffHeapMemory.UNSAFE.putDouble(recordStartAddr() + curOffset, val);
- curOffset += SizeOf.SIZE_OF_DOUBLE;
- }
-
- public void putText(String val) {
- byte[] bytes = val.getBytes(TextDatum.DEFAULT_CHARSET);
- putText(bytes);
- }
-
- public void putText(byte[] val) {
- // TEXT shares the length-prefixed byte layout of BLOB.
- putBlob(val);
- }
-
- public void putBlob(byte[] val) {
- int bytesLen = val.length;
-
- ensureSize(SizeOf.SIZE_OF_INT + bytesLen);
- forwardField();
-
- OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, bytesLen);
- curOffset += SizeOf.SIZE_OF_INT;
-
- OffHeapMemory.UNSAFE.copyMemory(val, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, null,
- recordStartAddr() + curOffset, bytesLen);
- curOffset += bytesLen;
- }
-
- public void putTimestamp(long val) {
- putInt8(val);
- }
-
- public void putDate(int val) {
- putInt4(val);
- }
-
- public void putTime(long val) {
- putInt8(val);
- }
-
- public void putInterval(IntervalDatum val) {
- ensureSize(SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG);
- forwardField();
-
- long offset = recordStartAddr() + curOffset;
- OffHeapMemory.UNSAFE.putInt(offset, val.getMonths());
- offset += SizeOf.SIZE_OF_INT;
- OffHeapMemory.UNSAFE.putLong(offset, val.getMilliSeconds());
- curOffset += SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG;
- }
-
- public void putInet4(int val) {
- putInt4(val);
- }
-
- public void putProtoDatum(ProtobufDatum val) {
- putBlob(val.asByteArray());
- }
-}
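
To make the header arithmetic concrete: for a 3-column row the header occupies 4 * (3 + 1) = 16 bytes (the row length plus one offset per field), so field data begins at offset 16. A sketch that writes the row (42, NULL, "abc") and traces the offsets; the class name is illustrative, and the numbers follow from the putXXX sizes above:

import org.apache.tajo.tuple.offheap.RowWriter;

public class RowLayoutExample {
  /**
   * Writes the row (42, NULL, "abc") through a writer whose schema has three
   * columns (INT4, INT8, TEXT). headerSize = 4 * (3 + 1) = 16 bytes.
   */
  public static void writeRow(RowWriter writer) {
    writer.startRow();      // curOffset = 16, just past the header
    writer.putInt4(42);     // field 0 offset = 16; curOffset -> 20
    writer.skipField();     // field 1 offset = NULL_FIELD_OFFSET (-1)
    writer.putText("abc");  // field 2 offset = 20; 4-byte length + 3 bytes -> curOffset = 27
    writer.endRow();        // header becomes [27, 16, -1, 20]; position advances by 27 bytes
  }
}
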
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
deleted file mode 100644
index 14e67b2..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.util.FileUtil;
-
-/**
- * Specifies how a resizable buffer may grow: its initial size, its size limit,
- * and the ratio by which it grows on each resize. All sizes are capped at
- * Integer.MAX_VALUE (2^31 - 1) because ByteBuffer is indexed by a Java int.
- */
-public class ResizableLimitSpec {
- private final Log LOG = LogFactory.getLog(ResizableLimitSpec.class);
-
- public static final int MAX_SIZE_BYTES = Integer.MAX_VALUE;
- public static final ResizableLimitSpec DEFAULT_LIMIT = new ResizableLimitSpec(Integer.MAX_VALUE);
-
- private final long initSize;
- private final long limitBytes;
- private final float incRatio;
- private final float allowedOverflowRatio;
- private final static float DEFAULT_ALLOWED_OVERFLOW_RATIO = 0.1f;
- private final static float DEFAULT_INCREASE_RATIO = 1.0f;
-
- public ResizableLimitSpec(long initSize) {
- this(initSize, MAX_SIZE_BYTES, DEFAULT_ALLOWED_OVERFLOW_RATIO);
- }
-
- public ResizableLimitSpec(long initSize, long limitBytes) {
- this(initSize, limitBytes, DEFAULT_ALLOWED_OVERFLOW_RATIO);
- }
-
- public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflow) {
- this(initSize, limitBytes, allowedOverflow, DEFAULT_INCREASE_RATIO);
- }
-
- public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflowRatio, float incRatio) {
- Preconditions.checkArgument(initSize > 0, "initial size must be greater than 0 bytes.");
- Preconditions.checkArgument(initSize <= MAX_SIZE_BYTES, "The maximum initial size is 2GB.");
- Preconditions.checkArgument(limitBytes > 0, "The limit size must be greater than 0 bytes.");
- Preconditions.checkArgument(limitBytes <= MAX_SIZE_BYTES, "The maximum limit size is 2GB.");
- Preconditions.checkArgument(incRatio > 0.0f, "Increase Ratio must be greater than 0.");
-
- if (initSize == limitBytes) {
- long overflowedSize = (long) (initSize + (initSize * allowedOverflowRatio));
-
- if (overflowedSize > Integer.MAX_VALUE) {
- overflowedSize = Integer.MAX_VALUE;
- }
-
- this.initSize = overflowedSize;
- this.limitBytes = overflowedSize;
- } else {
- this.initSize = initSize;
- limitBytes = (long) (limitBytes + (limitBytes * allowedOverflowRatio));
-
- if (limitBytes > Integer.MAX_VALUE) {
- this.limitBytes = Integer.MAX_VALUE;
- } else {
- this.limitBytes = limitBytes;
- }
- }
-
- this.allowedOverflowRatio = allowedOverflowRatio;
- this.incRatio = incRatio;
- }
-
- public long initialSize() {
- return initSize;
- }
-
- public long limit() {
- return limitBytes;
- }
-
- /** Returns the fraction of the size limit that {@code currentSize} occupies; despite the name, this is the used ratio. */
- public float remainRatio(long currentSize) {
- Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
- if (currentSize > Integer.MAX_VALUE) {
- currentSize = Integer.MAX_VALUE;
- }
- return (float)currentSize / (float)limitBytes;
- }
-
- public boolean canIncrease(long currentSize) {
- return remain(currentSize) > 0;
- }
-
- public long remain(long currentSize) {
- Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
- return limitBytes > Integer.MAX_VALUE ? Integer.MAX_VALUE - currentSize : limitBytes - currentSize;
- }
-
- public int increasedSize(int currentSize) {
- if (currentSize < initSize) {
- return (int) initSize;
- }
-
- if (currentSize > Integer.MAX_VALUE) {
- LOG.warn("Current size already exceeds the maximum size (" + Integer.MAX_VALUE + " bytes)");
- return Integer.MAX_VALUE;
- }
- long nextSize = (long) (currentSize + ((float) currentSize * incRatio));
-
- if (nextSize > limitBytes) {
- LOG.info("Increasing reaches size limit (" + FileUtil.humanReadableByteCount(limitBytes, false) + ")");
- nextSize = limitBytes;
- }
-
- if (nextSize > Integer.MAX_VALUE) {
- LOG.info("Increasing reaches maximum size (" + FileUtil.humanReadableByteCount(Integer.MAX_VALUE, false) + ")");
- nextSize = Integer.MAX_VALUE;
- }
-
- return (int) nextSize;
- }
-
- @Override
- public String toString() {
- return "init=" + FileUtil.humanReadableByteCount(initSize, false) + ",limit="
- + FileUtil.humanReadableByteCount(limitBytes, false) + ",overflow_ratio=" + allowedOverflowRatio
- + ",inc_ratio=" + incRatio;
- }
-}
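
Putting the growth policy together: increasedSize() scales the current size by the increase ratio and clamps it to the (overflow-adjusted) limit. A sketch that walks a 64 KB buffer up to a 1 MB limit with the defaults, under which each step doubles the size; the class name is illustrative:

import org.apache.tajo.tuple.offheap.ResizableLimitSpec;

public class LimitSpecExample {
  public static void main(String[] args) {
    // 64 KB initial size, 1 MB limit, default overflow and increase ratios.
    ResizableLimitSpec spec = new ResizableLimitSpec(64 * 1024, 1024 * 1024);

    int size = (int) spec.initialSize();    // 64 KB
    while (spec.canIncrease(size)) {
      size = spec.increasedSize(size);      // doubles: 128 KB, 256 KB, ... up to the limit
      System.out.println(size);
      if (size >= spec.limit()) {
        break;                              // reached the (overflow-adjusted) limit
      }
    }
  }
}
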
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
deleted file mode 100644
index a2b2561..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.IntervalDatum;
-import org.apache.tajo.datum.ProtobufDatum;
-
-/**
- * The call sequence should be as follows:
- *
- * <pre>
- * startRow() --> skipField() or putXXX --> endRow()
- * </pre>
- *
- * The total number of skipField() and putXXX() invocations per row must equal the number of fields.
- */
-public interface RowWriter {
-
- public TajoDataTypes.DataType [] dataTypes();
-
- public boolean startRow();
-
- public void endRow();
-
- public void skipField();
-
- public void putBool(boolean val);
-
- public void putInt2(short val);
-
- public void putInt4(int val);
-
- public void putInt8(long val);
-
- public void putFloat4(float val);
-
- public void putFloat8(double val);
-
- public void putText(String val);
-
- public void putText(byte[] val);
-
- public void putBlob(byte[] val);
-
- public void putTimestamp(long val);
-
- public void putTime(long val);
-
- public void putDate(int val);
-
- public void putInterval(IntervalDatum val);
-
- public void putInet4(int val);
-
- public void putProtoDatum(ProtobufDatum datum);
-}
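
A sketch of the contract in code: every row is bracketed by startRow()/endRow(), and each field gets exactly one putXXX() or skipField() call, including NULL fields. The helper class and method are illustrative:

import org.apache.tajo.tuple.offheap.RowWriter;

public class RowWriterContractExample {
  /** Writes one single-column row per array element; null means a skipped (NULL) field. */
  public static void writeColumn(RowWriter writer, Integer[] values) {
    for (Integer value : values) {
      writer.startRow();
      if (value == null) {
        writer.skipField();    // exactly one call per field, even when NULL
      } else {
        writer.putInt4(value);
      }
      writer.endRow();
    }
  }
}
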