You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/09/03 14:45:33 UTC
[3/4] tajo git commit: TAJO-1738: Improve off-heap RowBlock.
http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java
new file mode 100644
index 0000000..3dc8c23
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java
@@ -0,0 +1,342 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.memory;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import io.netty.util.internal.PlatformDependent;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.StringUtils;
+import org.apache.tajo.util.UnsafeUtil;
+import org.apache.tajo.util.datetime.TimeMeta;
+import sun.misc.Unsafe;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+public class UnSafeTuple extends ZeroCopyTuple {
+ private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+ private long address;
+ private DataType[] types;
+
+ @Override
+ public void set(MemoryBlock memoryBlock, int relativePos, int length, DataType[] types) {
+ Preconditions.checkArgument(memoryBlock.hasAddress());
+
+ this.address = memoryBlock.address();
+ this.types = types;
+ super.set(relativePos, length);
+ }
+
+ public void set(UnSafeTuple tuple) {
+ this.address = tuple.address;
+ this.types = tuple.types;
+ super.set(tuple.getRelativePos(), tuple.getLength());
+ }
+
+ @Override
+ public int size() {
+ return types.length;
+ }
+
+ @Override
+ public TajoDataTypes.Type type(int fieldId) {
+ return types[fieldId].getType();
+ }
+
+ @Override
+ public int size(int fieldId) {
+ return PlatformDependent.getInt(getFieldAddr(fieldId));
+ }
+
+ public void writeTo(ByteBuffer bb) {
+ if (bb.remaining() < getLength()) {
+ throw new IndexOutOfBoundsException("remaining length: " + bb.remaining()
+ + ", tuple length: " + getLength());
+ }
+
+ if (getLength() > 0) {
+ if (bb.isDirect()) {
+ PlatformDependent.copyMemory(address(), PlatformDependent.directBufferAddress(bb) + bb.position(), getLength());
+ bb.position(bb.position() + getLength());
+ } else {
+ PlatformDependent.copyMemory(address(), bb.array(), bb.arrayOffset() + bb.position(), getLength());
+ bb.position(bb.position() + getLength());
+ }
+ }
+ }
+
+ public long address() {
+ return address + getRelativePos();
+ }
+
+ public HeapTuple toHeapTuple() {
+ HeapTuple heapTuple = new HeapTuple();
+ byte [] bytes = new byte[getLength()];
+ PlatformDependent.copyMemory(address(), bytes, 0, getLength());
+ heapTuple.set(bytes, types);
+ return heapTuple;
+ }
+
+ private int getFieldOffset(int fieldId) {
+ return PlatformDependent.getInt(address()+ (long)(SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT)));
+ }
+
+ public long getFieldAddr(int fieldId) {
+ int fieldOffset = getFieldOffset(fieldId);
+ if (fieldOffset == -1) {
+ throw new RuntimeException("Invalid Field Access: " + fieldId);
+ }
+ return address() + fieldOffset;
+ }
+
+ @Override
+ public boolean contains(int fieldid) {
+ return getFieldOffset(fieldid) > MemoryRowBlock.NULL_FIELD_OFFSET;
+ }
+
+ @Override
+ public boolean isBlank(int fieldid) {
+ return getFieldOffset(fieldid) == MemoryRowBlock.NULL_FIELD_OFFSET;
+ }
+
+ @Override
+ public boolean isBlankOrNull(int fieldid) {
+ return getFieldOffset(fieldid) == MemoryRowBlock.NULL_FIELD_OFFSET;
+ }
+
+ @Override
+ public void clear() {
+ // nothing to do
+ }
+
+ @Override
+ public void put(int fieldId, Datum value) {
+ throw new TajoRuntimeException(new UnsupportedException());
+ }
+
+ @Override
+ public void put(int fieldId, Tuple tuple) {
+ throw new TajoRuntimeException(new UnsupportedException());
+ }
+
+ @Override
+ public void put(Datum[] values) {
+ throw new TajoRuntimeException(new UnsupportedException());
+ }
+
+ @Override
+ public Datum asDatum(int fieldId) {
+ if (isBlankOrNull(fieldId)) {
+ return NullDatum.get();
+ }
+
+ switch (types[fieldId].getType()) {
+ case BOOLEAN:
+ return DatumFactory.createBool(getBool(fieldId));
+ case BIT:
+ return DatumFactory.createBit(getByte(fieldId));
+ case INT1:
+ case INT2:
+ return DatumFactory.createInt2(getInt2(fieldId));
+ case INT4:
+ return DatumFactory.createInt4(getInt4(fieldId));
+ case INT8:
+ return DatumFactory.createInt8(getInt8(fieldId));
+ case FLOAT4:
+ return DatumFactory.createFloat4(getFloat4(fieldId));
+ case FLOAT8:
+ return DatumFactory.createFloat8(getFloat8(fieldId));
+ case CHAR:
+ return DatumFactory.createChar(getBytes(fieldId));
+ case TEXT:
+ return DatumFactory.createText(getBytes(fieldId));
+ case BLOB:
+ return DatumFactory.createBlob(getBytes(fieldId));
+ case TIMESTAMP:
+ return DatumFactory.createTimestamp(getInt8(fieldId));
+ case DATE:
+ return DatumFactory.createDate(getInt4(fieldId));
+ case TIME:
+ return DatumFactory.createTime(getInt8(fieldId));
+ case INTERVAL:
+ return getInterval(fieldId);
+ case INET4:
+ return DatumFactory.createInet4(getInt4(fieldId));
+ case PROTOBUF:
+ return getProtobufDatum(fieldId);
+ case NULL_TYPE:
+ return NullDatum.get();
+ default:
+ throw new TajoRuntimeException(new UnsupportedException("data type '" + types[fieldId] + "'"));
+ }
+ }
+
+ @Override
+ public void clearOffset() {
+ }
+
+ @Override
+ public void setOffset(long offset) {
+ }
+
+ @Override
+ public long getOffset() {
+ return 0;
+ }
+
+ @Override
+ public boolean getBool(int fieldId) {
+ return PlatformDependent.getByte(getFieldAddr(fieldId)) == 0x01;
+ }
+
+ @Override
+ public byte getByte(int fieldId) {
+ return PlatformDependent.getByte(getFieldAddr(fieldId));
+ }
+
+ @Override
+ public char getChar(int fieldId) {
+ return UNSAFE.getChar(getFieldAddr(fieldId));
+ }
+
+ @Override
+ public byte[] getBytes(int fieldId) {
+ long pos = getFieldAddr(fieldId);
+ int len = PlatformDependent.getInt(pos);
+ pos += SizeOf.SIZE_OF_INT;
+
+ byte [] bytes = new byte[len];
+ PlatformDependent.copyMemory(pos, bytes, 0, len);
+ return bytes;
+ }
+
+ @Override
+ public byte[] getTextBytes(int fieldId) {
+ return asDatum(fieldId).asTextBytes();
+ }
+
+ @Override
+ public short getInt2(int fieldId) {
+ long addr = getFieldAddr(fieldId);
+ return PlatformDependent.getShort(addr);
+ }
+
+ @Override
+ public int getInt4(int fieldId) {
+ return PlatformDependent.getInt(getFieldAddr(fieldId));
+ }
+
+ @Override
+ public long getInt8(int fieldId) {
+ return PlatformDependent.getLong(getFieldAddr(fieldId));
+ }
+
+ @Override
+ public float getFloat4(int fieldId) {
+ return Float.intBitsToFloat(PlatformDependent.getInt(getFieldAddr(fieldId)));
+ }
+
+ @Override
+ public double getFloat8(int fieldId) {
+ return Double.longBitsToDouble(PlatformDependent.getLong(getFieldAddr(fieldId)));
+ }
+
+ @Override
+ public String getText(int fieldId) {
+ return new String(getBytes(fieldId), TextDatum.DEFAULT_CHARSET);
+ }
+
+ @Override
+ public IntervalDatum getInterval(int fieldId) {
+ long pos = getFieldAddr(fieldId);
+ int months = PlatformDependent.getInt(pos);
+ pos += SizeOf.SIZE_OF_INT;
+ long millisecs = PlatformDependent.getLong(pos);
+ return new IntervalDatum(months, millisecs);
+ }
+
+ @Override
+ public Datum getProtobufDatum(int fieldId) {
+ byte [] bytes = getBytes(fieldId);
+
+ ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId]);
+ Message.Builder builder = factory.newBuilder();
+ try {
+ builder.mergeFrom(bytes);
+ } catch (InvalidProtocolBufferException e) {
+ return NullDatum.get();
+ }
+
+ return new ProtobufDatum(builder.build());
+ }
+
+ @Override
+ public char[] getUnicodeChars(int fieldId) {
+ long pos = getFieldAddr(fieldId);
+ int len = PlatformDependent.getInt(pos);
+ pos += SizeOf.SIZE_OF_INT;
+
+ byte [] bytes = new byte[len];
+ PlatformDependent.copyMemory(pos, bytes, 0, len);
+ return StringUtils.convertBytesToChars(bytes, Charset.forName("UTF-8"));
+ }
+
+ @Override
+ public TimeMeta getTimeDate(int fieldId) {
+ return asDatum(fieldId).asTimeMeta();
+ }
+
+ @Override
+ public Tuple clone() throws CloneNotSupportedException {
+ return toHeapTuple();
+ }
+
+ @Override
+ public Datum[] getValues() {
+ Datum [] datums = new Datum[size()];
+ for (int i = 0; i < size(); i++) {
+ if (contains(i)) {
+ datums[i] = asDatum(i);
+ } else {
+ datums[i] = NullDatum.get();
+ }
+ }
+ return datums;
+ }
+
+ @Override
+ public String toString() {
+ return VTuple.toDisplayString(getValues());
+ }
+
+ public void release() {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleBytesComparator.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleBytesComparator.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleBytesComparator.java
new file mode 100644
index 0000000..53a78a8
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleBytesComparator.java
@@ -0,0 +1,99 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.memory;
+
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.UnsignedLongs;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+
+import java.nio.ByteOrder;
+
+/**
+ * It directly access UTF bytes in UnSafeTuple without any copy. It is used by compiled TupleComparator.
+ */
+public class UnSafeTupleBytesComparator {
+ private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+ static final boolean littleEndian =
+ ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
+
+ public static int compare(long ptr1, long ptr2) {
+ int lstrLen = UNSAFE.getInt(ptr1);
+ int rstrLen = UNSAFE.getInt(ptr2);
+
+ ptr1 += SizeOf.SIZE_OF_INT;
+ ptr2 += SizeOf.SIZE_OF_INT;
+
+ int minLength = Math.min(lstrLen, rstrLen);
+ int minWords = minLength / Longs.BYTES;
+
+ /*
+ * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a
+ * time is no slower than comparing 4 bytes at a time even on 32-bit.
+ * On the other hand, it is substantially faster on 64-bit.
+ */
+ for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
+ long lw = UNSAFE.getLong(ptr1);
+ long rw = UNSAFE.getLong(ptr2);
+ long diff = lw ^ rw;
+
+ if (diff != 0) {
+ if (!littleEndian) {
+ return UnsignedLongs.compare(lw, rw);
+ }
+
+ // Use binary search
+ int n = 0;
+ int y;
+ int x = (int) diff;
+ if (x == 0) {
+ x = (int) (diff >>> 32);
+ n = 32;
+ }
+
+ y = x << 16;
+ if (y == 0) {
+ n += 16;
+ } else {
+ x = y;
+ }
+
+ y = x << 8;
+ if (y == 0) {
+ n += 8;
+ }
+ return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
+ }
+
+ ptr1 += SizeOf.SIZE_OF_LONG;
+ ptr2 += SizeOf.SIZE_OF_LONG;
+ }
+
+ // The epilogue to cover the last (minLength % 8) elements.
+ for (int i = minWords * Longs.BYTES; i < minLength; i++) {
+ int result = UNSAFE.getByte(ptr1++) - UNSAFE.getByte(ptr2++);
+ if (result != 0) {
+ return result;
+ }
+ }
+ return lstrLen - rstrLen;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ZeroCopyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ZeroCopyTuple.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ZeroCopyTuple.java
new file mode 100644
index 0000000..1f4f57e
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ZeroCopyTuple.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.memory;
+
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.storage.Tuple;
+
+public abstract class ZeroCopyTuple implements Tuple {
+
+ protected int relativePos;
+ protected int length;
+
+ public abstract void set(MemoryBlock memoryBlock, int relativePos, int length, DataType[] types);
+
+ void set(int relativePos, int length) {
+ this.relativePos = relativePos;
+ this.length = length;
+ }
+
+ public int getRelativePos() {
+ return relativePos;
+ }
+
+ public int getLength() {
+ return length;
+ }
+
+ @Override
+ public Tuple clone() throws CloneNotSupportedException {
+ return (Tuple) super.clone();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java
index ff6072e..575e628 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java
@@ -18,7 +18,7 @@
package org.apache.tajo.util;
import com.google.common.base.Preconditions;
-import sun.misc.Cleaner;
+import io.netty.util.internal.PlatformDependent;
import sun.misc.Unsafe;
import sun.nio.ch.DirectBuffer;
@@ -132,12 +132,6 @@ public class UnsafeUtil {
}
public static void free(ByteBuffer bb) {
- Preconditions.checkNotNull(bb);
- Preconditions.checkState(bb.isDirect());
-
- Cleaner cleaner = ((DirectBuffer) bb).cleaner();
- if (cleaner != null) {
- cleaner.clean();
- }
+ PlatformDependent.freeDirectBuffer(bb);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java b/tajo-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java
new file mode 100644
index 0000000..6ce1a6f
--- /dev/null
+++ b/tajo-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple;
+
+import org.apache.tajo.tuple.memory.*;
+import org.junit.Test;
+
+public class TestBaseTupleBuilder {
+
+ @Test
+ public void testBuild() {
+ BaseTupleBuilder builder = new BaseTupleBuilder(TestMemoryRowBlock.schema);
+
+ MemoryRowBlock rowBlock = TestMemoryRowBlock.createRowBlock(10248);
+ RowBlockReader reader = rowBlock.getReader();
+
+ ZeroCopyTuple inputTuple = new UnSafeTuple();
+
+ HeapTuple heapTuple;
+ ZeroCopyTuple zcTuple;
+ int i = 0;
+ while(reader.next(inputTuple)) {
+ OffHeapRowBlockUtils.convert(inputTuple, builder);
+
+ zcTuple = builder.buildToZeroCopyTuple();
+ TestMemoryRowBlock.validateTupleResult(i, zcTuple);
+
+ heapTuple = builder.buildToHeapTuple();
+ TestMemoryRowBlock.validateTupleResult(i, heapTuple);
+
+ i++;
+ }
+ builder.release();
+ rowBlock.release();
+ }
+
+ @Test
+ public void testBuildWithNull() {
+ BaseTupleBuilder builder = new BaseTupleBuilder(TestMemoryRowBlock.schema);
+
+ MemoryRowBlock rowBlock = TestMemoryRowBlock.createRowBlockWithNull(10248);
+ RowBlockReader reader = rowBlock.getReader();
+
+ ZeroCopyTuple inputTuple = new UnSafeTuple();
+
+ HeapTuple heapTuple;
+ ZeroCopyTuple zcTuple;
+ int i = 0;
+ while(reader.next(inputTuple)) {
+ OffHeapRowBlockUtils.convert(inputTuple, builder);
+
+ heapTuple = builder.buildToHeapTuple();
+ TestMemoryRowBlock.validateNullity(i, heapTuple);
+
+ zcTuple = builder.buildToZeroCopyTuple();
+ TestMemoryRowBlock.validateNullity(i, zcTuple);
+
+ i++;
+ }
+
+ builder.release();
+ rowBlock.release();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestHeapTuple.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestHeapTuple.java b/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestHeapTuple.java
new file mode 100644
index 0000000..2b45428
--- /dev/null
+++ b/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestHeapTuple.java
@@ -0,0 +1,82 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.memory;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.tajo.storage.BufferPool;
+import org.apache.tajo.tuple.RowBlockReader;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestHeapTuple {
+
+ @Test
+ public void testHeapTupleFromOffheap() {
+ MemoryRowBlock rowBlock = TestMemoryRowBlock.createRowBlock(1024);
+ assertTrue(rowBlock.getMemory().getBuffer().isDirect());
+ assertTrue(rowBlock.getMemory().hasAddress());
+
+ RowBlockReader reader = rowBlock.getReader();
+ assertEquals(OffHeapRowBlockReader.class, reader.getClass());
+
+ UnSafeTuple zcTuple = new UnSafeTuple();
+ int i = 0;
+ while (reader.next(zcTuple)) {
+
+ HeapTuple heapTuple = zcTuple.toHeapTuple();
+ TestMemoryRowBlock.validateTupleResult(i, heapTuple);
+ TestMemoryRowBlock.validateTupleResult(i, zcTuple);
+ TestMemoryRowBlock.validateTupleResult(i, zcTuple.toHeapTuple());
+ i++;
+ }
+
+ assertEquals(rowBlock.rows(), i);
+ rowBlock.release();
+ }
+
+ @Test
+ public void testHeapTupleFromHeap() throws CloneNotSupportedException {
+ MemoryRowBlock rowBlock = TestMemoryRowBlock.createRowBlock(1024);
+ int length = rowBlock.getMemory().writerPosition();
+ //write rows to heap
+ ByteBuf heapBuffer = BufferPool.heapBuffer(length, length);
+ heapBuffer.writeBytes(rowBlock.getMemory().getBuffer());
+ assertFalse(heapBuffer.isDirect());
+
+ ResizableMemoryBlock memoryBlock =
+ new ResizableMemoryBlock(heapBuffer);
+ assertFalse(memoryBlock.hasAddress());
+
+
+ RowBlockReader reader = new HeapRowBlockReader(memoryBlock, rowBlock.getDataTypes(), rowBlock.rows());
+ assertEquals(HeapRowBlockReader.class, reader.getClass());
+ HeapTuple heapTuple = new HeapTuple();
+ int i = 0;
+ while (reader.next(heapTuple)) {
+
+ TestMemoryRowBlock.validateTupleResult(i, heapTuple);
+ TestMemoryRowBlock.validateTupleResult(i, heapTuple.clone());
+ i++;
+ }
+ assertEquals(rowBlock.rows(), i);
+ rowBlock.release();
+ memoryBlock.release();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestMemoryRowBlock.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestMemoryRowBlock.java b/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestMemoryRowBlock.java
new file mode 100644
index 0000000..a6003c7
--- /dev/null
+++ b/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestMemoryRowBlock.java
@@ -0,0 +1,595 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.memory;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.tuple.RowBlockReader;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.NumberUtil;
+import org.apache.tajo.util.ProtoUtil;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.apache.tajo.common.TajoDataTypes.Type;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestMemoryRowBlock {
+ private static final Log LOG = LogFactory.getLog(TestMemoryRowBlock.class);
+ public static String UNICODE_FIELD_PREFIX = "abc_가나다_";
+ public static DataType[] schema;
+
+ static {
+ schema = new DataType[] {
+ DataType.newBuilder().setType(Type.BOOLEAN).build(),
+ DataType.newBuilder().setType(Type.INT2).build(),
+ DataType.newBuilder().setType(Type.INT4).build(),
+ DataType.newBuilder().setType(Type.INT8).build(),
+ DataType.newBuilder().setType(Type.FLOAT4).build(),
+ DataType.newBuilder().setType(Type.FLOAT8).build(),
+ DataType.newBuilder().setType(Type.TEXT).build(),
+ DataType.newBuilder().setType(Type.TIMESTAMP).build(),
+ DataType.newBuilder().setType(Type.DATE).build(),
+ DataType.newBuilder().setType(Type.TIME).build(),
+ DataType.newBuilder().setType(Type.INTERVAL).build(),
+ DataType.newBuilder().setType(Type.INET4).build(),
+ DataType.newBuilder().setType(Type.PROTOBUF).setCode(PrimitiveProtos.StringProto.class.getName()).build()
+ };
+ }
+
+ private void explainRowBlockAllocation(MemoryRowBlock rowBlock, long startTime, long endTime) {
+ LOG.info(FileUtil.humanReadableByteCount(rowBlock.capacity(), true) + " bytes allocated "
+ + (endTime - startTime) + " msec");
+ }
+
+ @Test
+ public void testPutAndReadValidation() {
+ int rowNum = 1000;
+
+ long allocStart = System.currentTimeMillis();
+ MemoryRowBlock rowBlock = new MemoryRowBlock(schema, 1024);
+ long allocEnd = System.currentTimeMillis();
+ explainRowBlockAllocation(rowBlock, allocStart, allocEnd);
+
+ RowBlockReader reader = null;
+
+ ZeroCopyTuple tuple = new UnSafeTuple();
+ long writeStart = System.currentTimeMillis();
+ for (int i = 0; i < rowNum; i++) {
+ fillRow(i, rowBlock.getWriter());
+
+ reader = rowBlock.getReader();
+ int j = 0;
+ while(reader.next(tuple)) {
+ validateTupleResult(j, tuple);
+ j++;
+ }
+ }
+
+ assertNotNull(reader);
+ long writeEnd = System.currentTimeMillis();
+ LOG.info("writing and validating take " + (writeEnd - writeStart) + " msec");
+
+ long readStart = System.currentTimeMillis();
+ tuple = new UnSafeTuple();
+ int j = 0;
+ reader.reset();
+ while(reader.next(tuple)) {
+ validateTupleResult(j, tuple);
+ j++;
+ }
+ assertEquals(rowNum, j);
+ long readEnd = System.currentTimeMillis();
+ LOG.info("reading takes " + (readEnd - readStart) + " msec");
+
+ rowBlock.release();
+ }
+
+ @Test
+ public void testNullityValidation() {
+ int rowNum = 1000;
+
+ long allocStart = System.currentTimeMillis();
+ MemoryRowBlock rowBlock = new MemoryRowBlock(schema, 1024);
+ long allocEnd = System.currentTimeMillis();
+ explainRowBlockAllocation(rowBlock, allocStart, allocEnd);
+
+ RowBlockReader reader = null;
+ ZeroCopyTuple tuple = new UnSafeTuple();
+ long writeStart = System.currentTimeMillis();
+ for (int i = 0; i < rowNum; i++) {
+
+ fillRowBlockWithNull(i, rowBlock.getWriter());
+
+ reader = rowBlock.getReader();
+ int j = 0;
+ while(reader.next(tuple)) {
+ validateNullity(j, tuple);
+
+ j++;
+ }
+ }
+
+ assertNotNull(reader);
+ long writeEnd = System.currentTimeMillis();
+ LOG.info("writing and nullity validating take " + (writeEnd - writeStart) + " msec");
+
+ long readStart = System.currentTimeMillis();
+ tuple = new UnSafeTuple();
+ int j = 0;
+ reader.reset();
+ while(reader.next(tuple)) {
+ validateNullity(j, tuple);
+
+ j++;
+ }
+ assertEquals(rowNum, j);
+ long readEnd = System.currentTimeMillis();
+ LOG.info("reading takes " + (readEnd - readStart) + " msec");
+
+ rowBlock.release();
+ }
+
+ @Test
+ public void testEmptyRow() {
+ int rowNum = 1000;
+
+ long allocStart = System.currentTimeMillis();
+ MemoryRowBlock rowBlock = new MemoryRowBlock(schema, StorageUnit.MB * 10);
+ long allocEnd = System.currentTimeMillis();
+ explainRowBlockAllocation(rowBlock, allocStart, allocEnd);
+
+ long writeStart = System.currentTimeMillis();
+ for (int i = 0; i < rowNum; i++) {
+ rowBlock.getWriter().startRow();
+ // empty columns
+ rowBlock.getWriter().endRow();
+ }
+ long writeEnd = System.currentTimeMillis();
+ LOG.info("writing tooks " + (writeEnd - writeStart) + " msec");
+
+ RowBlockReader reader = rowBlock.getReader();
+
+ long readStart = System.currentTimeMillis();
+ ZeroCopyTuple tuple = new UnSafeTuple();
+ int j = 0;
+ reader.reset();
+ while(reader.next(tuple)) {
+ j++;
+ }
+
+ assertEquals(rowNum, j);
+ long readEnd = System.currentTimeMillis();
+ LOG.info("reading takes " + (readEnd - readStart) + " msec");
+ rowBlock.release();
+
+ assertEquals(rowNum, j);
+ assertEquals(rowNum, rowBlock.rows());
+ }
+
+ @Test
+ public void testSortBenchmark() {
+ int rowNum = 1000;
+
+ MemoryRowBlock rowBlock = createRowBlock(rowNum);
+ List<ZeroCopyTuple> unSafeTuples = Lists.newArrayList();
+
+ long readStart = System.currentTimeMillis();
+ ZeroCopyTuple tuple = new UnSafeTuple();
+
+ RowBlockReader reader = rowBlock.getReader();
+ while(reader.next(tuple)) {
+ unSafeTuples.add(tuple);
+ tuple = new UnSafeTuple();
+ }
+ long readEnd = System.currentTimeMillis();
+ LOG.info("reading takes " + (readEnd - readStart) + " msec");
+
+ long sortStart = System.currentTimeMillis();
+ Collections.sort(unSafeTuples, new Comparator<ZeroCopyTuple>() {
+ @Override
+ public int compare(ZeroCopyTuple t1, ZeroCopyTuple t2) {
+ return NumberUtil.compare(t1.getInt4(2), t2.getInt4(2));
+ }
+ });
+ long sortEnd = System.currentTimeMillis();
+ LOG.info("sorting took " + (sortEnd - sortStart) + " msec");
+ rowBlock.release();
+ }
+
+ @Test
+ public void testVTuplePutAndGetBenchmark() {
+ int rowNum = 1000;
+
+ List<VTuple> rowBlock = Lists.newArrayList();
+ long writeStart = System.currentTimeMillis();
+ VTuple tuple;
+ for (int i = 0; i < rowNum; i++) {
+ tuple = new VTuple(schema.length);
+ fillVTuple(i, tuple);
+ rowBlock.add(tuple);
+ }
+ long writeEnd = System.currentTimeMillis();
+ LOG.info("Writing takes " + (writeEnd - writeStart) + " msec");
+
+ long readStart = System.currentTimeMillis();
+ int j = 0;
+ for (VTuple t : rowBlock) {
+ validateTupleResult(j, t);
+ j++;
+ }
+
+ assertEquals(rowNum, j);
+ long readEnd = System.currentTimeMillis();
+ LOG.info("reading takes " + (readEnd - readStart) + " msec");
+
+ int count = 0;
+ for (int l = 0; l < rowBlock.size(); l++) {
+ for(int m = 0; m < schema.length; m++ ) {
+ if (rowBlock.get(l).contains(m) && rowBlock.get(l).get(m).type() == Type.INT4) {
+ count ++;
+ }
+ }
+ }
+ // For preventing unnecessary code elimination optimization.
+ LOG.info("The number of INT4 values is " + count + ".");
+ }
+
+ @Test
+ public void testVTuplePutAndGetBenchmarkViaDirectRowEncoder() {
+ int rowNum = 1000;
+
+ MemoryRowBlock rowBlock = new MemoryRowBlock(schema, StorageUnit.MB * 100);
+
+ long writeStart = System.currentTimeMillis();
+ VTuple tuple = new VTuple(schema.length);
+ for (int i = 0; i < rowNum; i++) {
+ fillVTuple(i, tuple);
+ rowBlock.getWriter().addTuple(tuple);
+ }
+ long writeEnd = System.currentTimeMillis();
+ LOG.info("Writing takes " + (writeEnd - writeStart) + " msec");
+
+ validateResults(rowBlock);
+ rowBlock.release();
+ }
+
+ @Test
+ public void testSerDerOfRowBlock() {
+ int rowNum = 1000;
+
+ MemoryRowBlock rowBlock = createRowBlock(rowNum);
+
+ MemoryRowBlock restoredRowBlock = new MemoryRowBlock(rowBlock);
+ validateResults(restoredRowBlock);
+ rowBlock.release();
+ }
+
+ @Test
+ public void testSerDerOfZeroCopyTuple() {
+ int rowNum = 1000;
+
+ MemoryRowBlock rowBlock = createRowBlock(rowNum);
+
+ MemoryRowBlock restoredRowBlock = new MemoryRowBlock(rowBlock);
+ RowBlockReader reader = restoredRowBlock.getReader();
+
+ long readStart = System.currentTimeMillis();
+ UnSafeTuple tuple = new UnSafeTuple();
+
+ int j = 0;
+ List<ZeroCopyTuple> copyTuples = Lists.newArrayList();
+
+ while (reader.next(tuple)) {
+ validateTupleResult(j, tuple);
+
+ UnSafeTuple copyTuple = new UnSafeTuple();
+ copyTuple.set(tuple);
+ copyTuples.add(copyTuple);
+
+ j++;
+ }
+
+ assertEquals(rowNum, j);
+
+ for (int i = 0; i < j; i++) {
+ validateTupleResult(i, copyTuples.get(i));
+ }
+
+ long readEnd = System.currentTimeMillis();
+ LOG.info("reading takes " + (readEnd - readStart) + " msec");
+
+ rowBlock.release();
+ }
+
+ public static MemoryRowBlock createRowBlock(int rowNum) {
+ long allocateStart = System.currentTimeMillis();
+ MemoryRowBlock rowBlock = new MemoryRowBlock(schema, StorageUnit.MB * 8);
+ long allocatedEnd = System.currentTimeMillis();
+ LOG.info(FileUtil.humanReadableByteCount(rowBlock.capacity(), true) + " bytes allocated "
+ + (allocatedEnd - allocateStart) + " msec");
+
+ long writeStart = System.currentTimeMillis();
+ for (int i = 0; i < rowNum; i++) {
+ fillRow(i, rowBlock.getWriter());
+ }
+ long writeEnd = System.currentTimeMillis();
+ LOG.info("writing takes " + (writeEnd - writeStart) + " msec");
+
+ return rowBlock;
+ }
+
+ public static MemoryRowBlock createRowBlockWithNull(int rowNum) {
+ long allocateStart = System.currentTimeMillis();
+ MemoryRowBlock rowBlock = new MemoryRowBlock(schema, StorageUnit.MB * 8);
+ long allocatedEnd = System.currentTimeMillis();
+ LOG.info(FileUtil.humanReadableByteCount(rowBlock.capacity(), true) + " bytes allocated "
+ + (allocatedEnd - allocateStart) + " msec");
+
+ long writeStart = System.currentTimeMillis();
+ for (int i = 0; i < rowNum; i++) {
+ fillRowBlockWithNull(i, rowBlock.getWriter());
+ }
+ long writeEnd = System.currentTimeMillis();
+ LOG.info("writing and validating take " + (writeEnd - writeStart) + " msec");
+
+ return rowBlock;
+ }
+
+ public static void fillRow(int i, RowWriter builder) {
+ builder.startRow();
+ builder.putBool(i % 1 == 0 ? true : false); // 0
+ builder.putInt2((short) 1); // 1
+ builder.putInt4(i); // 2
+ builder.putInt8(i); // 3
+ builder.putFloat4(i); // 4
+ builder.putFloat8(i); // 5
+ builder.putText(UNICODE_FIELD_PREFIX + i); // 6
+ builder.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7
+ builder.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8
+ builder.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9
+ builder.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10
+ builder.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11
+ builder.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12
+ builder.endRow();
+ }
+
+ public static void fillRowBlockWithNull(int i, RowWriter writer) {
+ writer.startRow();
+
+ if (i == 0) {
+ writer.skipField();
+ } else {
+ writer.putBool(i % 1 == 0 ? true : false); // 0
+ }
+ if (i % 1 == 0) {
+ writer.skipField();
+ } else {
+ writer.putInt2((short) 1); // 1
+ }
+
+ if (i % 2 == 0) {
+ writer.skipField();
+ } else {
+ writer.putInt4(i); // 2
+ }
+
+ if (i % 3 == 0) {
+ writer.skipField();
+ } else {
+ writer.putInt8(i); // 3
+ }
+
+ if (i % 4 == 0) {
+ writer.skipField();
+ } else {
+ writer.putFloat4(i); // 4
+ }
+
+ if (i % 5 == 0) {
+ writer.skipField();
+ } else {
+ writer.putFloat8(i); // 5
+ }
+
+ if (i % 6 == 0) {
+ writer.skipField();
+ } else {
+ writer.putText(UNICODE_FIELD_PREFIX + i); // 6
+ }
+
+ if (i % 7 == 0) {
+ writer.skipField();
+ } else {
+ writer.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7
+ }
+
+ if (i % 8 == 0) {
+ writer.skipField();
+ } else {
+ writer.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8
+ }
+
+ if (i % 9 == 0) {
+ writer.skipField();
+ } else {
+ writer.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9
+ }
+
+ if (i % 10 == 0) {
+ writer.skipField();
+ } else {
+ writer.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10
+ }
+
+ if (i % 11 == 0) {
+ writer.skipField();
+ } else {
+ writer.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11
+ }
+
+ if (i % 12 == 0) {
+ writer.skipField();
+ } else {
+ writer.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12
+ }
+
+ writer.endRow();
+ }
+
+ public static void fillVTuple(int i, VTuple tuple) {
+ tuple.put(0, DatumFactory.createBool(i % 1 == 0));
+ tuple.put(1, DatumFactory.createInt2((short) 1));
+ tuple.put(2, DatumFactory.createInt4(i));
+ tuple.put(3, DatumFactory.createInt8(i));
+ tuple.put(4, DatumFactory.createFloat4(i));
+ tuple.put(5, DatumFactory.createFloat8(i));
+ tuple.put(6, DatumFactory.createText(UNICODE_FIELD_PREFIX + i));
+ tuple.put(7, DatumFactory.createTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i)); // 7
+ tuple.put(8, DatumFactory.createDate(DatumFactory.createDate("2014-04-16").asInt4() + i)); // 8
+ tuple.put(9, DatumFactory.createTime(DatumFactory.createTime("08:48:00").asInt8() + i)); // 9
+ tuple.put(10, DatumFactory.createInterval((i + 1) + " hours")); // 10
+ tuple.put(11, DatumFactory.createInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i)); // 11
+ tuple.put(12, new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12;
+ }
+
+ public static void validateResults(MemoryRowBlock rowBlock) {
+ long readStart = System.currentTimeMillis();
+ ZeroCopyTuple tuple = new UnSafeTuple();
+ int j = 0;
+ OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
+ reader.reset();
+ while(reader.next(tuple)) {
+ validateTupleResult(j, tuple);
+ j++;
+ }
+ assertEquals(rowBlock.rows(), j);
+ long readEnd = System.currentTimeMillis();
+ LOG.info("Reading takes " + (readEnd - readStart) + " msec");
+ }
+
+ public static void validateTupleResult(int j, Tuple t) {
+ assertTrue((j % 1 == 0) == t.getBool(0));
+ assertTrue(1 == t.getInt2(1));
+ assertEquals(j, t.getInt4(2));
+ assertEquals(j, t.getInt8(3));
+ assertTrue(j == t.getFloat4(4));
+ assertTrue(j == t.getFloat8(5));
+ assertEquals(UNICODE_FIELD_PREFIX + j, t.getText(6));
+ assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, t.getInt8(7));
+ assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, t.getInt4(8));
+ assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, t.getInt8(9));
+ assertEquals(DatumFactory.createInterval((j + 1) + " hours"), t.getInterval(10));
+ assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, t.getInt4(11));
+ assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), t.getProtobufDatum(12));
+ }
+
+ public static void validateNullity(int j, Tuple tuple) {
+ if (j == 0) {
+ tuple.isBlankOrNull(0);
+ } else {
+ assertTrue((j % 1 == 0) == tuple.getBool(0));
+ }
+
+ if (j % 1 == 0) {
+ tuple.isBlankOrNull(1);
+ } else {
+ assertTrue(1 == tuple.getInt2(1));
+ }
+
+ if (j % 2 == 0) {
+ tuple.isBlankOrNull(2);
+ } else {
+ assertEquals(j, tuple.getInt4(2));
+ }
+
+ if (j % 3 == 0) {
+ tuple.isBlankOrNull(3);
+ } else {
+ assertEquals(j, tuple.getInt8(3));
+ }
+
+ if (j % 4 == 0) {
+ tuple.isBlankOrNull(4);
+ } else {
+ assertTrue(j == tuple.getFloat4(4));
+ }
+
+ if (j % 5 == 0) {
+ tuple.isBlankOrNull(5);
+ } else {
+ assertTrue(j == tuple.getFloat8(5));
+ }
+
+ if (j % 6 == 0) {
+ tuple.isBlankOrNull(6);
+ } else {
+ assertEquals(UNICODE_FIELD_PREFIX + j, tuple.getText(6));
+ }
+
+ if (j % 7 == 0) {
+ tuple.isBlankOrNull(7);
+ } else {
+ assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, tuple.getInt8(7));
+ }
+
+ if (j % 8 == 0) {
+ tuple.isBlankOrNull(8);
+ } else {
+ assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, tuple.getInt4(8));
+ }
+
+ if (j % 9 == 0) {
+ tuple.isBlankOrNull(9);
+ } else {
+ assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, tuple.getInt8(9));
+ }
+
+ if (j % 10 == 0) {
+ tuple.isBlankOrNull(10);
+ } else {
+ assertEquals(DatumFactory.createInterval((j + 1) + " hours"), tuple.getInterval(10));
+ }
+
+ if (j % 11 == 0) {
+ tuple.isBlankOrNull(11);
+ } else {
+ assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, tuple.getInt4(11));
+ }
+
+ if (j % 12 == 0) {
+ tuple.isBlankOrNull(12);
+ } else {
+ assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), tuple.getProtobufDatum(12));
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestResizableSpec.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestResizableSpec.java b/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestResizableSpec.java
new file mode 100644
index 0000000..483dad5
--- /dev/null
+++ b/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestResizableSpec.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.memory;
+
+import org.apache.tajo.unit.StorageUnit;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestResizableSpec {
+
+ @Test
+ public void testResizableLimit() {
+ ResizableLimitSpec limit = new ResizableLimitSpec(10 * StorageUnit.MB, 1000 * StorageUnit.MB, 0.1f, 1.0f);
+
+ long expectedMaxSize = (long) (1000 * StorageUnit.MB + (1000 * StorageUnit.MB * 0.1f));
+
+ assertTrue(limit.limit() == 1000 * StorageUnit.MB + (1000 * StorageUnit.MB * 0.1f));
+
+ assertEquals(20971520, limit.increasedSize(10 * StorageUnit.MB));
+
+ assertEquals(expectedMaxSize, limit.increasedSize(1600 * StorageUnit.MB));
+
+ assertEquals(0.98f, limit.remainRatio(980 * StorageUnit.MB), 0.1);
+
+ assertFalse(limit.canIncrease(limit.limit()));
+ }
+
+ @Test
+ public void testFixedLimit() {
+ FixedSizeLimitSpec limit = new FixedSizeLimitSpec(100 * StorageUnit.MB, 0.0f);
+
+ assertEquals(limit.limit(), 100 * StorageUnit.MB);
+
+ assertEquals(100 * StorageUnit.MB, limit.increasedSize(1000));
+
+ assertEquals(100 * StorageUnit.MB, limit.increasedSize(1600 * StorageUnit.MB));
+
+ assertTrue(0.98f == limit.remainRatio(98 * StorageUnit.MB));
+
+ assertFalse(limit.canIncrease(limit.limit()));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-core-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core-tests/pom.xml b/tajo-core-tests/pom.xml
index a23e420..8199f46 100644
--- a/tajo-core-tests/pom.xml
+++ b/tajo-core-tests/pom.xml
@@ -225,6 +225,10 @@
<groupId>com.sun.jersey.jersey-test-framework</groupId>
<artifactId>jersey-test-framework-grizzly2</artifactId>
</exclusion>
+ <exclusion>
+ <artifactId>netty-all</artifactId>
+ <groupId>io.netty</groupId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -280,12 +284,24 @@
<version>${hbase.version}</version>
<type>test-jar</type>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>netty-all</artifactId>
+ <groupId>io.netty</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>netty-all</artifactId>
+ <groupId>io.netty</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-jdbc/pom.xml b/tajo-jdbc/pom.xml
index 27cc471..09fabdd 100644
--- a/tajo-jdbc/pom.xml
+++ b/tajo-jdbc/pom.xml
@@ -147,6 +147,10 @@
<groupId>com.sun.jersey.jersey-test-framework</groupId>
<artifactId>jersey-test-framework-grizzly2</artifactId>
</exclusion>
+ <exclusion>
+ <artifactId>netty-all</artifactId>
+ <groupId>io.netty</groupId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/pom.xml b/tajo-storage/tajo-storage-common/pom.xml
index afbeead..4321a13 100644
--- a/tajo-storage/tajo-storage-common/pom.xml
+++ b/tajo-storage/tajo-storage-common/pom.xml
@@ -222,6 +222,10 @@ limitations under the License.
<groupId>com.sun.jersey.jersey-test-framework</groupId>
<artifactId>jersey-test-framework-grizzly2</artifactId>
</exclusion>
+ <exclusion>
+ <artifactId>netty-all</artifactId>
+ <groupId>io.netty</groupId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -273,6 +277,10 @@ limitations under the License.
<artifactId>hadoop-mapreduce-client-core</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
+ <exclusion>
+ <artifactId>netty-all</artifactId>
+ <groupId>io.netty</groupId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java
deleted file mode 100644
index d611ee3..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.UnpooledByteBufAllocator;
-import io.netty.util.ResourceLeakDetector;
-import io.netty.util.internal.PlatformDependent;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.util.CommonTestingUtil;
-
-import java.lang.reflect.Field;
-
-/* this class is PooledBuffer holder */
-public class BufferPool {
-
- public static final String ALLOW_CACHE = "tajo.storage.buffer.thread-local.cache";
- private static final ByteBufAllocator ALLOCATOR;
-
- private BufferPool() {
- }
-
- static {
- /* TODO Enable thread cache
- * Create a pooled ByteBuf allocator but disables the thread-local cache.
- * Because the TaskRunner thread is newly created
- * */
-
- if (System.getProperty(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) {
- /* Disable pooling buffers for memory usage */
- ALLOCATOR = UnpooledByteBufAllocator.DEFAULT;
-
- /* if you are finding memory leak, please enable this line */
- ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
- } else {
- TajoConf tajoConf = new TajoConf();
- ALLOCATOR = createPooledByteBufAllocator(true, tajoConf.getBoolean(ALLOW_CACHE, false), 0);
- }
- }
-
- /**
- * borrowed from Spark
- */
- public static PooledByteBufAllocator createPooledByteBufAllocator(
- boolean allowDirectBufs,
- boolean allowCache,
- int numCores) {
- if (numCores == 0) {
- numCores = Runtime.getRuntime().availableProcessors();
- }
- return new PooledByteBufAllocator(
- allowDirectBufs && PlatformDependent.directBufferPreferred(),
- Math.min(getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"), numCores),
- Math.min(getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"), allowDirectBufs ? numCores : 0),
- getPrivateStaticField("DEFAULT_PAGE_SIZE"),
- getPrivateStaticField("DEFAULT_MAX_ORDER"),
- allowCache ? getPrivateStaticField("DEFAULT_TINY_CACHE_SIZE") : 0,
- allowCache ? getPrivateStaticField("DEFAULT_SMALL_CACHE_SIZE") : 0,
- allowCache ? getPrivateStaticField("DEFAULT_NORMAL_CACHE_SIZE") : 0
- );
- }
-
- /** Used to get defaults from Netty's private static fields. */
- private static int getPrivateStaticField(String name) {
- try {
- Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name);
- f.setAccessible(true);
- return f.getInt(null);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public static long maxDirectMemory() {
- return PlatformDependent.maxDirectMemory();
- }
-
-
- public static ByteBuf directBuffer(int size) {
- return ALLOCATOR.directBuffer(size);
- }
-
- /**
- *
- * @param size the initial capacity
- * @param max the max capacity
- * @return allocated ByteBuf from pool
- */
- public static ByteBuf directBuffer(int size, int max) {
- return ALLOCATOR.directBuffer(size, max);
- }
-
- @InterfaceStability.Unstable
- public static void forceRelease(ByteBuf buf) {
- buf.release(buf.refCnt());
- }
-
- /**
- * the ByteBuf will increase to writable size
- * @param buf
- * @param minWritableBytes required minimum writable size
- */
- public static void ensureWritable(ByteBuf buf, int minWritableBytes) {
- buf.ensureWritable(minWritableBytes);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
index 7708d52..8ca55cc 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
@@ -23,11 +23,9 @@ import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.IntervalDatum;
-import org.apache.tajo.datum.ProtobufDatum;
import org.apache.tajo.exception.TajoRuntimeException;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.exception.ValueTooLongForTypeCharactersException;
-import org.apache.tajo.tuple.offheap.RowWriter;
import org.apache.tajo.util.BitArray;
import java.nio.ByteBuffer;
@@ -335,56 +333,4 @@ public class RowStoreUtil {
return schema;
}
}
-
- public static void convert(Tuple tuple, RowWriter writer) {
- writer.startRow();
-
- for (int i = 0; i < writer.dataTypes().length; i++) {
- if (tuple.isBlankOrNull(i)) {
- writer.skipField();
- continue;
- }
- switch (writer.dataTypes()[i].getType()) {
- case BOOLEAN:
- writer.putBool(tuple.getBool(i));
- break;
- case INT1:
- case INT2:
- writer.putInt2(tuple.getInt2(i));
- break;
- case INT4:
- case DATE:
- case INET4:
- writer.putInt4(tuple.getInt4(i));
- break;
- case INT8:
- case TIMESTAMP:
- case TIME:
- writer.putInt8(tuple.getInt8(i));
- break;
- case FLOAT4:
- writer.putFloat4(tuple.getFloat4(i));
- break;
- case FLOAT8:
- writer.putFloat8(tuple.getFloat8(i));
- break;
- case TEXT:
- writer.putText(tuple.getBytes(i));
- break;
- case INTERVAL:
- writer.putInterval((IntervalDatum) tuple.getInterval(i));
- break;
- case PROTOBUF:
- writer.putProtoDatum((ProtobufDatum) tuple.getProtobufDatum(i));
- break;
- case NULL_TYPE:
- writer.skipField();
- break;
- default:
- throw new TajoRuntimeException(
- new UnsupportedException("unknown data type '" + writer.dataTypes()[i].getType().name() + "'"));
- }
- }
- writer.endRow();
- }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
deleted file mode 100644
index c1835df..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaUtil;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.tuple.offheap.HeapTuple;
-import org.apache.tajo.tuple.offheap.OffHeapRowWriter;
-import org.apache.tajo.tuple.offheap.ZeroCopyTuple;
-import org.apache.tajo.unit.StorageUnit;
-import org.apache.tajo.util.Deallocatable;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.UnsafeUtil;
-import sun.misc.Unsafe;
-import sun.nio.ch.DirectBuffer;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-public class BaseTupleBuilder extends OffHeapRowWriter implements TupleBuilder, Deallocatable {
- private static final Log LOG = LogFactory.getLog(BaseTupleBuilder.class);
-
- private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
-
- // buffer
- private ByteBuffer buffer;
- private long address;
-
- public BaseTupleBuilder(Schema schema) {
- super(SchemaUtil.toDataTypes(schema));
- buffer = ByteBuffer.allocateDirect(64 * StorageUnit.KB).order(ByteOrder.nativeOrder());
- address = UnsafeUtil.getAddress(buffer);
- }
-
- @Override
- public long address() {
- return address;
- }
-
- public void ensureSize(int size) {
- if (buffer.remaining() - size < 0) { // check the remain size
- // enlarge new buffer and copy writing data
- int newBlockSize = UnsafeUtil.alignedSize(buffer.capacity() * 2);
- ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize);
- long newAddress = ((DirectBuffer)newByteBuf).address();
- UNSAFE.copyMemory(this.address, newAddress, buffer.limit());
- LOG.debug("Increase the buffer size to " + FileUtil.humanReadableByteCount(newBlockSize, false));
-
- // release existing buffer and replace variables
- UnsafeUtil.free(buffer);
- buffer = newByteBuf;
- address = newAddress;
- }
- }
-
- @Override
- public int position() {
- return 0;
- }
-
- @Override
- public void forward(int length) {
- }
-
- @Override
- public void endRow() {
- super.endRow();
- buffer.position(0).limit(offset());
- }
-
- @Override
- public Tuple build() {
- return buildToHeapTuple();
- }
-
- public HeapTuple buildToHeapTuple() {
- byte [] bytes = new byte[buffer.limit()];
- UNSAFE.copyMemory(null, address, bytes, UnsafeUtil.ARRAY_BOOLEAN_BASE_OFFSET, buffer.limit());
- return new HeapTuple(bytes, dataTypes());
- }
-
- public ZeroCopyTuple buildToZeroCopyTuple() {
- ZeroCopyTuple zcTuple = new ZeroCopyTuple();
- zcTuple.set(buffer, 0, buffer.limit(), dataTypes());
- return zcTuple;
- }
-
- public void release() {
- UnsafeUtil.free(buffer);
- buffer = null;
- address = 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
deleted file mode 100644
index be734e1..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple;
-
-import org.apache.tajo.storage.Tuple;
-
-public interface RowBlockReader<T extends Tuple> {
-
- /**
- * Return for each tuple
- *
- * @return True if tuple block is filled with tuples. Otherwise, It will return false.
- */
- public boolean next(T tuple);
-
- public void reset();
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
deleted file mode 100644
index c43c018..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple;
-
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.tuple.offheap.RowWriter;
-
-public interface TupleBuilder extends RowWriter {
- public Tuple build();
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
deleted file mode 100644
index 9662d5a..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.util.Deallocatable;
-import org.apache.tajo.util.UnsafeUtil;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
-public class DirectBufTuple extends UnSafeTuple implements Deallocatable {
- private ByteBuffer bb;
-
- public DirectBufTuple(int length, DataType[] types) {
- bb = ByteBuffer.allocateDirect(length).order(ByteOrder.nativeOrder());
- set(bb, 0, length, types);
- }
-
- @Override
- public void release() {
- UnsafeUtil.free(bb);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
deleted file mode 100644
index a327123..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-/**
- * Fixed size limit specification
- */
-public class FixedSizeLimitSpec extends ResizableLimitSpec {
- public FixedSizeLimitSpec(long size) {
- super(size, size);
- }
-
- public FixedSizeLimitSpec(long size, float allowedOverflowRatio) {
- super(size, size, allowedOverflowRatio);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
deleted file mode 100644
index 9b69536..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.*;
-import org.apache.tajo.exception.TajoRuntimeException;
-import org.apache.tajo.exception.UnsupportedException;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.util.SizeOf;
-import org.apache.tajo.util.StringUtils;
-import org.apache.tajo.util.UnsafeUtil;
-import org.apache.tajo.util.datetime.TimeMeta;
-import sun.misc.Unsafe;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
-public class HeapTuple implements Tuple {
- private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
- private static final long BASE_OFFSET = UnsafeUtil.ARRAY_BYTE_BASE_OFFSET;
-
- private final byte [] data;
- private final DataType [] types;
-
- public HeapTuple(final byte [] bytes, final DataType [] types) {
- this.data = bytes;
- this.types = types;
- }
-
- @Override
- public int size() {
- return data.length;
- }
-
- @Override
- public TajoDataTypes.Type type(int fieldId) {
- return types[fieldId].getType();
- }
-
- @Override
- public int size(int fieldId) {
- return UNSAFE.getInt(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
- }
-
- @Override
- public void clearOffset() {
- }
-
- public ByteBuffer nioBuffer() {
- return ByteBuffer.wrap(data);
- }
-
- private int getFieldOffset(int fieldId) {
- return UNSAFE.getInt(data, BASE_OFFSET + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT));
- }
-
- private int checkNullAndGetOffset(int fieldId) {
- int offset = getFieldOffset(fieldId);
- if (offset == OffHeapRowBlock.NULL_FIELD_OFFSET) {
- throw new RuntimeException("Invalid Field Access: " + fieldId);
- }
- return offset;
- }
-
- @Override
- public boolean contains(int fieldid) {
- return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
- }
-
- @Override
- public boolean isBlank(int fieldid) {
- return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
- }
-
- @Override
- public boolean isBlankOrNull(int fieldid) {
- return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
- }
-
- @Override
- public void put(int fieldId, Tuple tuple) {
- throw new TajoRuntimeException(new UnsupportedException());
- }
-
- @Override
- public void clear() {
- // nothing to do
- }
-
- @Override
- public void put(int fieldId, Datum value) {
- throw new TajoRuntimeException(new UnsupportedException());
- }
-
- @Override
- public void put(Datum[] values) {
- throw new TajoRuntimeException(new UnsupportedException());
- }
-
- @Override
- public Datum asDatum(int fieldId) {
- if (isBlankOrNull(fieldId)) {
- return NullDatum.get();
- }
-
- switch (types[fieldId].getType()) {
- case BOOLEAN:
- return DatumFactory.createBool(getBool(fieldId));
- case INT1:
- case INT2:
- return DatumFactory.createInt2(getInt2(fieldId));
- case INT4:
- return DatumFactory.createInt4(getInt4(fieldId));
- case INT8:
- return DatumFactory.createInt8(getInt4(fieldId));
- case FLOAT4:
- return DatumFactory.createFloat4(getFloat4(fieldId));
- case FLOAT8:
- return DatumFactory.createFloat8(getFloat8(fieldId));
- case TEXT:
- return DatumFactory.createText(getText(fieldId));
- case TIMESTAMP:
- return DatumFactory.createTimestamp(getInt8(fieldId));
- case DATE:
- return DatumFactory.createDate(getInt4(fieldId));
- case TIME:
- return DatumFactory.createTime(getInt8(fieldId));
- case INTERVAL:
- return getInterval(fieldId);
- case INET4:
- return DatumFactory.createInet4(getInt4(fieldId));
- case PROTOBUF:
- return getProtobufDatum(fieldId);
- default:
- throw new TajoRuntimeException(new UnsupportedException("data type '" + types[fieldId] + "'"));
- }
- }
-
- @Override
- public void setOffset(long offset) {
- }
-
- @Override
- public long getOffset() {
- return 0;
- }
-
- @Override
- public boolean getBool(int fieldId) {
- return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)) == 0x01;
- }
-
- @Override
- public byte getByte(int fieldId) {
- return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
- }
-
- @Override
- public char getChar(int fieldId) {
- return UNSAFE.getChar(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
- }
-
- @Override
- public byte[] getBytes(int fieldId) {
- long pos = checkNullAndGetOffset(fieldId);
- int len = UNSAFE.getInt(data, BASE_OFFSET + pos);
- pos += SizeOf.SIZE_OF_INT;
-
- byte [] bytes = new byte[len];
- UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
- return bytes;
- }
-
- @Override
- public byte[] getTextBytes(int fieldId) {
- return getText(fieldId).getBytes();
- }
-
- @Override
- public short getInt2(int fieldId) {
- return UNSAFE.getShort(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
- }
-
- @Override
- public int getInt4(int fieldId) {
- return UNSAFE.getInt(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
- }
-
- @Override
- public long getInt8(int fieldId) {
- return UNSAFE.getLong(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
- }
-
- @Override
- public float getFloat4(int fieldId) {
- return UNSAFE.getFloat(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
- }
-
- @Override
- public double getFloat8(int fieldId) {
- return UNSAFE.getDouble(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
- }
-
- @Override
- public String getText(int fieldId) {
- return new String(getBytes(fieldId));
- }
-
- @Override
- public TimeMeta getTimeDate(int fieldId) {
- return asDatum(fieldId).asTimeMeta();
- }
-
- public IntervalDatum getInterval(int fieldId) {
- long pos = checkNullAndGetOffset(fieldId);
- int months = UNSAFE.getInt(data, BASE_OFFSET + pos);
- pos += SizeOf.SIZE_OF_INT;
- long millisecs = UNSAFE.getLong(data, BASE_OFFSET + pos);
- return new IntervalDatum(months, millisecs);
- }
-
- @Override
- public Datum getProtobufDatum(int fieldId) {
- byte [] bytes = getBytes(fieldId);
-
- ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode());
- Message.Builder builder = factory.newBuilder();
- try {
- builder.mergeFrom(bytes);
- } catch (InvalidProtocolBufferException e) {
- return NullDatum.get();
- }
-
- return new ProtobufDatum(builder.build());
- }
-
- @Override
- public char[] getUnicodeChars(int fieldId) {
- long pos = checkNullAndGetOffset(fieldId);
- int len = UNSAFE.getInt(data, BASE_OFFSET + pos);
- pos += SizeOf.SIZE_OF_INT;
-
- byte [] bytes = new byte[len];
- UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
- return StringUtils.convertBytesToChars(bytes, Charset.forName("UTF-8"));
- }
-
- @Override
- public Tuple clone() throws CloneNotSupportedException {
- return this;
- }
-
- @Override
- public Datum[] getValues() {
- Datum [] datums = new Datum[size()];
- for (int i = 0; i < size(); i++) {
- if (contains(i)) {
- datums[i] = asDatum(i);
- } else {
- datums[i] = NullDatum.get();
- }
- }
- return datums;
- }
-
- @Override
- public String toString() {
- return VTuple.toDisplayString(getValues());
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
deleted file mode 100644
index 2f8e349..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.util.Deallocatable;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.UnsafeUtil;
-import sun.misc.Unsafe;
-import sun.nio.ch.DirectBuffer;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-public class OffHeapMemory implements Deallocatable {
- private static final Log LOG = LogFactory.getLog(OffHeapMemory.class);
-
- protected static final Unsafe UNSAFE = UnsafeUtil.unsafe;
-
- protected ByteBuffer buffer;
- protected int memorySize;
- protected ResizableLimitSpec limitSpec;
- protected long address;
-
- @VisibleForTesting
- protected OffHeapMemory(ByteBuffer buffer, ResizableLimitSpec limitSpec) {
- this.buffer = buffer;
- this.address = ((DirectBuffer) buffer).address();
- this.memorySize = buffer.limit();
- this.limitSpec = limitSpec;
- }
-
- public OffHeapMemory(ResizableLimitSpec limitSpec) {
- this(ByteBuffer.allocateDirect((int) limitSpec.initialSize()).order(ByteOrder.nativeOrder()), limitSpec);
- }
-
- public long address() {
- return address;
- }
-
- public long size() {
- return memorySize;
- }
-
- public void resize(int newSize) {
- Preconditions.checkArgument(newSize > 0, "Size must be greater than 0 bytes");
-
- if (newSize > limitSpec.limit()) {
- throw new RuntimeException("Resize cannot exceed the size limit");
- }
-
- if (newSize < memorySize) {
- LOG.warn("The size reduction is ignored.");
- }
-
- int newBlockSize = UnsafeUtil.alignedSize(newSize);
- ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize);
- long newAddress = ((DirectBuffer)newByteBuf).address();
-
- UNSAFE.copyMemory(this.address, newAddress, memorySize);
-
- UnsafeUtil.free(buffer);
- this.memorySize = newSize;
- this.buffer = newByteBuf;
- this.address = newAddress;
- }
-
- public java.nio.Buffer nioBuffer() {
- return (ByteBuffer) buffer.position(0).limit(memorySize);
- }
-
- @Override
- public void release() {
- UnsafeUtil.free(this.buffer);
- this.buffer = null;
- this.address = 0;
- this.memorySize = 0;
- }
-
- public String toString() {
- return "memory=" + FileUtil.humanReadableByteCount(memorySize, false) + "," + limitSpec;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
deleted file mode 100644
index 90d4791..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaUtil;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.storage.SeekableInputChannel;
-import org.apache.tajo.util.Deallocatable;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.SizeOf;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
-public class OffHeapRowBlock extends OffHeapMemory implements Deallocatable {
- private static final Log LOG = LogFactory.getLog(OffHeapRowBlock.class);
-
- public static final int NULL_FIELD_OFFSET = -1;
-
- DataType [] dataTypes;
-
- // Basic States
- private int maxRowNum = Integer.MAX_VALUE; // optional
- private int rowNum;
- protected int position = 0;
-
- private OffHeapRowBlockWriter builder;
-
- private OffHeapRowBlock(ByteBuffer buffer, Schema schema, ResizableLimitSpec limitSpec) {
- super(buffer, limitSpec);
- initialize(schema);
- }
-
- public OffHeapRowBlock(Schema schema, ResizableLimitSpec limitSpec) {
- super(limitSpec);
- initialize(schema);
- }
-
- private void initialize(Schema schema) {
- dataTypes = SchemaUtil.toDataTypes(schema);
-
- this.builder = new OffHeapRowBlockWriter(this);
- }
-
- @VisibleForTesting
- public OffHeapRowBlock(Schema schema, int bytes) {
- this(schema, new ResizableLimitSpec(bytes));
- }
-
- @VisibleForTesting
- public OffHeapRowBlock(Schema schema, ByteBuffer buffer) {
- this(buffer, schema, ResizableLimitSpec.DEFAULT_LIMIT);
- }
-
- public void position(int pos) {
- this.position = pos;
- }
-
- public void clear() {
- this.position = 0;
- this.rowNum = 0;
-
- builder.clear();
- }
-
- @Override
- public ByteBuffer nioBuffer() {
- return (ByteBuffer) buffer.position(0).limit(position);
- }
-
- public int position() {
- return position;
- }
-
- public long usedMem() {
- return position;
- }
-
- /**
- * Ensure that this buffer has enough remaining space to add the size.
- * Creates and copies to a new buffer if necessary
- *
- * @param size Size to add
- */
- public void ensureSize(int size) {
- if (remain() - size < 0) {
- if (!limitSpec.canIncrease(memorySize)) {
- throw new RuntimeException("Cannot increase RowBlock anymore.");
- }
-
- int newBlockSize = limitSpec.increasedSize(memorySize);
- resize(newBlockSize);
- LOG.info("Increase DirectRowBlock to " + FileUtil.humanReadableByteCount(newBlockSize, false));
- }
- }
-
- public long remain() {
- return memorySize - position - builder.offset();
- }
-
- public int maxRowNum() {
- return maxRowNum;
- }
- public int rows() {
- return rowNum;
- }
-
- public void setRows(int rowNum) {
- this.rowNum = rowNum;
- }
-
-
- public boolean copyFromChannel(SeekableInputChannel channel, TableStats stats) throws IOException {
- if (channel.position() < channel.size()) {
- clear();
-
- buffer.clear();
- channel.read(buffer);
- memorySize = buffer.position();
-
- while (position < memorySize) {
- long recordPtr = address + position;
-
- if (remain() < SizeOf.SIZE_OF_INT) {
- channel.seek(channel.position() - remain());
- memorySize = (int) (memorySize - remain());
- return true;
- }
-
- int recordSize = UNSAFE.getInt(recordPtr);
-
- if (remain() < recordSize) {
- channel.seek(channel.position() - remain());
- memorySize = (int) (memorySize - remain());
- return true;
- }
-
- position += recordSize;
- rowNum++;
- }
-
- return true;
- } else {
- return false;
- }
- }
-
- public boolean copyFromChannel(FileChannel channel, TableStats stats) throws IOException {
- if (channel.position() < channel.size()) {
- clear();
-
- buffer.clear();
- channel.read(buffer);
- memorySize = buffer.position();
-
- while (position < memorySize) {
- long recordPtr = address + position;
-
- if (remain() < SizeOf.SIZE_OF_INT) {
- channel.position(channel.position() - remain());
- memorySize = (int) (memorySize - remain());
- return true;
- }
-
- int recordSize = UNSAFE.getInt(recordPtr);
-
- if (remain() < recordSize) {
- channel.position(channel.position() - remain());
- memorySize = (int) (memorySize - remain());
- return true;
- }
-
- position += recordSize;
- rowNum++;
- }
-
- return true;
- } else {
- return false;
- }
- }
-
- public RowWriter getWriter() {
- return builder;
- }
-
- public OffHeapRowBlockReader getReader() {
- return new OffHeapRowBlockReader(this);
- }
-}