You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/12 09:22:34 UTC
[20/45] tajo git commit: TAJO-1233: Merge hbase_storage branch to the
master branch. (Hyoungjun Kim via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
new file mode 100644
index 0000000..c1835df
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
@@ -0,0 +1,112 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaUtil;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.offheap.HeapTuple;
+import org.apache.tajo.tuple.offheap.OffHeapRowWriter;
+import org.apache.tajo.tuple.offheap.ZeroCopyTuple;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.Deallocatable;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * A {@link TupleBuilder} that serializes one row at a time into a private direct buffer
+ * and then materializes it either as a {@link HeapTuple} (bytes copied into a heap array)
+ * or as a {@link ZeroCopyTuple} (set directly on the internal buffer).
+ *
+ * <p>The buffer starts at 64 KB and is replaced by a larger one on demand, see
+ * {@link #ensureSize(int)}. Call {@link #release()} to free the direct memory.
+ */
+public class BaseTupleBuilder extends OffHeapRowWriter implements TupleBuilder, Deallocatable {
+  private static final Log LOG = LogFactory.getLog(BaseTupleBuilder.class);
+
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+  // Direct buffer holding the row being built, and its native start address.
+  private ByteBuffer buffer;
+  private long address;
+
+  /**
+   * Creates a builder for rows of the given schema, backed by a 64 KB direct buffer
+   * in native byte order.
+   *
+   * @param schema schema whose column data types describe the rows to build
+   */
+  public BaseTupleBuilder(Schema schema) {
+    super(SchemaUtil.toDataTypes(schema));
+    buffer = ByteBuffer.allocateDirect(64 * StorageUnit.KB).order(ByteOrder.nativeOrder());
+    address = UnsafeUtil.getAddress(buffer);
+  }
+
+  /**
+   * @return the native start address of the current buffer; it changes whenever
+   *         {@link #ensureSize(int)} replaces the buffer
+   */
+  @Override
+  public long address() {
+    return address;
+  }
+
+  /**
+   * Makes sure {@code size} more bytes can be appended to the row being written, replacing
+   * the buffer with a larger one (and freeing the old one) when they do not fit.
+   *
+   * <p>A {@link ZeroCopyTuple} previously obtained from {@link #buildToZeroCopyTuple()} was
+   * set on the old buffer, which is freed here when the buffer grows.
+   *
+   * @param size number of additional bytes about to be written
+   * @throws IllegalStateException if the required capacity does not fit in an {@code int}
+   */
+  public void ensureSize(int size) {
+    // offset() is the number of bytes written so far for the current row (endRow() uses it
+    // as the row length), so free space must be measured from there. buffer.remaining()
+    // cannot be used: endRow() shrinks the limit to the length of the previous row.
+    long required = (long) offset() + size;
+    if (required <= buffer.capacity()) {
+      return;
+    }
+
+    // Keep doubling until the request fits; a single doubling may not be enough.
+    long newSize = buffer.capacity();
+    while (newSize < required) {
+      newSize <<= 1;
+    }
+    if (newSize > Integer.MAX_VALUE) {
+      throw new IllegalStateException("Cannot grow the tuple buffer to " + required + " bytes");
+    }
+
+    // enlarge new buffer and copy writing data
+    int newBlockSize = UnsafeUtil.alignedSize((int) newSize);
+    // Same byte order as the buffer allocated in the constructor.
+    ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize).order(ByteOrder.nativeOrder());
+    long newAddress = UnsafeUtil.getAddress(newByteBuf);
+    // Copy the whole old buffer so that no byte of the in-progress row is lost.
+    UNSAFE.copyMemory(this.address, newAddress, buffer.capacity());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Increase the buffer size to " + FileUtil.humanReadableByteCount(newBlockSize, false));
+    }
+
+    // release existing buffer and replace variables
+    UnsafeUtil.free(buffer);
+    buffer = newByteBuf;
+    address = newAddress;
+  }
+
+  /** @return always 0; every row is written from the start of the buffer */
+  @Override
+  public int position() {
+    return 0;
+  }
+
+  /** No-op: this builder holds a single row, so there is no position to advance. */
+  @Override
+  public void forward(int length) {
+  }
+
+  /** Finishes the current row and sets the buffer window to {@code [0, offset())}. */
+  @Override
+  public void endRow() {
+    super.endRow();
+    buffer.position(0).limit(offset());
+  }
+
+  /** @return the finished row as a {@link HeapTuple}; see {@link #buildToHeapTuple()} */
+  @Override
+  public Tuple build() {
+    return buildToHeapTuple();
+  }
+
+  /**
+   * Copies the first {@code buffer.limit()} bytes of the buffer (the row length once
+   * {@link #endRow()} has been called) into a new heap array.
+   *
+   * @return a tuple backed by its own byte array, independent of this builder's buffer
+   */
+  public HeapTuple buildToHeapTuple() {
+    int length = buffer.limit();
+    byte [] bytes = new byte[length];
+    // The destination is a byte[], so the byte-array base offset is the one to use.
+    UNSAFE.copyMemory(null, address, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, length);
+    return new HeapTuple(bytes, dataTypes());
+  }
+
+  /**
+   * @return a new {@link ZeroCopyTuple} set on this builder's buffer over
+   *         {@code [0, buffer.limit())}; the buffer is freed by {@link #release()} and
+   *         whenever {@link #ensureSize(int)} grows it
+   */
+  public ZeroCopyTuple buildToZeroCopyTuple() {
+    ZeroCopyTuple zcTuple = new ZeroCopyTuple();
+    zcTuple.set(buffer, 0, buffer.limit(), dataTypes());
+    return zcTuple;
+  }
+
+  /** Frees the direct buffer. Calling it again after the first release is a no-op. */
+  public void release() {
+    if (buffer != null) {
+      UnsafeUtil.free(buffer);
+      buffer = null;
+      address = 0;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
new file mode 100644
index 0000000..be734e1
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple;
+
+import org.apache.tajo.storage.Tuple;
+
+/**
+ * Sequential reader over the tuples stored in a row block.
+ *
+ * @param <T> concrete tuple type that is filled in by {@link #next(Tuple)}
+ */
+public interface RowBlockReader<T extends Tuple> {
+
+  /**
+   * Advances to the next tuple of the block and loads it into the given tuple object.
+   *
+   * @param tuple tuple object to fill with the next row
+   * @return {@code true} if a row was loaded into {@code tuple}; {@code false} if no rows remain
+   */
+  boolean next(T tuple);
+
+  /**
+   * Moves the reader back to the first tuple of the block.
+   */
+  void reset();
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
new file mode 100644
index 0000000..c43c018
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
@@ -0,0 +1,26 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple;
+
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.offheap.RowWriter;
+
+/**
+ * A {@link RowWriter} that can turn the row it has written into a {@link Tuple}.
+ */
+public interface TupleBuilder extends RowWriter {
+
+  /**
+   * @return a tuple for the row written through this writer
+   */
+  Tuple build();
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
new file mode 100644
index 0000000..9662d5a
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.util.Deallocatable;
+import org.apache.tajo.util.UnsafeUtil;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+/**
+ * An {@link UnSafeTuple} that owns the direct buffer it is set on. The buffer is allocated
+ * in the constructor and freed by {@link #release()}.
+ */
+public class DirectBufTuple extends UnSafeTuple implements Deallocatable {
+  // Direct buffer allocated by, and owned by, this tuple.
+  private final ByteBuffer directBuffer;
+
+  /**
+   * Allocates a direct buffer of {@code length} bytes in native byte order and sets this
+   * tuple on it.
+   *
+   * @param length size of the buffer in bytes
+   * @param types data types of the tuple's fields
+   */
+  public DirectBufTuple(int length, DataType[] types) {
+    this.directBuffer = ByteBuffer.allocateDirect(length).order(ByteOrder.nativeOrder());
+    set(this.directBuffer, 0, length, types);
+  }
+
+  /** Frees the direct buffer owned by this tuple. */
+  @Override
+  public void release() {
+    UnsafeUtil.free(this.directBuffer);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
new file mode 100644
index 0000000..a327123
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+/**
+ * Fixed size limit specification
+ */
+public class FixedSizeLimitSpec extends ResizableLimitSpec {
+
+  /**
+   * Creates a spec whose two size arguments passed to {@link ResizableLimitSpec} are the
+   * same value.
+   *
+   * @param size the fixed size, in bytes
+   */
+  public FixedSizeLimitSpec(final long size) {
+    super(size, size);
+  }
+
+  /**
+   * Same as {@link #FixedSizeLimitSpec(long)}, additionally forwarding an overflow ratio
+   * to {@link ResizableLimitSpec}.
+   *
+   * @param size the fixed size, in bytes
+   * @param allowedOverflowRatio overflow ratio handed unchanged to the parent spec
+   */
+  public FixedSizeLimitSpec(final long size, final float allowedOverflowRatio) {
+    super(size, size, allowedOverflowRatio);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
new file mode 100644
index 0000000..33f9f1c
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
@@ -0,0 +1,272 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+
+import org.apache.tajo.datum.*;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.StringUtils;
+import org.apache.tajo.util.UnsafeUtil;
+
+import sun.misc.Unsafe;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+public class HeapTuple implements Tuple {
+ private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+ private static final long BASE_OFFSET = UnsafeUtil.ARRAY_BYTE_BASE_OFFSET;
+
+ private final byte [] data;
+ private final DataType [] types;
+
+ public HeapTuple(final byte [] bytes, final DataType [] types) {
+ this.data = bytes;
+ this.types = types;
+ }
+
+ @Override
+ public int size() {
+ return data.length;
+ }
+
+ public ByteBuffer nioBuffer() {
+ return ByteBuffer.wrap(data);
+ }
+
+ private int getFieldOffset(int fieldId) {
+ return UNSAFE.getInt(data, BASE_OFFSET + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT));
+ }
+
+ private int checkNullAndGetOffset(int fieldId) {
+ int offset = getFieldOffset(fieldId);
+ if (offset == OffHeapRowBlock.NULL_FIELD_OFFSET) {
+ throw new RuntimeException("Invalid Field Access: " + fieldId);
+ }
+ return offset;
+ }
+
+ @Override
+ public boolean contains(int fieldid) {
+ return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+ }
+
+ @Override
+ public boolean isNull(int fieldid) {
+ return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
+ }
+
+ @Override
+ public boolean isNotNull(int fieldid) {
+ return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+ }
+
+ @Override
+ public void clear() {
+ // nothing to do
+ }
+
+ @Override
+ public void put(int fieldId, Datum value) {
+ throw new UnsupportedException("UnSafeTuple does not support put(int, Datum).");
+ }
+
+ @Override
+ public void put(int fieldId, Datum[] values) {
+ throw new UnsupportedException("UnSafeTuple does not support put(int, Datum []).");
+ }
+
+ @Override
+ public void put(int fieldId, Tuple tuple) {
+ throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple).");
+ }
+
+ @Override
+ public void put(Datum[] values) {
+ throw new UnsupportedException("UnSafeTuple does not support put(Datum []).");
+ }
+
+ @Override
+ public Datum get(int fieldId) {
+ if (isNull(fieldId)) {
+ return NullDatum.get();
+ }
+
+ switch (types[fieldId].getType()) {
+ case BOOLEAN:
+ return DatumFactory.createBool(getBool(fieldId));
+ case INT1:
+ case INT2:
+ return DatumFactory.createInt2(getInt2(fieldId));
+ case INT4:
+ return DatumFactory.createInt4(getInt4(fieldId));
+ case INT8:
+ return DatumFactory.createInt8(getInt4(fieldId));
+ case FLOAT4:
+ return DatumFactory.createFloat4(getFloat4(fieldId));
+ case FLOAT8:
+ return DatumFactory.createFloat8(getFloat8(fieldId));
+ case TEXT:
+ return DatumFactory.createText(getText(fieldId));
+ case TIMESTAMP:
+ return DatumFactory.createTimestamp(getInt8(fieldId));
+ case DATE:
+ return DatumFactory.createDate(getInt4(fieldId));
+ case TIME:
+ return DatumFactory.createTime(getInt8(fieldId));
+ case INTERVAL:
+ return getInterval(fieldId);
+ case INET4:
+ return DatumFactory.createInet4(getInt4(fieldId));
+ case PROTOBUF:
+ return getProtobufDatum(fieldId);
+ default:
+ throw new UnsupportedException("Unknown type: " + types[fieldId]);
+ }
+ }
+
+ @Override
+ public void setOffset(long offset) {
+ }
+
+ @Override
+ public long getOffset() {
+ return 0;
+ }
+
+ @Override
+ public boolean getBool(int fieldId) {
+ return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)) == 0x01;
+ }
+
+ @Override
+ public byte getByte(int fieldId) {
+ return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+ }
+
+ @Override
+ public char getChar(int fieldId) {
+ return UNSAFE.getChar(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+ }
+
+ @Override
+ public byte[] getBytes(int fieldId) {
+ long pos = checkNullAndGetOffset(fieldId);
+ int len = UNSAFE.getInt(data, BASE_OFFSET + pos);
+ pos += SizeOf.SIZE_OF_INT;
+
+ byte [] bytes = new byte[len];
+ UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
+ return bytes;
+ }
+
+ @Override
+ public short getInt2(int fieldId) {
+ return UNSAFE.getShort(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+ }
+
+ @Override
+ public int getInt4(int fieldId) {
+ return UNSAFE.getInt(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+ }
+
+ @Override
+ public long getInt8(int fieldId) {
+ return UNSAFE.getLong(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+ }
+
+ @Override
+ public float getFloat4(int fieldId) {
+ return UNSAFE.getFloat(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+ }
+
+ @Override
+ public double getFloat8(int fieldId) {
+ return UNSAFE.getDouble(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+ }
+
+ @Override
+ public String getText(int fieldId) {
+ return new String(getBytes(fieldId));
+ }
+
+ public IntervalDatum getInterval(int fieldId) {
+ long pos = checkNullAndGetOffset(fieldId);
+ int months = UNSAFE.getInt(data, BASE_OFFSET + pos);
+ pos += SizeOf.SIZE_OF_INT;
+ long millisecs = UNSAFE.getLong(data, BASE_OFFSET + pos);
+ return new IntervalDatum(months, millisecs);
+ }
+
+ @Override
+ public Datum getProtobufDatum(int fieldId) {
+ byte [] bytes = getBytes(fieldId);
+
+ ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode());
+ Message.Builder builder = factory.newBuilder();
+ try {
+ builder.mergeFrom(bytes);
+ } catch (InvalidProtocolBufferException e) {
+ return NullDatum.get();
+ }
+
+ return new ProtobufDatum(builder.build());
+ }
+
+ @Override
+ public char[] getUnicodeChars(int fieldId) {
+ long pos = checkNullAndGetOffset(fieldId);
+ int len = UNSAFE.getInt(data, BASE_OFFSET + pos);
+ pos += SizeOf.SIZE_OF_INT;
+
+ byte [] bytes = new byte[len];
+ UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
+ return StringUtils.convertBytesToChars(bytes, Charset.forName("UTF-8"));
+ }
+
+ @Override
+ public Tuple clone() throws CloneNotSupportedException {
+ return this;
+ }
+
+ @Override
+ public Datum[] getValues() {
+ Datum [] datums = new Datum[size()];
+ for (int i = 0; i < size(); i++) {
+ if (contains(i)) {
+ datums[i] = get(i);
+ } else {
+ datums[i] = NullDatum.get();
+ }
+ }
+ return datums;
+ }
+
+ @Override
+ public String toString() {
+ return VTuple.toDisplayString(getValues());
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
new file mode 100644
index 0000000..2f8e349
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.Deallocatable;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * A resizable region of direct memory, tracked by its buffer, native address and logical
+ * size, and bounded by a {@link ResizableLimitSpec}.
+ */
+public class OffHeapMemory implements Deallocatable {
+  private static final Log LOG = LogFactory.getLog(OffHeapMemory.class);
+
+  protected static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+  protected ByteBuffer buffer;
+  protected int memorySize;
+  protected ResizableLimitSpec limitSpec;
+  protected long address;
+
+  /**
+   * Wraps an existing direct buffer; its limit becomes the logical memory size.
+   *
+   * @param buffer a direct buffer (it is cast to {@code DirectBuffer} to obtain its address)
+   * @param limitSpec size limits applied to {@link #resize(int)}
+   */
+  @VisibleForTesting
+  protected OffHeapMemory(ByteBuffer buffer, ResizableLimitSpec limitSpec) {
+    this.buffer = buffer;
+    this.address = ((DirectBuffer) buffer).address();
+    this.memorySize = buffer.limit();
+    this.limitSpec = limitSpec;
+  }
+
+  /**
+   * Allocates {@code limitSpec.initialSize()} bytes of direct memory in native byte order.
+   *
+   * @param limitSpec initial size and size limits
+   */
+  public OffHeapMemory(ResizableLimitSpec limitSpec) {
+    this(ByteBuffer.allocateDirect((int) limitSpec.initialSize()).order(ByteOrder.nativeOrder()), limitSpec);
+  }
+
+  /** @return the native start address of the current buffer */
+  public long address() {
+    return address;
+  }
+
+  /** @return the logical size of this memory, in bytes */
+  public long size() {
+    return memorySize;
+  }
+
+  /**
+   * Replaces the buffer with a new one of at least {@code newSize} bytes, copying the
+   * current contents and freeing the old buffer. A request smaller than the current size
+   * is ignored with a warning.
+   *
+   * @param newSize requested logical size in bytes; must be positive
+   * @throws IllegalArgumentException if {@code newSize} is not positive
+   * @throws RuntimeException if {@code newSize} exceeds {@code limitSpec.limit()}
+   */
+  public void resize(int newSize) {
+    Preconditions.checkArgument(newSize > 0, "Size must be greater than 0 bytes");
+
+    if (newSize > limitSpec.limit()) {
+      throw new RuntimeException("Resize cannot exceed the size limit");
+    }
+
+    if (newSize < memorySize) {
+      LOG.warn("The size reduction is ignored.");
+      // Actually ignore it: continuing would copy memorySize bytes into a smaller
+      // allocation and write past its end.
+      return;
+    }
+
+    int newBlockSize = UnsafeUtil.alignedSize(newSize);
+    // Same byte order as the buffer allocated in the constructor.
+    ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize).order(ByteOrder.nativeOrder());
+    long newAddress = ((DirectBuffer)newByteBuf).address();
+
+    UNSAFE.copyMemory(this.address, newAddress, memorySize);
+
+    UnsafeUtil.free(buffer);
+    this.memorySize = newSize;
+    this.buffer = newByteBuf;
+    this.address = newAddress;
+  }
+
+  /** @return the underlying buffer with position 0 and limit set to the logical size */
+  public java.nio.Buffer nioBuffer() {
+    return (ByteBuffer) buffer.position(0).limit(memorySize);
+  }
+
+  /** Frees the direct buffer. Calling it again after the first release is a no-op. */
+  @Override
+  public void release() {
+    if (this.buffer != null) {
+      UnsafeUtil.free(this.buffer);
+      this.buffer = null;
+    }
+    this.address = 0;
+    this.memorySize = 0;
+  }
+
+  @Override
+  public String toString() {
+    return "memory=" + FileUtil.humanReadableByteCount(memorySize, false) + "," + limitSpec;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
new file mode 100644
index 0000000..689efb7
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
@@ -0,0 +1,176 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaUtil;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.util.Deallocatable;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.SizeOf;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+/**
+ * An {@link OffHeapMemory} region that stores consecutive serialized rows. Each row starts
+ * with an {@code int} holding its length, which is what {@link #copyFromChannel} and
+ * {@link OffHeapRowBlockReader} use to step from one row to the next.
+ */
+public class OffHeapRowBlock extends OffHeapMemory implements Deallocatable {
+  private static final Log LOG = LogFactory.getLog(OffHeapRowBlock.class);
+
+  /** Field offset value that marks a null field. */
+  public static final int NULL_FIELD_OFFSET = -1;
+
+  DataType [] dataTypes;
+
+  // Basic States
+  private int maxRowNum = Integer.MAX_VALUE; // optional
+  private int rowNum;
+  protected int position = 0;
+
+  private OffHeapRowBlockWriter builder;
+
+  private OffHeapRowBlock(ByteBuffer buffer, Schema schema, ResizableLimitSpec limitSpec) {
+    super(buffer, limitSpec);
+    initialize(schema);
+  }
+
+  /**
+   * @param schema schema of the rows stored in this block
+   * @param limitSpec initial size and growth limits of the block
+   */
+  public OffHeapRowBlock(Schema schema, ResizableLimitSpec limitSpec) {
+    super(limitSpec);
+    initialize(schema);
+  }
+
+  /** Derives the field data types from the schema and creates the block's writer. */
+  private void initialize(Schema schema) {
+    dataTypes = SchemaUtil.toDataTypes(schema);
+
+    this.builder = new OffHeapRowBlockWriter(this);
+  }
+
+  @VisibleForTesting
+  public OffHeapRowBlock(Schema schema, int bytes) {
+    this(schema, new ResizableLimitSpec(bytes));
+  }
+
+  @VisibleForTesting
+  public OffHeapRowBlock(Schema schema, ByteBuffer buffer) {
+    this(buffer, schema, ResizableLimitSpec.DEFAULT_LIMIT);
+  }
+
+  /** Sets the end position, in bytes, of the rows stored so far. */
+  public void position(int pos) {
+    this.position = pos;
+  }
+
+  /** Resets the position and row count to zero and clears the writer. */
+  public void clear() {
+    this.position = 0;
+    this.rowNum = 0;
+
+    builder.clear();
+  }
+
+  /** @return the underlying buffer with position 0 and limit set to {@link #position()} */
+  @Override
+  public ByteBuffer nioBuffer() {
+    return (ByteBuffer) buffer.position(0).limit(position);
+  }
+
+  public int position() {
+    return position;
+  }
+
+  /** @return the number of bytes occupied by stored rows (same value as {@link #position()}) */
+  public long usedMem() {
+    return position;
+  }
+
+  /**
+   * Ensure that this buffer has enough remaining space to add the size.
+   * Creates and copies to a new buffer if necessary
+   *
+   * @param size Size to add
+   * @throws RuntimeException if the limit spec does not allow the block to grow
+   */
+  public void ensureSize(int size) {
+    if (remain() - size < 0) {
+      if (!limitSpec.canIncrease(memorySize)) {
+        throw new RuntimeException("Cannot increase RowBlock anymore.");
+      }
+
+      int newBlockSize = limitSpec.increasedSize(memorySize);
+      resize(newBlockSize);
+      LOG.info("Increase DirectRowBlock to " + FileUtil.humanReadableByteCount(newBlockSize, false));
+    }
+  }
+
+  /** @return bytes left after the stored rows and the row currently held by the writer */
+  public long remain() {
+    return memorySize - position - builder.offset();
+  }
+
+  public int maxRowNum() {
+    return maxRowNum;
+  }
+
+  /** @return the number of rows stored in this block */
+  public int rows() {
+    return rowNum;
+  }
+
+  public void setRows(int rowNum) {
+    this.rowNum = rowNum;
+  }
+
+  /**
+   * Clears this block and refills it from the channel's current position, counting the
+   * complete rows that were read. A row cut off at the end of the read is left for the
+   * next call: the channel is moved back to the start of that row.
+   *
+   * @param channel channel to read from
+   * @param stats not used by this method
+   * @return {@code false} if the channel was already at its end, {@code true} otherwise
+   * @throws IOException if reading fails, or if a row declares a non-positive length
+   */
+  public boolean copyFromChannel(FileChannel channel, TableStats stats) throws IOException {
+    if (channel.position() >= channel.size()) {
+      return false;
+    }
+
+    clear();
+
+    buffer.clear();
+    channel.read(buffer);
+    memorySize = buffer.position();
+
+    while (position < memorySize) {
+      long recordPtr = address + position;
+
+      // Not even the length header of the next row is complete.
+      if (remain() < SizeOf.SIZE_OF_INT) {
+        rewindPartialRecord(channel);
+        return true;
+      }
+
+      int recordSize = UNSAFE.getInt(recordPtr);
+
+      // A non-positive length can never advance the scan: zero would loop forever and a
+      // negative one would walk backwards through native memory.
+      if (recordSize <= 0) {
+        throw new IOException("Invalid record size " + recordSize + " at block position " + position);
+      }
+
+      // The row body is cut off.
+      if (remain() < recordSize) {
+        rewindPartialRecord(channel);
+        return true;
+      }
+
+      position += recordSize;
+      rowNum++;
+    }
+
+    return true;
+  }
+
+  /**
+   * Gives the bytes of an incomplete trailing row back to the channel and drops them from
+   * this block's logical size.
+   */
+  private void rewindPartialRecord(FileChannel channel) throws IOException {
+    long leftover = remain();
+    channel.position(channel.position() - leftover);
+    memorySize = (int) (memorySize - leftover);
+  }
+
+  /** @return the writer that appends rows to this block */
+  public RowWriter getWriter() {
+    return builder;
+  }
+
+  /** @return a new reader over this block */
+  public OffHeapRowBlockReader getReader() {
+    return new OffHeapRowBlockReader(this);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
new file mode 100644
index 0000000..4a9313f
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
@@ -0,0 +1,63 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.tuple.RowBlockReader;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+
+/**
+ * Reads the rows of an {@link OffHeapRowBlock} one after another, setting a caller-supplied
+ * {@link ZeroCopyTuple} on each row in turn.
+ */
+public class OffHeapRowBlockReader implements RowBlockReader<ZeroCopyTuple> {
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+  OffHeapRowBlock rowBlock;
+
+  // Read States
+  private int curRowIdxForRead;
+  private int curPosForRead;
+
+  /**
+   * @param rowBlock block whose rows are to be read
+   */
+  public OffHeapRowBlockReader(OffHeapRowBlock rowBlock) {
+    this.rowBlock = rowBlock;
+  }
+
+  /** @return the block's memory size minus the current read position, in bytes */
+  public long remainForRead() {
+    return rowBlock.memorySize - curPosForRead;
+  }
+
+  /**
+   * Sets {@code tuple} on the next row of the block and advances past it.
+   *
+   * @param tuple tuple to set on the next row
+   * @return {@code true} if a row was available, {@code false} once all rows have been read
+   */
+  @Override
+  public boolean next(ZeroCopyTuple tuple) {
+    if (curRowIdxForRead >= rowBlock.rows()) {
+      return false;
+    }
+
+    // Every row begins with an int holding its length.
+    final int rowLength = UNSAFE.getInt(rowBlock.address() + curPosForRead);
+    tuple.set(rowBlock.buffer, curPosForRead, rowLength, rowBlock.dataTypes);
+
+    curPosForRead += rowLength;
+    curRowIdxForRead++;
+    return true;
+  }
+
+  /** Restarts reading from the first row of the block. */
+  @Override
+  public void reset() {
+    curRowIdxForRead = 0;
+    curPosForRead = 0;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
new file mode 100644
index 0000000..dbc3188
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.collect.Lists;
+import org.apache.tajo.storage.Tuple;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/** Helpers that expose the rows of an {@link OffHeapRowBlock} as sorted collections of tuples. */
+public class OffHeapRowBlockUtils {
+
+  /**
+   * Reads every row of the block into its own {@link ZeroCopyTuple} and sorts the tuples.
+   *
+   * @param rowBlock   block whose rows are to be sorted
+   * @param comparator ordering to apply
+   * @return a new list holding one tuple per row, in sorted order
+   */
+  public static List<Tuple> sort(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) {
+    final List<Tuple> sorted = Lists.newArrayList();
+    final OffHeapRowBlockReader rows = new OffHeapRowBlockReader(rowBlock);
+
+    // A fresh tuple per row: the reader only re-points the tuple object it is given.
+    for (ZeroCopyTuple row = new ZeroCopyTuple(); rows.next(row); row = new ZeroCopyTuple()) {
+      sorted.add(row);
+    }
+
+    Collections.sort(sorted, comparator);
+    return sorted;
+  }
+
+  /**
+   * Array flavour of {@link #sort(OffHeapRowBlock, Comparator)}.
+   *
+   * @param rowBlock   block whose rows are to be sorted
+   * @param comparator ordering to apply
+   * @return a new array sized to {@code rowBlock.rows()}, in sorted order
+   */
+  public static Tuple[] sortToArray(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) {
+    final Tuple[] sorted = new Tuple[rowBlock.rows()];
+    final OffHeapRowBlockReader rows = new OffHeapRowBlockReader(rowBlock);
+
+    int idx = 0;
+    ZeroCopyTuple row = new ZeroCopyTuple();
+    while (idx < rowBlock.rows() && rows.next(row)) {
+      sorted[idx++] = row;
+      row = new ZeroCopyTuple();
+    }
+
+    Arrays.sort(sorted, comparator);
+    return sorted;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
new file mode 100644
index 0000000..d177e0c
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.common.TajoDataTypes;
+
+/**
+ * An {@link OffHeapRowWriter} that writes rows into an {@link OffHeapRowBlock}. Address, position
+ * and capacity handling are all delegated to the block.
+ */
+public class OffHeapRowBlockWriter extends OffHeapRowWriter {
+  OffHeapRowBlock rowBlock;
+
+  OffHeapRowBlockWriter(OffHeapRowBlock rowBlock) {
+    super(rowBlock.dataTypes);
+    this.rowBlock = rowBlock;
+  }
+
+  /** @return the address reported by the underlying row block */
+  @Override
+  public long address() {
+    return this.rowBlock.address();
+  }
+
+  /** @return the current position of the underlying row block */
+  @Override
+  public int position() {
+    return this.rowBlock.position();
+  }
+
+  /** Moves the block's position {@code length} bytes past the current one. */
+  @Override
+  public void forward(int length) {
+    final int advanced = position() + length;
+    this.rowBlock.position(advanced);
+  }
+
+  /** Forwards the size request to the underlying row block. */
+  @Override
+  public void ensureSize(int size) {
+    this.rowBlock.ensureSize(size);
+  }
+
+  /** Finishes the row in the parent writer, then bumps the block's row count by one. */
+  @Override
+  public void endRow() {
+    super.endRow();
+    final int rowsSoFar = this.rowBlock.rows();
+    this.rowBlock.setRows(rowsSoFar + 1);
+  }
+
+  /** @return the data types held by the underlying row block */
+  @Override
+  public TajoDataTypes.DataType[] dataTypes() {
+    return this.rowBlock.dataTypes;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
new file mode 100644
index 0000000..85c7e0b
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.IntervalDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.UnsafeUtil;
+
+/**
+ * Base writer that serializes one row at a time into off-heap memory.
+ *
+ * <p>Row record structure:</p>
+ * <pre>
+ * | row length | field 1 offset | ... | field N offset | field 1 | field 2 | ... | field N |
+ *    4 bytes        4 bytes                 4 bytes
+ * </pre>
+ *
+ * <p>Field offsets are relative to the start of the record. A skipped field, or a field that was
+ * never reached before {@link #endRow()}, is recorded as
+ * {@code OffHeapRowBlock.NULL_FIELD_OFFSET}.</p>
+ */
+public abstract class OffHeapRowWriter implements RowWriter {
+  /** record size + offset list */
+  private final int headerSize;
+  /** field offsets */
+  private final int [] fieldOffsets;
+  private final TajoDataTypes.DataType [] dataTypes;
+
+  /** Index of the next field to be written or skipped in the current row. */
+  private int curFieldIdx;
+  /** Number of bytes of the current record used so far, header included. */
+  private int curOffset;
+
+  public OffHeapRowWriter(final TajoDataTypes.DataType [] dataTypes) {
+    this.dataTypes = dataTypes;
+    this.fieldOffsets = new int[dataTypes.length];
+    this.headerSize = SizeOf.SIZE_OF_INT * (dataTypes.length + 1);
+  }
+
+  /** Resets the per-row write state (field index and in-record offset) to zero. */
+  public void clear() {
+    curFieldIdx = 0;
+    curOffset = 0;
+  }
+
+  /** @return the absolute address at which the current record starts */
+  public long recordStartAddr() {
+    return address() + position();
+  }
+
+  public abstract long address();
+
+  public abstract void ensureSize(int size);
+
+  /** @return the number of bytes of the current record written so far */
+  public int offset() {
+    return curOffset;
+  }
+
+  /**
+   * Current position
+   *
+   * @return The position
+   */
+  public abstract int position();
+
+  /**
+   * Forward the address;
+   *
+   * @param length Length to be forwarded
+   */
+  public abstract void forward(int length);
+
+  @Override
+  public TajoDataTypes.DataType[] dataTypes() {
+    return dataTypes;
+  }
+
+  /**
+   * Begins a new row: the first field value will be placed right after the header.
+   *
+   * @return always true
+   */
+  public boolean startRow() {
+    curFieldIdx = 0;
+    curOffset = headerSize;
+    return true;
+  }
+
+  /** Writes the record header (row length, then every field offset) and advances past the row. */
+  public void endRow() {
+    long cursor = address() + position();
+
+    OffHeapMemory.UNSAFE.putInt(cursor, curOffset);
+    cursor += SizeOf.SIZE_OF_INT;
+
+    // Offsets of the fields that were written or skipped ...
+    int fieldIdx = 0;
+    while (fieldIdx < curFieldIdx) {
+      OffHeapMemory.UNSAFE.putInt(cursor, fieldOffsets[fieldIdx]);
+      cursor += SizeOf.SIZE_OF_INT;
+      fieldIdx++;
+    }
+    // ... and the null marker for the trailing fields that were never reached.
+    for (fieldIdx = curFieldIdx; fieldIdx < dataTypes.length; fieldIdx++) {
+      OffHeapMemory.UNSAFE.putInt(cursor, OffHeapRowBlock.NULL_FIELD_OFFSET);
+      cursor += SizeOf.SIZE_OF_INT;
+    }
+
+    // rowOffset is equivalent to a byte length of this row.
+    forward(curOffset);
+  }
+
+  /** Marks the current field as null and moves on to the next field. */
+  public void skipField() {
+    fieldOffsets[curFieldIdx++] = OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  /** Records the current in-record offset as the start of the next field. */
+  private void markFieldStart() {
+    fieldOffsets[curFieldIdx++] = curOffset;
+  }
+
+  /** Absolute address at which the next byte of the current record is to be written. */
+  private long writeAddr() {
+    return recordStartAddr() + curOffset;
+  }
+
+  /** Writes a 4-byte length followed by the raw bytes of {@code payload} as one field. */
+  private void putLengthPrefixed(byte[] payload) {
+    final int payloadLen = payload.length;
+
+    // Capacity is ensured before any address is computed.
+    ensureSize(SizeOf.SIZE_OF_INT + payloadLen);
+    markFieldStart();
+
+    OffHeapMemory.UNSAFE.putInt(writeAddr(), payloadLen);
+    curOffset += SizeOf.SIZE_OF_INT;
+
+    OffHeapMemory.UNSAFE.copyMemory(payload, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, null,
+        writeAddr(), payloadLen);
+    curOffset += payloadLen;
+  }
+
+  /** Writes a boolean field as a single byte: 0x01 for true, 0x00 for false. */
+  public void putBool(boolean val) {
+    ensureSize(SizeOf.SIZE_OF_BOOL);
+    markFieldStart();
+
+    final byte encoded = (byte) (val ? 0x01 : 0x00);
+    OffHeapMemory.UNSAFE.putByte(writeAddr(), encoded);
+    curOffset += SizeOf.SIZE_OF_BOOL;
+  }
+
+  public void putInt2(short val) {
+    ensureSize(SizeOf.SIZE_OF_SHORT);
+    markFieldStart();
+
+    OffHeapMemory.UNSAFE.putShort(writeAddr(), val);
+    curOffset += SizeOf.SIZE_OF_SHORT;
+  }
+
+  public void putInt4(int val) {
+    ensureSize(SizeOf.SIZE_OF_INT);
+    markFieldStart();
+
+    OffHeapMemory.UNSAFE.putInt(writeAddr(), val);
+    curOffset += SizeOf.SIZE_OF_INT;
+  }
+
+  public void putInt8(long val) {
+    ensureSize(SizeOf.SIZE_OF_LONG);
+    markFieldStart();
+
+    OffHeapMemory.UNSAFE.putLong(writeAddr(), val);
+    curOffset += SizeOf.SIZE_OF_LONG;
+  }
+
+  public void putFloat4(float val) {
+    ensureSize(SizeOf.SIZE_OF_FLOAT);
+    markFieldStart();
+
+    OffHeapMemory.UNSAFE.putFloat(writeAddr(), val);
+    curOffset += SizeOf.SIZE_OF_FLOAT;
+  }
+
+  public void putFloat8(double val) {
+    ensureSize(SizeOf.SIZE_OF_DOUBLE);
+    markFieldStart();
+
+    OffHeapMemory.UNSAFE.putDouble(writeAddr(), val);
+    curOffset += SizeOf.SIZE_OF_DOUBLE;
+  }
+
+  /** Encodes {@code val} with {@code TextDatum.DEFAULT_CHARSET} and writes it as a text field. */
+  public void putText(String val) {
+    putText(val.getBytes(TextDatum.DEFAULT_CHARSET));
+  }
+
+  /** Writes already-encoded text: a 4-byte length followed by the bytes. */
+  public void putText(byte[] val) {
+    putLengthPrefixed(val);
+  }
+
+  /** Writes a blob: a 4-byte length followed by the bytes. */
+  public void putBlob(byte[] val) {
+    putLengthPrefixed(val);
+  }
+
+  public void putTimestamp(long val) {
+    putInt8(val);
+  }
+
+  public void putDate(int val) {
+    putInt4(val);
+  }
+
+  public void putTime(long val) {
+    putInt8(val);
+  }
+
+  /** Writes an interval as a 4-byte month count followed by an 8-byte millisecond count. */
+  public void putInterval(IntervalDatum val) {
+    ensureSize(SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG);
+    markFieldStart();
+
+    final long monthsAddr = writeAddr();
+    OffHeapMemory.UNSAFE.putInt(monthsAddr, val.getMonths());
+    OffHeapMemory.UNSAFE.putLong(monthsAddr + SizeOf.SIZE_OF_INT, val.getMilliSeconds());
+    curOffset += SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG;
+  }
+
+  public void putInet4(int val) {
+    putInt4(val);
+  }
+
+  /** Writes the datum's byte-array form as a blob field. */
+  public void putProtoDatum(ProtobufDatum val) {
+    putBlob(val.asByteArray());
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
new file mode 100644
index 0000000..14e67b2
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.FileUtil;
+
+/**
+ * It specifies the initial size, the maximum size and the increasing ratio of a resizable buffer.
+ * In addition, it guarantees that all sizes are less than or equal to Integer.MAX_VALUE
+ * (2^31 - 1) due to ByteBuffer.
+ */
+public class ResizableLimitSpec {
+  // One logger per class instead of one per instance.
+  private static final Log LOG = LogFactory.getLog(ResizableLimitSpec.class);
+
+  public static final int MAX_SIZE_BYTES = Integer.MAX_VALUE;
+  public static final ResizableLimitSpec DEFAULT_LIMIT = new ResizableLimitSpec(Integer.MAX_VALUE);
+
+  private final long initSize;
+  private final long limitBytes;
+  private final float incRatio;
+  private final float allowedOverflowRatio;
+  private final static float DEFAULT_ALLOWED_OVERFLOW_RATIO = 0.1f;
+  private final static float DEFAULT_INCREASE_RATIO = 1.0f;
+
+  public ResizableLimitSpec(long initSize) {
+    this(initSize, MAX_SIZE_BYTES, DEFAULT_ALLOWED_OVERFLOW_RATIO);
+  }
+
+  public ResizableLimitSpec(long initSize, long limitBytes) {
+    this(initSize, limitBytes, DEFAULT_ALLOWED_OVERFLOW_RATIO);
+  }
+
+  public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflow) {
+    this(initSize, limitBytes, allowedOverflow, DEFAULT_INCREASE_RATIO);
+  }
+
+  /**
+   * @param initSize             initial size in bytes; must be in (0, Integer.MAX_VALUE]
+   * @param limitBytes           nominal limit in bytes; must be in (0, Integer.MAX_VALUE]
+   * @param allowedOverflowRatio fraction of the nominal limit that is added on top of it; the
+   *                             resulting effective limit is capped at Integer.MAX_VALUE
+   * @param incRatio             growth factor used by {@link #increasedSize(int)}; must be > 0
+   * @throws IllegalArgumentException if one of the ranges above is violated
+   */
+  public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflowRatio, float incRatio) {
+    Preconditions.checkArgument(initSize > 0, "initial size must be greater than 0 bytes.");
+    Preconditions.checkArgument(initSize <= MAX_SIZE_BYTES, "The maximum initial size is 2GB.");
+    Preconditions.checkArgument(limitBytes > 0, "The limit size must be greater than 0 bytes.");
+    Preconditions.checkArgument(limitBytes <= MAX_SIZE_BYTES, "The maximum limit size is 2GB.");
+    Preconditions.checkArgument(incRatio > 0.0f, "Increase Ratio must be greater than 0.");
+
+    // Effective limit = nominal limit plus the allowed overflow, never beyond Integer.MAX_VALUE.
+    final long effectiveLimit =
+        Math.min((long) (limitBytes + (limitBytes * allowedOverflowRatio)), Integer.MAX_VALUE);
+
+    // A fixed-size spec (init == limit) starts directly at the effective limit.
+    this.initSize = (initSize == limitBytes) ? effectiveLimit : initSize;
+    this.limitBytes = effectiveLimit;
+
+    this.allowedOverflowRatio = allowedOverflowRatio;
+    this.incRatio = incRatio;
+  }
+
+  /** @return the initial size in bytes */
+  public long initialSize() {
+    return initSize;
+  }
+
+  /** @return the effective limit in bytes (allowed overflow included) */
+  public long limit() {
+    return limitBytes;
+  }
+
+  /**
+   * Returns {@code currentSize / limit}. Note that, despite the method name, this is the
+   * fraction of the limit that is already in use.
+   *
+   * @param currentSize current size in bytes; must be greater than 0
+   */
+  public float remainRatio(long currentSize) {
+    Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
+    if (currentSize > Integer.MAX_VALUE) {
+      currentSize = Integer.MAX_VALUE;
+    }
+    return (float)currentSize / (float)limitBytes;
+  }
+
+  /** @return true if {@code currentSize} is still below the effective limit */
+  public boolean canIncrease(long currentSize) {
+    return remain(currentSize) > 0;
+  }
+
+  /**
+   * @param currentSize current size in bytes; must be greater than 0
+   * @return the number of bytes between {@code currentSize} and the effective limit
+   */
+  public long remain(long currentSize) {
+    Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
+    // limitBytes is capped at Integer.MAX_VALUE by the constructor, so no extra clamp is needed.
+    return limitBytes - currentSize;
+  }
+
+  /**
+   * Computes the size a buffer of {@code currentSize} bytes should grow to.
+   *
+   * @param currentSize current size in bytes
+   * @return the initial size if {@code currentSize} is below it; otherwise
+   *         {@code currentSize * (1 + incRatio)}, capped at the effective limit
+   */
+  public int increasedSize(int currentSize) {
+    if (currentSize < initSize) {
+      return (int) initSize;
+    }
+
+    // Computed in double: float has a 24-bit mantissa and would round sizes above 16MB.
+    long nextSize = (long) (currentSize + ((double) currentSize * incRatio));
+
+    if (nextSize > limitBytes) {
+      LOG.info("Increasing reaches size limit (" + FileUtil.humanReadableByteCount(limitBytes, false) + ")");
+      nextSize = limitBytes;
+    }
+
+    // Safe narrowing: nextSize <= limitBytes <= Integer.MAX_VALUE.
+    return (int) nextSize;
+  }
+
+  @Override
+  public String toString() {
+    return "init=" + FileUtil.humanReadableByteCount(initSize, false) + ",limit="
+        + FileUtil.humanReadableByteCount(limitBytes, false) + ",overflow_ratio=" + allowedOverflowRatio
+        + ",inc_ratio=" + incRatio;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
new file mode 100644
index 0000000..a2b2561
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
@@ -0,0 +1,73 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.IntervalDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+
+/**
+ * Writes a row field by field. The call sequence should be as follows:
+ *
+ * <pre>
+ *   startRow() --> skipField() or putXXX --> endRow()
+ * </pre>
+ *
+ * The total number of skipField and putXXX invocations must be equivalent to the number of fields.
+ */
+public interface RowWriter {
+
+  /** @return the data types of the fields of a row, in field order */
+  TajoDataTypes.DataType [] dataTypes();
+
+  /** Begins a new row. */
+  boolean startRow();
+
+  /** Finishes the row that is currently being written. */
+  void endRow();
+
+  /** Leaves the current field without a value and moves on to the next field. */
+  void skipField();
+
+  void putBool(boolean val);
+
+  void putInt2(short val);
+
+  void putInt4(int val);
+
+  void putInt8(long val);
+
+  void putFloat4(float val);
+
+  void putFloat8(double val);
+
+  /** Writes a text field given as a string. */
+  void putText(String val);
+
+  /** Writes a text field given as bytes. */
+  void putText(byte[] val);
+
+  void putBlob(byte[] val);
+
+  void putTimestamp(long val);
+
+  void putTime(long val);
+
+  void putDate(int val);
+
+  void putInterval(IntervalDatum val);
+
+  void putInet4(int val);
+
+  void putProtoDatum(ProtobufDatum datum);
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
new file mode 100644
index 0000000..b742e6d
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
@@ -0,0 +1,311 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+
+import org.apache.tajo.datum.*;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.StringUtils;
+import org.apache.tajo.util.UnsafeUtil;
+
+import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+/**
+ * A read-only {@link Tuple} whose record lives in a direct buffer and is read through
+ * {@code sun.misc.Unsafe}. All {@code put} methods throw {@link UnsupportedException}.
+ *
+ * <p>Record layout as read by this class: a 4-byte row length, then one 4-byte offset per field
+ * (relative to the record start, {@code OffHeapRowBlock.NULL_FIELD_OFFSET} for a null field),
+ * then the field values.</p>
+ */
+public abstract class UnSafeTuple implements Tuple {
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+  /** Buffer holding the record; must be a direct buffer. */
+  private DirectBuffer bb;
+  /** Byte offset of the record inside {@link #bb}. */
+  private int relativePos;
+  /** Length of the record in bytes. */
+  private int length;
+  private DataType [] types;
+
+  /**
+   * Points this tuple at a record.
+   *
+   * @param bb          direct buffer that contains the record
+   * @param relativePos byte offset of the record within {@code bb}
+   * @param length      length of the record in bytes
+   * @param types       data types of the record's fields
+   */
+  protected void set(ByteBuffer bb, int relativePos, int length, DataType [] types) {
+    this.bb = (DirectBuffer) bb;
+    this.relativePos = relativePos;
+    this.length = length;
+    this.types = types;
+  }
+
+  /** Points this tuple at a record that spans {@code bb} from offset 0 up to its limit. */
+  void set(ByteBuffer bb, DataType [] types) {
+    set(bb, 0, bb.limit(), types);
+  }
+
+  @Override
+  public int size() {
+    return types.length;
+  }
+
+  /** @return a new buffer that covers exactly the bytes of this record */
+  public ByteBuffer nioBuffer() {
+    ByteBuffer view = ((ByteBuffer) bb).duplicate();
+    // Limit first, so that a source limit below relativePos cannot make position() fail.
+    view.limit(relativePos + length);
+    view.position(relativePos);
+    return view.slice();
+  }
+
+  /** @return an on-heap copy of this record */
+  public HeapTuple toHeapTuple() {
+    byte [] bytes = new byte[length];
+    UNSAFE.copyMemory(null, bb.address() + relativePos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, length);
+    return new HeapTuple(bytes, types);
+  }
+
+  /**
+   * Copies the record of {@code tuple} into this tuple's buffer, starting at offset 0. The
+   * buffer is freed and replaced by a new direct buffer when it is too small. The field types
+   * of this tuple are left untouched.
+   *
+   * NOTE(review): this overwrites (or frees) the current buffer, so it presumably must only be
+   * called on tuples that own their buffer, not on views into a shared row block -- confirm.
+   *
+   * @param tuple the tuple to copy from; must not be null
+   */
+  public void copyFrom(UnSafeTuple tuple) {
+    Preconditions.checkNotNull(tuple);
+    if (tuple == this) {
+      return;
+    }
+
+    ByteBuffer target = (ByteBuffer) bb;
+    if (target == null || target.capacity() < tuple.length) {
+      if (target != null) {
+        UnsafeUtil.free(target);
+      }
+      target = ByteBuffer.allocateDirect(tuple.length).order(ByteOrder.nativeOrder());
+      bb = (DirectBuffer) target;
+    }
+
+    target.clear();
+    target.put(tuple.nioBuffer());
+
+    // The copy always lands at offset 0, so the view must be re-anchored on both paths;
+    // otherwise field reads would still use the previous offset and length.
+    this.relativePos = 0;
+    this.length = tuple.length;
+  }
+
+  /** Reads the offset slot of a field from the record header. */
+  private int getFieldOffset(int fieldId) {
+    return UNSAFE.getInt(bb.address() + relativePos + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT));
+  }
+
+  /**
+   * @return the absolute address of the field's value
+   * @throws RuntimeException if the field's offset slot holds -1
+   */
+  public long getFieldAddr(int fieldId) {
+    int fieldOffset = getFieldOffset(fieldId);
+    // NOTE(review): -1 is presumably the value of OffHeapRowBlock.NULL_FIELD_OFFSET -- confirm.
+    if (fieldOffset == -1) {
+      throw new RuntimeException("Invalid Field Access: " + fieldId);
+    }
+    return bb.address() + relativePos + fieldOffset;
+  }
+
+  @Override
+  public boolean contains(int fieldid) {
+    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public boolean isNull(int fieldid) {
+    return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public boolean isNotNull(int fieldid) {
+    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public void clear() {
+    // nothing to do
+  }
+
+  @Override
+  public void put(int fieldId, Datum value) {
+    throw new UnsupportedException("UnSafeTuple does not support put(int, Datum).");
+  }
+
+  @Override
+  public void put(int fieldId, Datum[] values) {
+    throw new UnsupportedException("UnSafeTuple does not support put(int, Datum []).");
+  }
+
+  @Override
+  public void put(int fieldId, Tuple tuple) {
+    throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple).");
+  }
+
+  @Override
+  public void put(Datum[] values) {
+    throw new UnsupportedException("UnSafeTuple does not support put(Datum []).");
+  }
+
+  /**
+   * @return the field as a {@link Datum} matching its declared type, or the null datum if the
+   *         field is null
+   * @throws UnsupportedException if the declared type is not handled here
+   */
+  @Override
+  public Datum get(int fieldId) {
+    if (isNull(fieldId)) {
+      return NullDatum.get();
+    }
+
+    switch (types[fieldId].getType()) {
+    case BOOLEAN:
+      return DatumFactory.createBool(getBool(fieldId));
+    case INT1:
+    case INT2:
+      return DatumFactory.createInt2(getInt2(fieldId));
+    case INT4:
+      return DatumFactory.createInt4(getInt4(fieldId));
+    case INT8:
+      // Must read all 8 bytes; reading it as an int4 would truncate the value.
+      return DatumFactory.createInt8(getInt8(fieldId));
+    case FLOAT4:
+      return DatumFactory.createFloat4(getFloat4(fieldId));
+    case FLOAT8:
+      return DatumFactory.createFloat8(getFloat8(fieldId));
+    case TEXT:
+      return DatumFactory.createText(getText(fieldId));
+    case TIMESTAMP:
+      return DatumFactory.createTimestamp(getInt8(fieldId));
+    case DATE:
+      return DatumFactory.createDate(getInt4(fieldId));
+    case TIME:
+      return DatumFactory.createTime(getInt8(fieldId));
+    case INTERVAL:
+      return getInterval(fieldId);
+    case INET4:
+      return DatumFactory.createInet4(getInt4(fieldId));
+    case PROTOBUF:
+      return getProtobufDatum(fieldId);
+    default:
+      throw new UnsupportedException("Unknown type: " + types[fieldId]);
+    }
+  }
+
+  @Override
+  public void setOffset(long offset) {
+  }
+
+  @Override
+  public long getOffset() {
+    return 0;
+  }
+
+  @Override
+  public boolean getBool(int fieldId) {
+    return UNSAFE.getByte(getFieldAddr(fieldId)) == 0x01;
+  }
+
+  @Override
+  public byte getByte(int fieldId) {
+    return UNSAFE.getByte(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public char getChar(int fieldId) {
+    return UNSAFE.getChar(getFieldAddr(fieldId));
+  }
+
+  /** Reads a field stored as a 4-byte length followed by that many bytes. */
+  private byte[] readLengthPrefixedBytes(int fieldId) {
+    long pos = getFieldAddr(fieldId);
+    int len = UNSAFE.getInt(pos);
+    pos += SizeOf.SIZE_OF_INT;
+
+    byte [] bytes = new byte[len];
+    UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
+    return bytes;
+  }
+
+  @Override
+  public byte[] getBytes(int fieldId) {
+    return readLengthPrefixedBytes(fieldId);
+  }
+
+  @Override
+  public short getInt2(int fieldId) {
+    long addr = getFieldAddr(fieldId);
+    return UNSAFE.getShort(addr);
+  }
+
+  @Override
+  public int getInt4(int fieldId) {
+    return UNSAFE.getInt(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public long getInt8(int fieldId) {
+    return UNSAFE.getLong(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public float getFloat4(int fieldId) {
+    return UNSAFE.getFloat(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public double getFloat8(int fieldId) {
+    return UNSAFE.getDouble(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public String getText(int fieldId) {
+    // Decode with the charset the row writer encodes text with, not the platform default.
+    return new String(readLengthPrefixedBytes(fieldId), TextDatum.DEFAULT_CHARSET);
+  }
+
+  /** Reads an interval stored as a 4-byte month count followed by an 8-byte millisecond count. */
+  public IntervalDatum getInterval(int fieldId) {
+    long pos = getFieldAddr(fieldId);
+    int months = UNSAFE.getInt(pos);
+    pos += SizeOf.SIZE_OF_INT;
+    long millisecs = UNSAFE.getLong(pos);
+    return new IntervalDatum(months, millisecs);
+  }
+
+  /**
+   * @return the field parsed as a protobuf message, or the null datum if the bytes cannot be
+   *         parsed
+   */
+  @Override
+  public Datum getProtobufDatum(int fieldId) {
+    byte [] bytes = getBytes(fieldId);
+
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode());
+    Message.Builder builder = factory.newBuilder();
+    try {
+      builder.mergeFrom(bytes);
+    } catch (InvalidProtocolBufferException e) {
+      // Existing best-effort behavior: unparsable bytes are reported as a null datum.
+      return NullDatum.get();
+    }
+
+    return new ProtobufDatum(builder.build());
+  }
+
+  @Override
+  public char[] getUnicodeChars(int fieldId) {
+    return StringUtils.convertBytesToChars(readLengthPrefixedBytes(fieldId), Charset.forName("UTF-8"));
+  }
+
+  /** @return an on-heap copy of this tuple (see {@link #toHeapTuple()}) */
+  @Override
+  public Tuple clone() throws CloneNotSupportedException {
+    return toHeapTuple();
+  }
+
+  @Override
+  public Datum[] getValues() {
+    Datum [] datums = new Datum[size()];
+    for (int i = 0; i < size(); i++) {
+      if (contains(i)) {
+        datums[i] = get(i);
+      } else {
+        datums[i] = NullDatum.get();
+      }
+    }
+    return datums;
+  }
+
+  @Override
+  public String toString() {
+    return VTuple.toDisplayString(getValues());
+  }
+
+  public abstract void release();
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
new file mode 100644
index 0000000..73e1e2f
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
@@ -0,0 +1,99 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.UnsignedLongs;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+
+import java.nio.ByteOrder;
+
+/**
+ * It directly accesses UTF bytes in UnSafeTuple without any copy. It is used by compiled
+ * TupleComparator.
+ */
+public class UnSafeTupleBytesComparator {
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+  static final boolean littleEndian =
+      ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
+
+  /**
+   * Lexicographically compares two length-prefixed byte sequences, treating every byte as an
+   * unsigned value. If one sequence is a prefix of the other, the shorter one is smaller.
+   *
+   * @param ptr1 address of the left value: a 4-byte length followed by that many bytes
+   * @param ptr2 address of the right value, in the same format
+   * @return a negative number, zero or a positive number if the left value is less than, equal
+   *         to or greater than the right value
+   */
+  public static int compare(long ptr1, long ptr2) {
+    int lstrLen = UNSAFE.getInt(ptr1);
+    int rstrLen = UNSAFE.getInt(ptr2);
+
+    ptr1 += SizeOf.SIZE_OF_INT;
+    ptr2 += SizeOf.SIZE_OF_INT;
+
+    int minLength = Math.min(lstrLen, rstrLen);
+    int minWords = minLength / Longs.BYTES;
+
+    /*
+     * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a
+     * time is no slower than comparing 4 bytes at a time even on 32-bit.
+     * On the other hand, it is substantially faster on 64-bit.
+     */
+    for (int i = 0; i < minWords; i++) {
+      long lw = UNSAFE.getLong(ptr1);
+      long rw = UNSAFE.getLong(ptr2);
+
+      if (lw != rw) {
+        if (!littleEndian) {
+          return UnsignedLongs.compare(lw, rw);
+        }
+
+        // On little-endian the first differing byte in memory is the lowest differing byte
+        // of the word; n is the bit position at which that byte starts.
+        int n = Long.numberOfTrailingZeros(lw ^ rw) & ~0x7;
+        return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
+      }
+
+      ptr1 += SizeOf.SIZE_OF_LONG;
+      ptr2 += SizeOf.SIZE_OF_LONG;
+    }
+
+    // The epilogue to cover the last (minLength % 8) elements. Bytes are masked to their
+    // unsigned value so that the ordering agrees with the word-wise comparison above.
+    for (int i = minWords * Longs.BYTES; i < minLength; i++) {
+      int result = (UNSAFE.getByte(ptr1++) & 0xFF) - (UNSAFE.getByte(ptr2++) & 0xFF);
+      if (result != 0) {
+        return result;
+      }
+    }
+    return lstrLen - rstrLen;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
new file mode 100644
index 0000000..51dbb29
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+ /**
+  * An {@link UnSafeTuple} variant whose {@link #release()} does nothing.
+  *
+  * <p>NOTE(review): presumably instances only point into a buffer owned by
+  * someone else, which is why there is nothing to free - confirm against
+  * {@code UnSafeTuple}.
+  */
+ public class ZeroCopyTuple extends UnSafeTuple {
+
+   /**
+    * Publicly exposes the parent's {@code set}, passing every argument through
+    * unchanged.
+    *
+    * @param bb          buffer handed to the parent
+    * @param relativePos position handed to the parent
+    * @param length      length handed to the parent
+    * @param types       data types handed to the parent
+    */
+   public void set(ByteBuffer bb, int relativePos, int length, DataType[] types) {
+     super.set(bb, relativePos, length, types);
+   }
+
+   /** Deliberately empty: this tuple has nothing to release. */
+   @Override
+   public void release() {
+     // nothing to do
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/proto/IndexProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/proto/IndexProtos.proto b/tajo-storage/tajo-storage-common/src/main/proto/IndexProtos.proto
new file mode 100644
index 0000000..f5c8a08
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/proto/IndexProtos.proto
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo.index";
+option java_outer_classname = "IndexProtos";
+option optimize_for = SPEED;
+option java_generic_services = false;
+option java_generate_equals_and_hash = true;
+
+import "CatalogProtos.proto";
+
+message TupleComparatorProto { // NOTE(review): presumably the serialized form of a tuple comparator - confirm against the code that builds it
+ required SchemaProto schema = 1; // must always be set; SchemaProto is not declared in this file (presumably comes from the imported CatalogProtos.proto)
+ repeated SortSpecProto sortSpecs = 2; // zero or more entries; presumably one per sort key - TODO confirm
+ repeated TupleComparatorSpecProto compSpecs = 3; // zero or more entries; relation to sortSpecs is not visible here - TODO confirm
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
new file mode 100644
index 0000000..67033ed
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
@@ -0,0 +1,198 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<configuration>
+ <!-- Storage Manager Configuration -->
+ <property>
+ <name>tajo.storage.manager.hdfs.class</name>
+ <value>org.apache.tajo.storage.FileStorageManager</value>
+ </property>
+ <property>
+ <name>tajo.storage.manager.hbase.class</name>
+ <value>org.apache.tajo.storage.hbase.HBaseStorageManager</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.manager.concurrency.perDisk</name>
+ <value>1</value>
+ <description></description>
+ </property>
+
+ <!-- Registered Scanner Handler -->
+ <property>
+ <name>tajo.storage.scanner-handler</name>
+ <value>textfile,csv,json,raw,rcfile,row,parquet,sequencefile,avro,hbase</value>
+ </property>
+
+ <!-- Fragment Class Configurations -->
+ <property>
+ <name>tajo.storage.fragment.textfile.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.csv.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.json.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.raw.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.rcfile.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.row.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.parquet.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.sequencefile.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.avro.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.hbase.class</name>
+ <value>org.apache.tajo.storage.hbase.HBaseFragment</value>
+ </property>
+
+ <!-- Scanner Handler -->
+ <property>
+ <name>tajo.storage.scanner-handler.textfile.class</name>
+ <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.csv.class</name>
+ <value>org.apache.tajo.storage.CSVFile$CSVScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.json.class</name>
+ <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.raw.class</name>
+ <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.rcfile.class</name>
+ <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.rowfile.class</name>
+ <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.parquet.class</name>
+ <value>org.apache.tajo.storage.parquet.ParquetScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.sequencefile.class</name>
+ <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.avro.class</name>
+ <value>org.apache.tajo.storage.avro.AvroScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.hbase.class</name>
+ <value>org.apache.tajo.storage.hbase.HBaseScanner</value>
+ </property>
+
+ <!-- Appender Handler -->
+ <property>
+ <name>tajo.storage.appender-handler</name>
+ <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro,hbase</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.textfile.class</name>
+ <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.csv.class</name>
+ <value>org.apache.tajo.storage.CSVFile$CSVAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.json.class</name>
+ <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.raw.class</name>
+ <value>org.apache.tajo.storage.RawFile$RawFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.rcfile.class</name>
+ <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.rowfile.class</name>
+ <value>org.apache.tajo.storage.RowFile$RowFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.parquet.class</name>
+ <value>org.apache.tajo.storage.parquet.ParquetAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.sequencefile.class</name>
+ <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.avro.class</name>
+ <value>org.apache.tajo.storage.avro.AvroAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.hbase.class</name>
+ <value>org.apache.tajo.storage.hbase.HFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.hfile.class</name>
+ <value>org.apache.tajo.storage.hbase.HFileAppender</value>
+ </property>
+</configuration>
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
new file mode 100644
index 0000000..0251dc7
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+ /**
+  * Checks that a {@link FrameTuple} built from two tuples reports the combined
+  * field count, contains every index in that range, and returns the expected
+  * datum both at an index {@code i} and at {@code FIELD_COUNT + i}.
+  */
+ public class TestFrameTuple {
+   /** Number of fields in each of the two sample tuples. */
+   private static final int FIELD_COUNT = 11;
+   /** Position of the INT8 datum inside a sample tuple. */
+   private static final int INT8_INDEX = 5;
+   /** Position of the INET4 datum inside a sample tuple. */
+   private static final int INET4_INDEX = 10;
+
+   private Tuple tuple1;
+   private Tuple tuple2;
+
+   @Before
+   public void setUp() throws Exception {
+     // Both sides of the frame carry identical content, built by one helper
+     // instead of two copies of the same datum list.
+     tuple1 = createSampleTuple();
+     tuple2 = createSampleTuple();
+   }
+
+   @After
+   public void tearDown() throws Exception {
+     // nothing to clean up
+   }
+
+   @Test
+   public final void testFrameTuple() {
+     Tuple frame = new FrameTuple(tuple1, tuple2);
+     assertEquals(2 * FIELD_COUNT, frame.size());
+     for (int i = 0; i < 2 * FIELD_COUNT; i++) {
+       assertTrue(frame.contains(i));
+     }
+
+     // The same datum is expected at index i and at FIELD_COUNT + i.
+     assertEquals(DatumFactory.createInt8(23L), frame.get(INT8_INDEX));
+     assertEquals(DatumFactory.createInt8(23L), frame.get(FIELD_COUNT + INT8_INDEX));
+     assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(INET4_INDEX));
+     assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(FIELD_COUNT + INET4_INDEX));
+   }
+
+   /**
+    * Builds a fresh {@code FIELD_COUNT}-field tuple holding one datum of each
+    * kind used by this test.
+    *
+    * @return a newly allocated, fully populated tuple
+    */
+   private static Tuple createSampleTuple() {
+     Tuple tuple = new VTuple(FIELD_COUNT);
+     tuple.put(new Datum[] {
+         DatumFactory.createBool(true),
+         DatumFactory.createBit((byte) 0x99),
+         DatumFactory.createChar('9'),
+         DatumFactory.createInt2((short) 17),
+         DatumFactory.createInt4(59),
+         DatumFactory.createInt8(23L),
+         DatumFactory.createFloat4(77.9f),
+         DatumFactory.createFloat8(271.9f),
+         DatumFactory.createText("hyunsik"),
+         DatumFactory.createBlob("hyunsik".getBytes()),
+         DatumFactory.createInet4("192.168.0.1")
+     });
+     return tuple;
+   }
+ }