You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/03 06:30:34 UTC
[19/30] tajo git commit: TAJO-1122: Refactor the tajo-storage project
structure.
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
deleted file mode 100644
index 0fb2c3a..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import parquet.filter.UnboundRecordFilter;
-import parquet.hadoop.*;
-import parquet.hadoop.api.InitContext;
-import parquet.hadoop.api.ReadSupport;
-import parquet.hadoop.api.ReadSupport.ReadContext;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.GlobalMetaData;
-import parquet.schema.MessageType;
-
-/**
- * Read records from a Parquet file.
- */
-public class ParquetReader<T> implements Closeable {
-
- private ReadSupport<T> readSupport;
- private UnboundRecordFilter filter;
- private Configuration conf;
- private ReadContext readContext;
- private Iterator<Footer> footersIterator;
- private InternalParquetRecordReader<T> reader;
- private GlobalMetaData globalMetaData;
-
- /**
- * @param file the file to read
- * @param readSupport to materialize records
- * @throws IOException
- */
- public ParquetReader(Path file, ReadSupport<T> readSupport) throws IOException {
- this(file, readSupport, null);
- }
-
- /**
- * @param conf the configuration
- * @param file the file to read
- * @param readSupport to materialize records
- * @throws IOException
- */
- public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport) throws IOException {
- this(conf, file, readSupport, null);
- }
-
- /**
- * @param file the file to read
- * @param readSupport to materialize records
- * @param filter the filter to use to filter records
- * @throws IOException
- */
- public ParquetReader(Path file, ReadSupport<T> readSupport, UnboundRecordFilter filter) throws IOException {
- this(new Configuration(), file, readSupport, filter);
- }
-
- /**
- * @param conf the configuration
- * @param file the file to read
- * @param readSupport to materialize records
- * @param filter the filter to use to filter records
- * @throws IOException
- */
- public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport, UnboundRecordFilter filter) throws IOException {
- this.readSupport = readSupport;
- this.filter = filter;
- this.conf = conf;
-
- FileSystem fs = file.getFileSystem(conf);
- List<FileStatus> statuses = Arrays.asList(fs.listStatus(file));
- List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses);
- this.footersIterator = footers.iterator();
- globalMetaData = ParquetFileWriter.getGlobalMetaData(footers);
-
- List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
- for (Footer footer : footers) {
- blocks.addAll(footer.getParquetMetadata().getBlocks());
- }
-
- MessageType schema = globalMetaData.getSchema();
- Map<String, Set<String>> extraMetadata = globalMetaData.getKeyValueMetaData();
- readContext = readSupport.init(new InitContext(conf, extraMetadata, schema));
- }
-
- /**
- * @return the next record or null if finished
- * @throws IOException
- */
- public T read() throws IOException {
- try {
- if (reader != null && reader.nextKeyValue()) {
- return reader.getCurrentValue();
- } else {
- initReader();
- return reader == null ? null : read();
- }
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
-
- private void initReader() throws IOException {
- if (reader != null) {
- reader.close();
- reader = null;
- }
- if (footersIterator.hasNext()) {
- Footer footer = footersIterator.next();
- reader = new InternalParquetRecordReader<T>(readSupport, filter);
- reader.initialize(
- readContext.getRequestedSchema(), globalMetaData.getSchema(), footer.getParquetMetadata().getFileMetaData().getKeyValueMetaData(),
- readContext.getReadSupportMetadata(), footer.getFile(), footer.getParquetMetadata().getBlocks(), conf);
- }
- }
-
- @Override
- public void close() throws IOException {
- if (reader != null) {
- reader.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
deleted file mode 100644
index 0447a47..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import parquet.column.ParquetProperties;
-import parquet.hadoop.api.WriteSupport;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.schema.MessageType;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-public class ParquetWriter<T> implements Closeable {
-
- public static final int DEFAULT_BLOCK_SIZE = 128 * 1024 * 1024;
- public static final int DEFAULT_PAGE_SIZE = 1 * 1024 * 1024;
- public static final CompressionCodecName DEFAULT_COMPRESSION_CODEC_NAME =
- CompressionCodecName.UNCOMPRESSED;
- public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true;
- public static final boolean DEFAULT_IS_VALIDATING_ENABLED = false;
- public static final ParquetProperties.WriterVersion DEFAULT_WRITER_VERSION =
- ParquetProperties.WriterVersion.PARQUET_1_0;
-
- private final InternalParquetRecordWriter<T> writer;
-
- /**
- * Create a new ParquetWriter.
- * (with dictionary encoding enabled and validation off)
- *
- * @param file the file to create
- * @param writeSupport the implementation to write a record to a RecordConsumer
- * @param compressionCodecName the compression codec to use
- * @param blockSize the block size threshold
- * @param pageSize the page size threshold
- * @throws java.io.IOException
- * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, CompressionCodecName, int, int, boolean, boolean)
- */
- public ParquetWriter(Path file, WriteSupport<T> writeSupport, CompressionCodecName compressionCodecName, int blockSize, int pageSize) throws IOException {
- this(file, writeSupport, compressionCodecName, blockSize, pageSize,
- DEFAULT_IS_DICTIONARY_ENABLED, DEFAULT_IS_VALIDATING_ENABLED);
- }
-
- /**
- * Create a new ParquetWriter.
- *
- * @param file the file to create
- * @param writeSupport the implementation to write a record to a RecordConsumer
- * @param compressionCodecName the compression codec to use
- * @param blockSize the block size threshold
- * @param pageSize the page size threshold (both data and dictionary)
- * @param enableDictionary to turn dictionary encoding on
- * @param validating to turn on validation using the schema
- * @throws IOException
- * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean)
- */
- public ParquetWriter(
- Path file,
- WriteSupport<T> writeSupport,
- CompressionCodecName compressionCodecName,
- int blockSize,
- int pageSize,
- boolean enableDictionary,
- boolean validating) throws IOException {
- this(file, writeSupport, compressionCodecName, blockSize, pageSize, pageSize, enableDictionary, validating);
- }
-
- /**
- * Create a new ParquetWriter.
- *
- * @param file the file to create
- * @param writeSupport the implementation to write a record to a RecordConsumer
- * @param compressionCodecName the compression codec to use
- * @param blockSize the block size threshold
- * @param pageSize the page size threshold
- * @param dictionaryPageSize the page size threshold for the dictionary pages
- * @param enableDictionary to turn dictionary encoding on
- * @param validating to turn on validation using the schema
- * @throws IOException
- * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion)
- */
- public ParquetWriter(
- Path file,
- WriteSupport<T> writeSupport,
- CompressionCodecName compressionCodecName,
- int blockSize,
- int pageSize,
- int dictionaryPageSize,
- boolean enableDictionary,
- boolean validating) throws IOException {
- this(file, writeSupport, compressionCodecName, blockSize, pageSize,
- dictionaryPageSize, enableDictionary, validating,
- DEFAULT_WRITER_VERSION);
- }
-
- /**
- * Create a new ParquetWriter.
- *
- * Directly instantiates a Hadoop {@link org.apache.hadoop.conf.Configuration} which reads
- * configuration from the classpath.
- *
- * @param file the file to create
- * @param writeSupport the implementation to write a record to a RecordConsumer
- * @param compressionCodecName the compression codec to use
- * @param blockSize the block size threshold
- * @param pageSize the page size threshold
- * @param dictionaryPageSize the page size threshold for the dictionary pages
- * @param enableDictionary to turn dictionary encoding on
- * @param validating to turn on validation using the schema
- * @param writerVersion version of parquetWriter from {@link ParquetProperties.WriterVersion}
- * @throws IOException
- * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion, org.apache.hadoop.conf.Configuration)
- */
- public ParquetWriter(
- Path file,
- WriteSupport<T> writeSupport,
- CompressionCodecName compressionCodecName,
- int blockSize,
- int pageSize,
- int dictionaryPageSize,
- boolean enableDictionary,
- boolean validating,
- ParquetProperties.WriterVersion writerVersion) throws IOException {
- this(file, writeSupport, compressionCodecName, blockSize, pageSize, dictionaryPageSize, enableDictionary, validating, writerVersion, new Configuration());
- }
-
- /**
- * Create a new ParquetWriter.
- *
- * @param file the file to create
- * @param writeSupport the implementation to write a record to a RecordConsumer
- * @param compressionCodecName the compression codec to use
- * @param blockSize the block size threshold
- * @param pageSize the page size threshold
- * @param dictionaryPageSize the page size threshold for the dictionary pages
- * @param enableDictionary to turn dictionary encoding on
- * @param validating to turn on validation using the schema
- * @param writerVersion version of parquetWriter from {@link ParquetProperties.WriterVersion}
- * @param conf Hadoop configuration to use while accessing the filesystem
- * @throws IOException
- */
- public ParquetWriter(
- Path file,
- WriteSupport<T> writeSupport,
- CompressionCodecName compressionCodecName,
- int blockSize,
- int pageSize,
- int dictionaryPageSize,
- boolean enableDictionary,
- boolean validating,
- ParquetProperties.WriterVersion writerVersion,
- Configuration conf) throws IOException {
-
- WriteSupport.WriteContext writeContext = writeSupport.init(conf);
- MessageType schema = writeContext.getSchema();
-
- ParquetFileWriter fileWriter = new ParquetFileWriter(conf, schema, file);
- fileWriter.start();
-
- CodecFactory codecFactory = new CodecFactory(conf);
- CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compressionCodecName, 0);
- this.writer = new InternalParquetRecordWriter<T>(
- fileWriter,
- writeSupport,
- schema,
- writeContext.getExtraMetaData(),
- blockSize,
- pageSize,
- compressor,
- dictionaryPageSize,
- enableDictionary,
- validating,
- writerVersion);
- }
-
- /**
- * Create a new ParquetWriter. The default block size is 128 MB. The default
- * page size is 1 MB. Default compression is no compression. Dictionary encoding is enabled.
- *
- * @param file the file to create
- * @param writeSupport the implementation to write a record to a RecordConsumer
- * @throws IOException
- */
- public ParquetWriter(Path file, WriteSupport<T> writeSupport) throws IOException {
- this(file, writeSupport, DEFAULT_COMPRESSION_CODEC_NAME, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
- }
-
- public void write(T object) throws IOException {
- try {
- writer.write(object);
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
-
- public long getEstimatedWrittenSize() throws IOException {
- return this.writer.getEstimatedWrittenSize();
- }
-
- @Override
- public void close() throws IOException {
- try {
- writer.close();
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
deleted file mode 100644
index c1835df..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaUtil;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.tuple.offheap.HeapTuple;
-import org.apache.tajo.tuple.offheap.OffHeapRowWriter;
-import org.apache.tajo.tuple.offheap.ZeroCopyTuple;
-import org.apache.tajo.unit.StorageUnit;
-import org.apache.tajo.util.Deallocatable;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.UnsafeUtil;
-import sun.misc.Unsafe;
-import sun.nio.ch.DirectBuffer;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-public class BaseTupleBuilder extends OffHeapRowWriter implements TupleBuilder, Deallocatable {
- private static final Log LOG = LogFactory.getLog(BaseTupleBuilder.class);
-
- private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
-
- // buffer
- private ByteBuffer buffer;
- private long address;
-
- public BaseTupleBuilder(Schema schema) {
- super(SchemaUtil.toDataTypes(schema));
- buffer = ByteBuffer.allocateDirect(64 * StorageUnit.KB).order(ByteOrder.nativeOrder());
- address = UnsafeUtil.getAddress(buffer);
- }
-
- @Override
- public long address() {
- return address;
- }
-
- public void ensureSize(int size) { // grows the direct buffer when fewer than `size` bytes remain in it
- if (buffer.remaining() - size < 0) { // check the remain size
- // allocate a buffer of twice the capacity and copy the data written so far
- int newBlockSize = UnsafeUtil.alignedSize(buffer.capacity() * 2);
- ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize).order(ByteOrder.nativeOrder()); // same byte order as the constructor's buffer
- long newAddress = ((DirectBuffer)newByteBuf).address();
- UNSAFE.copyMemory(this.address, newAddress, buffer.limit()); // copies the first buffer.limit() bytes of the old buffer
- LOG.debug("Increase the buffer size to " + FileUtil.humanReadableByteCount(newBlockSize, false));
-
- // release existing buffer and replace variables
- UnsafeUtil.free(buffer);
- buffer = newByteBuf;
- address = newAddress;
- }
- }
-
- @Override
- public int position() {
- return 0;
- }
-
- @Override
- public void forward(int length) {
- }
-
- @Override
- public void endRow() {
- super.endRow();
- buffer.position(0).limit(offset());
- }
-
- @Override
- public Tuple build() {
- return buildToHeapTuple();
- }
-
- public HeapTuple buildToHeapTuple() { // copies the row held in the direct buffer into a heap-backed tuple
- byte [] bytes = new byte[buffer.limit()]; // endRow() sets the limit to offset()
- UNSAFE.copyMemory(null, address, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, buffer.limit()); // destination is a byte[], so use the byte-array base offset
- return new HeapTuple(bytes, dataTypes());
- }
-
- public ZeroCopyTuple buildToZeroCopyTuple() {
- ZeroCopyTuple zcTuple = new ZeroCopyTuple();
- zcTuple.set(buffer, 0, buffer.limit(), dataTypes());
- return zcTuple;
- }
-
- public void release() {
- UnsafeUtil.free(buffer);
- buffer = null;
- address = 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
deleted file mode 100644
index be734e1..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple;
-
-import org.apache.tajo.storage.Tuple;
-
-public interface RowBlockReader<T extends Tuple> {
-
- /**
- * Return for each tuple
- *
- * @return True if tuple block is filled with tuples. Otherwise, It will return false.
- */
- public boolean next(T tuple);
-
- public void reset();
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
deleted file mode 100644
index c43c018..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple;
-
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.tuple.offheap.RowWriter;
-
-public interface TupleBuilder extends RowWriter {
- public Tuple build();
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
deleted file mode 100644
index 9662d5a..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.util.Deallocatable;
-import org.apache.tajo.util.UnsafeUtil;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
-public class DirectBufTuple extends UnSafeTuple implements Deallocatable {
- private ByteBuffer bb;
-
- public DirectBufTuple(int length, DataType[] types) {
- bb = ByteBuffer.allocateDirect(length).order(ByteOrder.nativeOrder());
- set(bb, 0, length, types);
- }
-
- @Override
- public void release() {
- UnsafeUtil.free(bb);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
deleted file mode 100644
index a327123..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-/**
- * Fixed size limit specification
- */
-public class FixedSizeLimitSpec extends ResizableLimitSpec {
- public FixedSizeLimitSpec(long size) {
- super(size, size);
- }
-
- public FixedSizeLimitSpec(long size, float allowedOverflowRatio) {
- super(size, size, allowedOverflowRatio);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
deleted file mode 100644
index 33f9f1c..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-
-import org.apache.tajo.datum.*;
-import org.apache.tajo.exception.UnsupportedException;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.util.SizeOf;
-import org.apache.tajo.util.StringUtils;
-import org.apache.tajo.util.UnsafeUtil;
-
-import sun.misc.Unsafe;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
-public class HeapTuple implements Tuple {
- private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
- private static final long BASE_OFFSET = UnsafeUtil.ARRAY_BYTE_BASE_OFFSET;
-
- private final byte [] data;
- private final DataType [] types;
-
- public HeapTuple(final byte [] bytes, final DataType [] types) {
- this.data = bytes;
- this.types = types;
- }
-
- @Override
- public int size() {
- return data.length;
- }
-
- public ByteBuffer nioBuffer() {
- return ByteBuffer.wrap(data);
- }
-
- private int getFieldOffset(int fieldId) {
- return UNSAFE.getInt(data, BASE_OFFSET + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT));
- }
-
- private int checkNullAndGetOffset(int fieldId) {
- int offset = getFieldOffset(fieldId);
- if (offset == OffHeapRowBlock.NULL_FIELD_OFFSET) {
- throw new RuntimeException("Invalid Field Access: " + fieldId);
- }
- return offset;
- }
-
- @Override
- public boolean contains(int fieldid) {
- return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
- }
-
- @Override
- public boolean isNull(int fieldid) {
- return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
- }
-
- @Override
- public boolean isNotNull(int fieldid) {
- return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
- }
-
- @Override
- public void clear() {
- // nothing to do
- }
-
- @Override
- public void put(int fieldId, Datum value) { // HeapTuple rejects every put overload
- throw new UnsupportedException("HeapTuple does not support put(int, Datum).");
- }
-
- @Override
- public void put(int fieldId, Datum[] values) {
- throw new UnsupportedException("HeapTuple does not support put(int, Datum []).");
- }
-
- @Override
- public void put(int fieldId, Tuple tuple) {
- throw new UnsupportedException("HeapTuple does not support put(int, Tuple).");
- }
-
- @Override
- public void put(Datum[] values) {
- throw new UnsupportedException("HeapTuple does not support put(Datum []).");
- }
-
- @Override
- public Datum get(int fieldId) { // materializes the field as a Datum chosen by its declared type
- if (isNull(fieldId)) { // null fields map to the shared NullDatum
- return NullDatum.get();
- }
-
- switch (types[fieldId].getType()) {
- case BOOLEAN:
- return DatumFactory.createBool(getBool(fieldId));
- case INT1: // falls through: INT1 is read through the INT2 path
- case INT2:
- return DatumFactory.createInt2(getInt2(fieldId));
- case INT4:
- return DatumFactory.createInt4(getInt4(fieldId));
- case INT8:
- return DatumFactory.createInt8(getInt8(fieldId)); // INT8 is a long; getInt4 would read only 4 of its 8 bytes
- case FLOAT4:
- return DatumFactory.createFloat4(getFloat4(fieldId));
- case FLOAT8:
- return DatumFactory.createFloat8(getFloat8(fieldId));
- case TEXT:
- return DatumFactory.createText(getText(fieldId));
- case TIMESTAMP:
- return DatumFactory.createTimestamp(getInt8(fieldId));
- case DATE:
- return DatumFactory.createDate(getInt4(fieldId));
- case TIME:
- return DatumFactory.createTime(getInt8(fieldId));
- case INTERVAL:
- return getInterval(fieldId);
- case INET4:
- return DatumFactory.createInet4(getInt4(fieldId));
- case PROTOBUF:
- return getProtobufDatum(fieldId);
- default:
- throw new UnsupportedException("Unknown type: " + types[fieldId]);
- }
- }
-
- @Override
- public void setOffset(long offset) {
- }
-
- @Override
- public long getOffset() {
- return 0;
- }
-
- @Override
- public boolean getBool(int fieldId) {
- return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)) == 0x01;
- }
-
- @Override
- public byte getByte(int fieldId) {
- return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
- }
-
- @Override
- public char getChar(int fieldId) {
- return UNSAFE.getChar(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
- }
-
- @Override
- public byte[] getBytes(int fieldId) {
- long pos = checkNullAndGetOffset(fieldId);
- int len = UNSAFE.getInt(data, BASE_OFFSET + pos);
- pos += SizeOf.SIZE_OF_INT;
-
- byte [] bytes = new byte[len];
- UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
- return bytes;
- }
-
- @Override
- public short getInt2(int fieldId) {
- return UNSAFE.getShort(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
- }
-
- @Override
- public int getInt4(int fieldId) {
- return UNSAFE.getInt(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
- }
-
- @Override
- public long getInt8(int fieldId) {
- return UNSAFE.getLong(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
- }
-
- @Override
- public float getFloat4(int fieldId) {
- return UNSAFE.getFloat(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
- }
-
- @Override
- public double getFloat8(int fieldId) {
- return UNSAFE.getDouble(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
- }
-
- @Override
- public String getText(int fieldId) { // returns the field's length-prefixed bytes decoded as text
- return new String(getBytes(fieldId), Charset.forName("UTF-8")); // UTF-8 like getUnicodeChars, not the platform default charset
- }
-
- public IntervalDatum getInterval(int fieldId) {
- long pos = checkNullAndGetOffset(fieldId);
- int months = UNSAFE.getInt(data, BASE_OFFSET + pos);
- pos += SizeOf.SIZE_OF_INT;
- long millisecs = UNSAFE.getLong(data, BASE_OFFSET + pos);
- return new IntervalDatum(months, millisecs);
- }
-
- @Override
- public Datum getProtobufDatum(int fieldId) {
- byte [] bytes = getBytes(fieldId);
-
- ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode());
- Message.Builder builder = factory.newBuilder();
- try {
- builder.mergeFrom(bytes);
- } catch (InvalidProtocolBufferException e) {
- return NullDatum.get();
- }
-
- return new ProtobufDatum(builder.build());
- }
-
- @Override
- public char[] getUnicodeChars(int fieldId) {
- long pos = checkNullAndGetOffset(fieldId);
- int len = UNSAFE.getInt(data, BASE_OFFSET + pos);
- pos += SizeOf.SIZE_OF_INT;
-
- byte [] bytes = new byte[len];
- UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
- return StringUtils.convertBytesToChars(bytes, Charset.forName("UTF-8"));
- }
-
- @Override
- public Tuple clone() throws CloneNotSupportedException {
- return this;
- }
-
- @Override
- public Datum[] getValues() { // returns one Datum per field, NullDatum for fields that are not set
- Datum [] datums = new Datum[types.length]; // field count; size() is the byte length of data, not the number of fields
- for (int i = 0; i < types.length; i++) {
- if (contains(i)) {
- datums[i] = get(i);
- } else {
- datums[i] = NullDatum.get();
- }
- }
- return datums;
- }
-
- @Override
- public String toString() {
- return VTuple.toDisplayString(getValues());
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
deleted file mode 100644
index 2f8e349..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.util.Deallocatable;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.UnsafeUtil;
-import sun.misc.Unsafe;
-import sun.nio.ch.DirectBuffer;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-public class OffHeapMemory implements Deallocatable {
- private static final Log LOG = LogFactory.getLog(OffHeapMemory.class);
-
- protected static final Unsafe UNSAFE = UnsafeUtil.unsafe;
-
- protected ByteBuffer buffer;
- protected int memorySize;
- protected ResizableLimitSpec limitSpec;
- protected long address;
-
- @VisibleForTesting
- protected OffHeapMemory(ByteBuffer buffer, ResizableLimitSpec limitSpec) {
- this.buffer = buffer;
- this.address = ((DirectBuffer) buffer).address();
- this.memorySize = buffer.limit();
- this.limitSpec = limitSpec;
- }
-
- public OffHeapMemory(ResizableLimitSpec limitSpec) {
- this(ByteBuffer.allocateDirect((int) limitSpec.initialSize()).order(ByteOrder.nativeOrder()), limitSpec);
- }
-
- public long address() {
- return address;
- }
-
- public long size() {
- return memorySize;
- }
-
- public void resize(int newSize) {
- Preconditions.checkArgument(newSize > 0, "Size must be greater than 0 bytes");
-
- if (newSize > limitSpec.limit()) {
- throw new RuntimeException("Resize cannot exceed the size limit");
- }
-
- if (newSize < memorySize) {
- LOG.warn("The size reduction is ignored.");
- }
-
- int newBlockSize = UnsafeUtil.alignedSize(newSize);
- ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize);
- long newAddress = ((DirectBuffer)newByteBuf).address();
-
- UNSAFE.copyMemory(this.address, newAddress, memorySize);
-
- UnsafeUtil.free(buffer);
- this.memorySize = newSize;
- this.buffer = newByteBuf;
- this.address = newAddress;
- }
-
- public java.nio.Buffer nioBuffer() {
- return (ByteBuffer) buffer.position(0).limit(memorySize);
- }
-
- @Override
- public void release() {
- UnsafeUtil.free(this.buffer);
- this.buffer = null;
- this.address = 0;
- this.memorySize = 0;
- }
-
- public String toString() {
- return "memory=" + FileUtil.humanReadableByteCount(memorySize, false) + "," + limitSpec;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
deleted file mode 100644
index 689efb7..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaUtil;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.util.Deallocatable;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.SizeOf;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
-public class OffHeapRowBlock extends OffHeapMemory implements Deallocatable {
- private static final Log LOG = LogFactory.getLog(OffHeapRowBlock.class);
-
- public static final int NULL_FIELD_OFFSET = -1;
-
- DataType [] dataTypes;
-
- // Basic States
- private int maxRowNum = Integer.MAX_VALUE; // optional
- private int rowNum;
- protected int position = 0;
-
- private OffHeapRowBlockWriter builder;
-
- private OffHeapRowBlock(ByteBuffer buffer, Schema schema, ResizableLimitSpec limitSpec) {
- super(buffer, limitSpec);
- initialize(schema);
- }
-
- public OffHeapRowBlock(Schema schema, ResizableLimitSpec limitSpec) {
- super(limitSpec);
- initialize(schema);
- }
-
- private void initialize(Schema schema) {
- dataTypes = SchemaUtil.toDataTypes(schema);
-
- this.builder = new OffHeapRowBlockWriter(this);
- }
-
- @VisibleForTesting
- public OffHeapRowBlock(Schema schema, int bytes) {
- this(schema, new ResizableLimitSpec(bytes));
- }
-
- @VisibleForTesting
- public OffHeapRowBlock(Schema schema, ByteBuffer buffer) {
- this(buffer, schema, ResizableLimitSpec.DEFAULT_LIMIT);
- }
-
- public void position(int pos) {
- this.position = pos;
- }
-
- public void clear() {
- this.position = 0;
- this.rowNum = 0;
-
- builder.clear();
- }
-
- @Override
- public ByteBuffer nioBuffer() {
- return (ByteBuffer) buffer.position(0).limit(position);
- }
-
- public int position() {
- return position;
- }
-
- public long usedMem() {
- return position;
- }
-
- /**
- * Ensure that this buffer has enough remaining space to add the size.
- * Creates and copies to a new buffer if necessary
- *
- * @param size Size to add
- */
- public void ensureSize(int size) {
- if (remain() - size < 0) {
- if (!limitSpec.canIncrease(memorySize)) {
- throw new RuntimeException("Cannot increase RowBlock anymore.");
- }
-
- int newBlockSize = limitSpec.increasedSize(memorySize);
- resize(newBlockSize);
- LOG.info("Increase DirectRowBlock to " + FileUtil.humanReadableByteCount(newBlockSize, false));
- }
- }
-
- public long remain() {
- return memorySize - position - builder.offset();
- }
-
- public int maxRowNum() {
- return maxRowNum;
- }
- public int rows() {
- return rowNum;
- }
-
- public void setRows(int rowNum) {
- this.rowNum = rowNum;
- }
-
- public boolean copyFromChannel(FileChannel channel, TableStats stats) throws IOException {
- if (channel.position() < channel.size()) {
- clear();
-
- buffer.clear();
- channel.read(buffer);
- memorySize = buffer.position();
-
- while (position < memorySize) {
- long recordPtr = address + position;
-
- if (remain() < SizeOf.SIZE_OF_INT) {
- channel.position(channel.position() - remain());
- memorySize = (int) (memorySize - remain());
- return true;
- }
-
- int recordSize = UNSAFE.getInt(recordPtr);
-
- if (remain() < recordSize) {
- channel.position(channel.position() - remain());
- memorySize = (int) (memorySize - remain());
- return true;
- }
-
- position += recordSize;
- rowNum++;
- }
-
- return true;
- } else {
- return false;
- }
- }
-
- public RowWriter getWriter() {
- return builder;
- }
-
- public OffHeapRowBlockReader getReader() {
- return new OffHeapRowBlockReader(this);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
deleted file mode 100644
index 4a9313f..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.tuple.RowBlockReader;
-import org.apache.tajo.util.UnsafeUtil;
-import sun.misc.Unsafe;
-
-public class OffHeapRowBlockReader implements RowBlockReader<ZeroCopyTuple> {
- private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
- OffHeapRowBlock rowBlock;
-
- // Read States
- private int curRowIdxForRead;
- private int curPosForRead;
-
- public OffHeapRowBlockReader(OffHeapRowBlock rowBlock) {
- this.rowBlock = rowBlock;
- }
-
- public long remainForRead() {
- return rowBlock.memorySize - curPosForRead;
- }
-
- @Override
- public boolean next(ZeroCopyTuple tuple) {
- if (curRowIdxForRead < rowBlock.rows()) {
-
- long recordStartPtr = rowBlock.address() + curPosForRead;
- int recordLen = UNSAFE.getInt(recordStartPtr);
- tuple.set(rowBlock.buffer, curPosForRead, recordLen, rowBlock.dataTypes);
-
- curPosForRead += recordLen;
- curRowIdxForRead++;
-
- return true;
- } else {
- return false;
- }
- }
-
- @Override
- public void reset() {
- curPosForRead = 0;
- curRowIdxForRead = 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
deleted file mode 100644
index dbc3188..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.collect.Lists;
-import org.apache.tajo.storage.Tuple;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-
-public class OffHeapRowBlockUtils {
-
- public static List<Tuple> sort(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) {
- List<Tuple> tupleList = Lists.newArrayList();
- ZeroCopyTuple zcTuple = new ZeroCopyTuple();
- OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
- while(reader.next(zcTuple)) {
- tupleList.add(zcTuple);
- zcTuple = new ZeroCopyTuple();
- }
- Collections.sort(tupleList, comparator);
- return tupleList;
- }
-
- public static Tuple[] sortToArray(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) {
- Tuple[] tuples = new Tuple[rowBlock.rows()];
- ZeroCopyTuple zcTuple = new ZeroCopyTuple();
- OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
- for (int i = 0; i < rowBlock.rows() && reader.next(zcTuple); i++) {
- tuples[i] = zcTuple;
- zcTuple = new ZeroCopyTuple();
- }
- Arrays.sort(tuples, comparator);
- return tuples;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
deleted file mode 100644
index d177e0c..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.common.TajoDataTypes;
-
-public class OffHeapRowBlockWriter extends OffHeapRowWriter {
- OffHeapRowBlock rowBlock;
-
- OffHeapRowBlockWriter(OffHeapRowBlock rowBlock) {
- super(rowBlock.dataTypes);
- this.rowBlock = rowBlock;
- }
-
- public long address() {
- return rowBlock.address();
- }
-
- public int position() {
- return rowBlock.position();
- }
-
- @Override
- public void forward(int length) {
- rowBlock.position(position() + length);
- }
-
- public void ensureSize(int size) {
- rowBlock.ensureSize(size);
- }
-
- @Override
- public void endRow() {
- super.endRow();
- rowBlock.setRows(rowBlock.rows() + 1);
- }
-
- @Override
- public TajoDataTypes.DataType[] dataTypes() {
- return rowBlock.dataTypes;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
deleted file mode 100644
index 85c7e0b..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.IntervalDatum;
-import org.apache.tajo.datum.ProtobufDatum;
-import org.apache.tajo.datum.TextDatum;
-import org.apache.tajo.util.SizeOf;
-import org.apache.tajo.util.UnsafeUtil;
-
-/**
- *
- * Row Record Structure
- *
- * | row length (4 bytes) | field 1 offset | field 2 offset | ... | field N offset| field 1 | field 2| ... | field N |
- * 4 bytes 4 bytes 4 bytes
- *
- */
-public abstract class OffHeapRowWriter implements RowWriter {
- /** record size + offset list */
- private final int headerSize;
- /** field offsets */
- private final int [] fieldOffsets;
- private final TajoDataTypes.DataType [] dataTypes;
-
- private int curFieldIdx;
- private int curOffset;
-
- public OffHeapRowWriter(final TajoDataTypes.DataType [] dataTypes) {
- this.dataTypes = dataTypes;
- fieldOffsets = new int[dataTypes.length];
- headerSize = SizeOf.SIZE_OF_INT * (dataTypes.length + 1);
- }
-
- public void clear() {
- curOffset = 0;
- curFieldIdx = 0;
- }
-
- public long recordStartAddr() {
- return address() + position();
- }
-
- public abstract long address();
-
- public abstract void ensureSize(int size);
-
- public int offset() {
- return curOffset;
- }
-
- /**
- * Current position
- *
- * @return The position
- */
- public abstract int position();
-
- /**
- * Forward the address;
- *
- * @param length Length to be forwarded
- */
- public abstract void forward(int length);
-
- @Override
- public TajoDataTypes.DataType[] dataTypes() {
- return dataTypes;
- }
-
- public boolean startRow() {
- curOffset = headerSize;
- curFieldIdx = 0;
- return true;
- }
-
- public void endRow() {
- long rowHeaderPos = address() + position();
- OffHeapMemory.UNSAFE.putInt(rowHeaderPos, curOffset);
- rowHeaderPos += SizeOf.SIZE_OF_INT;
-
- for (int i = 0; i < curFieldIdx; i++) {
- OffHeapMemory.UNSAFE.putInt(rowHeaderPos, fieldOffsets[i]);
- rowHeaderPos += SizeOf.SIZE_OF_INT;
- }
- for (int i = curFieldIdx; i < dataTypes.length; i++) {
- OffHeapMemory.UNSAFE.putInt(rowHeaderPos, OffHeapRowBlock.NULL_FIELD_OFFSET);
- rowHeaderPos += SizeOf.SIZE_OF_INT;
- }
-
- // rowOffset is equivalent to a byte length of this row.
- forward(curOffset);
- }
-
- public void skipField() {
- fieldOffsets[curFieldIdx++] = OffHeapRowBlock.NULL_FIELD_OFFSET;
- }
-
- private void forwardField() {
- fieldOffsets[curFieldIdx++] = curOffset;
- }
-
- public void putBool(boolean val) {
- ensureSize(SizeOf.SIZE_OF_BOOL);
- forwardField();
-
- OffHeapMemory.UNSAFE.putByte(recordStartAddr() + curOffset, (byte) (val ? 0x01 : 0x00));
-
- curOffset += SizeOf.SIZE_OF_BOOL;
- }
-
- public void putInt2(short val) {
- ensureSize(SizeOf.SIZE_OF_SHORT);
- forwardField();
-
- OffHeapMemory.UNSAFE.putShort(recordStartAddr() + curOffset, val);
- curOffset += SizeOf.SIZE_OF_SHORT;
- }
-
- public void putInt4(int val) {
- ensureSize(SizeOf.SIZE_OF_INT);
- forwardField();
-
- OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, val);
- curOffset += SizeOf.SIZE_OF_INT;
- }
-
- public void putInt8(long val) {
- ensureSize(SizeOf.SIZE_OF_LONG);
- forwardField();
-
- OffHeapMemory.UNSAFE.putLong(recordStartAddr() + curOffset, val);
- curOffset += SizeOf.SIZE_OF_LONG;
- }
-
- public void putFloat4(float val) {
- ensureSize(SizeOf.SIZE_OF_FLOAT);
- forwardField();
-
- OffHeapMemory.UNSAFE.putFloat(recordStartAddr() + curOffset, val);
- curOffset += SizeOf.SIZE_OF_FLOAT;
- }
-
- public void putFloat8(double val) {
- ensureSize(SizeOf.SIZE_OF_DOUBLE);
- forwardField();
-
- OffHeapMemory.UNSAFE.putDouble(recordStartAddr() + curOffset, val);
- curOffset += SizeOf.SIZE_OF_DOUBLE;
- }
-
- public void putText(String val) {
- byte[] bytes = val.getBytes(TextDatum.DEFAULT_CHARSET);
- putText(bytes);
- }
-
- public void putText(byte[] val) {
- int bytesLen = val.length;
-
- ensureSize(SizeOf.SIZE_OF_INT + bytesLen);
- forwardField();
-
- OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, bytesLen);
- curOffset += SizeOf.SIZE_OF_INT;
-
- OffHeapMemory.UNSAFE.copyMemory(val, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, null,
- recordStartAddr() + curOffset, bytesLen);
- curOffset += bytesLen;
- }
-
- public void putBlob(byte[] val) {
- int bytesLen = val.length;
-
- ensureSize(SizeOf.SIZE_OF_INT + bytesLen);
- forwardField();
-
- OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, bytesLen);
- curOffset += SizeOf.SIZE_OF_INT;
-
- OffHeapMemory.UNSAFE.copyMemory(val, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, null,
- recordStartAddr() + curOffset, bytesLen);
- curOffset += bytesLen;
- }
-
- public void putTimestamp(long val) {
- putInt8(val);
- }
-
- public void putDate(int val) {
- putInt4(val);
- }
-
- public void putTime(long val) {
- putInt8(val);
- }
-
- public void putInterval(IntervalDatum val) {
- ensureSize(SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG);
- forwardField();
-
- long offset = recordStartAddr() + curOffset;
- OffHeapMemory.UNSAFE.putInt(offset, val.getMonths());
- offset += SizeOf.SIZE_OF_INT;
- OffHeapMemory.UNSAFE.putLong(offset, val.getMilliSeconds());
- curOffset += SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG;
- }
-
- public void putInet4(int val) {
- putInt4(val);
- }
-
- public void putProtoDatum(ProtobufDatum val) {
- putBlob(val.asByteArray());
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
deleted file mode 100644
index 14e67b2..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.util.FileUtil;
-
-/**
- * It specifies the maximum size or increasing ratio. In addition,
- * it guarantees that all numbers are less than or equal to Integer.MAX_VALUE 2^31
- * due to ByteBuffer.
- */
-public class ResizableLimitSpec {
- private final Log LOG = LogFactory.getLog(ResizableLimitSpec.class);
-
- public static final int MAX_SIZE_BYTES = Integer.MAX_VALUE;
- public static final ResizableLimitSpec DEFAULT_LIMIT = new ResizableLimitSpec(Integer.MAX_VALUE);
-
- private final long initSize;
- private final long limitBytes;
- private final float incRatio;
- private final float allowedOVerflowRatio;
- private final static float DEFAULT_ALLOWED_OVERFLOW_RATIO = 0.1f;
- private final static float DEFAULT_INCREASE_RATIO = 1.0f;
-
- public ResizableLimitSpec(long initSize) {
- this(initSize, MAX_SIZE_BYTES, DEFAULT_ALLOWED_OVERFLOW_RATIO);
- }
-
- public ResizableLimitSpec(long initSize, long limitBytes) {
- this(initSize, limitBytes, DEFAULT_ALLOWED_OVERFLOW_RATIO);
- }
-
- public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflow) {
- this(initSize, limitBytes, allowedOverflow, DEFAULT_INCREASE_RATIO);
- }
-
- public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflowRatio, float incRatio) {
- Preconditions.checkArgument(initSize > 0, "initial size must be greater than 0 bytes.");
- Preconditions.checkArgument(initSize <= MAX_SIZE_BYTES, "The maximum initial size is 2GB.");
- Preconditions.checkArgument(limitBytes > 0, "The limit size must be greater than 0 bytes.");
- Preconditions.checkArgument(limitBytes <= MAX_SIZE_BYTES, "The maximum limit size is 2GB.");
- Preconditions.checkArgument(incRatio > 0.0f, "Increase Ratio must be greater than 0.");
-
- if (initSize == limitBytes) {
- long overflowedSize = (long) (initSize + (initSize * allowedOverflowRatio));
-
- if (overflowedSize > Integer.MAX_VALUE) {
- overflowedSize = Integer.MAX_VALUE;
- }
-
- this.initSize = overflowedSize;
- this.limitBytes = overflowedSize;
- } else {
- this.initSize = initSize;
- limitBytes = (long) (limitBytes + (limitBytes * allowedOverflowRatio));
-
- if (limitBytes > Integer.MAX_VALUE) {
- this.limitBytes = Integer.MAX_VALUE;
- } else {
- this.limitBytes = limitBytes;
- }
- }
-
- this.allowedOVerflowRatio = allowedOverflowRatio;
- this.incRatio = incRatio;
- }
-
- public long initialSize() {
- return initSize;
- }
-
- public long limit() {
- return limitBytes;
- }
-
- public float remainRatio(long currentSize) {
- Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
- if (currentSize > Integer.MAX_VALUE) {
- currentSize = Integer.MAX_VALUE;
- }
- return (float)currentSize / (float)limitBytes;
- }
-
- public boolean canIncrease(long currentSize) {
- return remain(currentSize) > 0;
- }
-
- public long remain(long currentSize) {
- Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
- return limitBytes > Integer.MAX_VALUE ? Integer.MAX_VALUE - currentSize : limitBytes - currentSize;
- }
-
- public int increasedSize(int currentSize) {
- if (currentSize < initSize) {
- return (int) initSize;
- }
-
- if (currentSize > Integer.MAX_VALUE) {
- LOG.warn("Current size already exceeds the maximum size (" + Integer.MAX_VALUE + " bytes)");
- return Integer.MAX_VALUE;
- }
- long nextSize = (long) (currentSize + ((float) currentSize * incRatio));
-
- if (nextSize > limitBytes) {
- LOG.info("Increasing reaches size limit (" + FileUtil.humanReadableByteCount(limitBytes, false) + ")");
- nextSize = limitBytes;
- }
-
- if (nextSize > Integer.MAX_VALUE) {
- LOG.info("Increasing reaches maximum size (" + FileUtil.humanReadableByteCount(Integer.MAX_VALUE, false) + ")");
- nextSize = Integer.MAX_VALUE;
- }
-
- return (int) nextSize;
- }
-
- @Override
- public String toString() {
- return "init=" + FileUtil.humanReadableByteCount(initSize, false) + ",limit="
- + FileUtil.humanReadableByteCount(limitBytes, false) + ",overflow_ratio=" + allowedOVerflowRatio
- + ",inc_ratio=" + incRatio;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
deleted file mode 100644
index a2b2561..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.IntervalDatum;
-import org.apache.tajo.datum.ProtobufDatum;
-
-/**
- * The call sequence should be as follows:
- *
- * <pre>
- * startRow() --> skipField() or putXXX --> endRow()
- * </pre>
- *
- * The total number of skipField and putXXX invocations must be equivalent to the number of fields.
- */
-public interface RowWriter {
-
- public TajoDataTypes.DataType [] dataTypes();
-
- public boolean startRow();
-
- public void endRow();
-
- public void skipField();
-
- public void putBool(boolean val);
-
- public void putInt2(short val);
-
- public void putInt4(int val);
-
- public void putInt8(long val);
-
- public void putFloat4(float val);
-
- public void putFloat8(double val);
-
- public void putText(String val);
-
- public void putText(byte[] val);
-
- public void putBlob(byte[] val);
-
- public void putTimestamp(long val);
-
- public void putTime(long val);
-
- public void putDate(int val);
-
- public void putInterval(IntervalDatum val);
-
- public void putInet4(int val);
-
- public void putProtoDatum(ProtobufDatum datum);
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
deleted file mode 100644
index b742e6d..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-
-import org.apache.tajo.datum.*;
-import org.apache.tajo.exception.UnsupportedException;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.util.SizeOf;
-import org.apache.tajo.util.StringUtils;
-import org.apache.tajo.util.UnsafeUtil;
-
-import sun.misc.Unsafe;
-import sun.nio.ch.DirectBuffer;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.charset.Charset;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
-public abstract class UnSafeTuple implements Tuple {
- private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
-
- private DirectBuffer bb;
- private int relativePos;
- private int length;
- private DataType [] types;
-
- protected void set(ByteBuffer bb, int relativePos, int length, DataType [] types) {
- this.bb = (DirectBuffer) bb;
- this.relativePos = relativePos;
- this.length = length;
- this.types = types;
- }
-
- void set(ByteBuffer bb, DataType [] types) {
- set(bb, 0, bb.limit(), types);
- }
-
- @Override
- public int size() {
- return types.length;
- }
-
- public ByteBuffer nioBuffer() {
- return ((ByteBuffer)((ByteBuffer)bb).duplicate().position(relativePos).limit(relativePos + length)).slice();
- }
-
- public HeapTuple toHeapTuple() {
- byte [] bytes = new byte[length];
- UNSAFE.copyMemory(null, bb.address() + relativePos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, length);
- return new HeapTuple(bytes, types);
- }
-
- public void copyFrom(UnSafeTuple tuple) {
- Preconditions.checkNotNull(tuple);
-
- ((ByteBuffer) bb).clear();
- if (length < tuple.length) {
- UnsafeUtil.free((ByteBuffer) bb);
- bb = (DirectBuffer) ByteBuffer.allocateDirect(tuple.length).order(ByteOrder.nativeOrder());
- this.relativePos = 0;
- this.length = tuple.length;
- }
-
- ((ByteBuffer) bb).put(tuple.nioBuffer());
- }
-
- private int getFieldOffset(int fieldId) {
- return UNSAFE.getInt(bb.address() + relativePos + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT));
- }
-
- public long getFieldAddr(int fieldId) {
- int fieldOffset = getFieldOffset(fieldId);
- if (fieldOffset == -1) {
- throw new RuntimeException("Invalid Field Access: " + fieldId);
- }
- return bb.address() + relativePos + fieldOffset;
- }
-
- @Override
- public boolean contains(int fieldid) {
- return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
- }
-
- @Override
- public boolean isNull(int fieldid) {
- return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
- }
-
- @Override
- public boolean isNotNull(int fieldid) {
- return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
- }
-
- @Override
- public void clear() {
- // nothing to do
- }
-
- @Override
- public void put(int fieldId, Datum value) {
- throw new UnsupportedException("UnSafeTuple does not support put(int, Datum).");
- }
-
- @Override
- public void put(int fieldId, Datum[] values) {
- throw new UnsupportedException("UnSafeTuple does not support put(int, Datum []).");
- }
-
- @Override
- public void put(int fieldId, Tuple tuple) {
- throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple).");
- }
-
- @Override
- public void put(Datum[] values) {
- throw new UnsupportedException("UnSafeTuple does not support put(Datum []).");
- }
-
- @Override
- public Datum get(int fieldId) {
- if (isNull(fieldId)) {
- return NullDatum.get();
- }
-
- switch (types[fieldId].getType()) {
- case BOOLEAN:
- return DatumFactory.createBool(getBool(fieldId));
- case INT1:
- case INT2:
- return DatumFactory.createInt2(getInt2(fieldId));
- case INT4:
- return DatumFactory.createInt4(getInt4(fieldId));
- case INT8:
- return DatumFactory.createInt8(getInt4(fieldId));
- case FLOAT4:
- return DatumFactory.createFloat4(getFloat4(fieldId));
- case FLOAT8:
- return DatumFactory.createFloat8(getFloat8(fieldId));
- case TEXT:
- return DatumFactory.createText(getText(fieldId));
- case TIMESTAMP:
- return DatumFactory.createTimestamp(getInt8(fieldId));
- case DATE:
- return DatumFactory.createDate(getInt4(fieldId));
- case TIME:
- return DatumFactory.createTime(getInt8(fieldId));
- case INTERVAL:
- return getInterval(fieldId);
- case INET4:
- return DatumFactory.createInet4(getInt4(fieldId));
- case PROTOBUF:
- return getProtobufDatum(fieldId);
- default:
- throw new UnsupportedException("Unknown type: " + types[fieldId]);
- }
- }
-
- @Override
- public void setOffset(long offset) {
- }
-
- @Override
- public long getOffset() {
- return 0;
- }
-
- @Override
- public boolean getBool(int fieldId) {
- return UNSAFE.getByte(getFieldAddr(fieldId)) == 0x01;
- }
-
- @Override
- public byte getByte(int fieldId) {
- return UNSAFE.getByte(getFieldAddr(fieldId));
- }
-
- @Override
- public char getChar(int fieldId) {
- return UNSAFE.getChar(getFieldAddr(fieldId));
- }
-
- @Override
- public byte[] getBytes(int fieldId) {
- long pos = getFieldAddr(fieldId);
- int len = UNSAFE.getInt(pos);
- pos += SizeOf.SIZE_OF_INT;
-
- byte [] bytes = new byte[len];
- UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
- return bytes;
- }
-
- @Override
- public short getInt2(int fieldId) {
- long addr = getFieldAddr(fieldId);
- return UNSAFE.getShort(addr);
- }
-
- @Override
- public int getInt4(int fieldId) {
- return UNSAFE.getInt(getFieldAddr(fieldId));
- }
-
- @Override
- public long getInt8(int fieldId) {
- return UNSAFE.getLong(getFieldAddr(fieldId));
- }
-
- @Override
- public float getFloat4(int fieldId) {
- return UNSAFE.getFloat(getFieldAddr(fieldId));
- }
-
- @Override
- public double getFloat8(int fieldId) {
- return UNSAFE.getDouble(getFieldAddr(fieldId));
- }
-
- @Override
- public String getText(int fieldId) {
- long pos = getFieldAddr(fieldId);
- int len = UNSAFE.getInt(pos);
- pos += SizeOf.SIZE_OF_INT;
-
- byte [] bytes = new byte[len];
- UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
- return new String(bytes);
- }
-
- public IntervalDatum getInterval(int fieldId) {
- long pos = getFieldAddr(fieldId);
- int months = UNSAFE.getInt(pos);
- pos += SizeOf.SIZE_OF_INT;
- long millisecs = UNSAFE.getLong(pos);
- return new IntervalDatum(months, millisecs);
- }
-
- @Override
- public Datum getProtobufDatum(int fieldId) {
- byte [] bytes = getBytes(fieldId);
-
- ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode());
- Message.Builder builder = factory.newBuilder();
- try {
- builder.mergeFrom(bytes);
- } catch (InvalidProtocolBufferException e) {
- return NullDatum.get();
- }
-
- return new ProtobufDatum(builder.build());
- }
-
- @Override
- public char[] getUnicodeChars(int fieldId) {
- long pos = getFieldAddr(fieldId);
- int len = UNSAFE.getInt(pos);
- pos += SizeOf.SIZE_OF_INT;
-
- byte [] bytes = new byte[len];
- UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
- return StringUtils.convertBytesToChars(bytes, Charset.forName("UTF-8"));
- }
-
- @Override
- public Tuple clone() throws CloneNotSupportedException {
- return toHeapTuple();
- }
-
- @Override
- public Datum[] getValues() {
- Datum [] datums = new Datum[size()];
- for (int i = 0; i < size(); i++) {
- if (contains(i)) {
- datums[i] = get(i);
- } else {
- datums[i] = NullDatum.get();
- }
- }
- return datums;
- }
-
- @Override
- public String toString() {
- return VTuple.toDisplayString(getValues());
- }
-
- public abstract void release();
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
deleted file mode 100644
index 73e1e2f..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.primitives.Longs;
-import com.google.common.primitives.UnsignedLongs;
-import org.apache.tajo.util.SizeOf;
-import org.apache.tajo.util.UnsafeUtil;
-import sun.misc.Unsafe;
-
-import java.nio.ByteOrder;
-
-/**
- * Directly accesses the UTF bytes in an UnSafeTuple without any copy. It is used by the
- * compiled TupleComparator.
- *
- * <p>Both operands are addresses of length-prefixed byte strings: an {@code int} byte length
- * followed by the bytes. Bytes are compared lexicographically as unsigned values.
- */
-public class UnSafeTupleBytesComparator {
-  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
-
-  static final boolean littleEndian =
-      ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
-
-  /**
-   * Compares two length-prefixed byte strings located in raw memory.
-   *
-   * @param ptr1 address of the left operand (its {@code int} length prefix)
-   * @param ptr2 address of the right operand (its {@code int} length prefix)
-   * @return a negative value, zero, or a positive value as the left operand is less than,
-   *         equal to, or greater than the right one; when one operand is a prefix of the
-   *         other, the shorter one orders first
-   */
-  public static int compare(long ptr1, long ptr2) {
-    int lstrLen = UNSAFE.getInt(ptr1);
-    int rstrLen = UNSAFE.getInt(ptr2);
-
-    ptr1 += SizeOf.SIZE_OF_INT;
-    ptr2 += SizeOf.SIZE_OF_INT;
-
-    int minLength = Math.min(lstrLen, rstrLen);
-    int minWords = minLength / Longs.BYTES;
-
-    /*
-     * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a
-     * time is no slower than comparing 4 bytes at a time even on 32-bit.
-     * On the other hand, it is substantially faster on 64-bit.
-     */
-    for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
-      long lw = UNSAFE.getLong(ptr1);
-      long rw = UNSAFE.getLong(ptr2);
-      long diff = lw ^ rw;
-
-      if (diff != 0) {
-        if (!littleEndian) {
-          return UnsignedLongs.compare(lw, rw);
-        }
-
-        // Little endian: the first differing byte in memory is the least significant
-        // non-zero byte of diff. Binary-search its bit offset n (a multiple of 8).
-        int n = 0;
-        int y;
-        int x = (int) diff;
-        if (x == 0) {
-          x = (int) (diff >>> 32);
-          n = 32;
-        }
-
-        y = x << 16;
-        if (y == 0) {
-          n += 16;
-        } else {
-          x = y;
-        }
-
-        y = x << 8;
-        if (y == 0) {
-          n += 8;
-        }
-        return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
-      }
-
-      ptr1 += SizeOf.SIZE_OF_LONG;
-      ptr2 += SizeOf.SIZE_OF_LONG;
-    }
-
-    // The epilogue to cover the last (minLength % 8) elements. Bytes are masked to their
-    // unsigned value so that the ordering agrees with the word-at-a-time path above;
-    // subtracting signed bytes would order bytes >= 0x80 before ASCII bytes here only.
-    for (int i = minWords * Longs.BYTES; i < minLength; i++) {
-      int result = (UNSAFE.getByte(ptr1++) & 0xFF) - (UNSAFE.getByte(ptr2++) & 0xFF);
-      if (result != 0) {
-        return result;
-      }
-    }
-    return lstrLen - rstrLen;
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
deleted file mode 100644
index 51dbb29..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import java.nio.ByteBuffer;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
-/**
- * An {@link UnSafeTuple} whose binding to a buffer region can be changed by any caller and
- * whose {@link #release()} does nothing.
- */
-public class ZeroCopyTuple extends UnSafeTuple {
-
-  /**
-   * Points this tuple at a row inside the given buffer. It only widens the visibility of the
-   * inherited method to {@code public} and delegates to it.
-   *
-   * @param bb          backing buffer holding the row
-   * @param relativePos byte offset of the row inside {@code bb}
-   * @param length      byte length of the row
-   * @param types       data types of the fields, in field order
-   */
-  @Override
-  public void set(ByteBuffer bb, int relativePos, int length, DataType [] types) {
-    super.set(bb, relativePos, length, types);
-  }
-
-  /** No-op. */
-  @Override
-  public void release() {
-    // nothing to do
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/proto/IndexProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/proto/IndexProtos.proto b/tajo-storage/src/main/proto/IndexProtos.proto
deleted file mode 100644
index f5c8a08..0000000
--- a/tajo-storage/src/main/proto/IndexProtos.proto
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.tajo.index";
-option java_outer_classname = "IndexProtos";
-option optimize_for = SPEED;
-option java_generic_services = false;
-option java_generate_equals_and_hash = true;
-
-import "CatalogProtos.proto";
-
-// Serialized description of a tuple comparator.
-// NOTE(review): SchemaProto, SortSpecProto and TupleComparatorSpecProto are not defined in
-// this file; presumably they come from the imported CatalogProtos.proto - confirm.
-message TupleComparatorProto {
- // Schema the comparator applies to (mandatory).
- required SchemaProto schema = 1;
- // Zero or more sort specifications.
- repeated SortSpecProto sortSpecs = 2;
- // Zero or more comparator specifications.
- // NOTE(review): presumably one per entry of sortSpecs - confirm against the Java side.
- repeated TupleComparatorSpecProto compSpecs = 3;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/proto/StorageFragmentProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/proto/StorageFragmentProtos.proto b/tajo-storage/src/main/proto/StorageFragmentProtos.proto
deleted file mode 100644
index dd79d74..0000000
--- a/tajo-storage/src/main/proto/StorageFragmentProtos.proto
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.tajo.storage.fragment";
-option java_outer_classname = "StorageFragmentProtos";
-option optimize_for = SPEED;
-option java_generic_services = false;
-option java_generate_equals_and_hash = true;
-
-import "CatalogProtos.proto";
-
-// Serialized description of one fragment of an HBase-backed table.
-// NOTE(review): the field meanings below are inferred from the field names only - confirm
-// against the fragment class that reads and writes this message.
-message HBaseFragmentProto {
- // Table name (presumably the name of the table on the Tajo side).
- required string tableName = 1;
- // Name of the underlying HBase table.
- required string hbaseTableName = 2;
- // Row key bounds of the fragment (presumably start inclusive, stop exclusive - TODO confirm).
- required bytes startRow = 3;
- required bytes stopRow = 4;
- // Presumably marks the final fragment of the table.
- required bool last = 5;
- // Length of the fragment (unit not visible here).
- required int64 length = 6;
- // Optional location of the region backing this fragment.
- optional string regionLocation = 7;
-}
\ No newline at end of file